效率提升:
Java 线程 线程创建
重写thread的run方法
创建runnable 抽象出来任务
FutureTask 带返回值 线程间通信
核心Thread是创建一个线程,其中run方法或者Runnable只是代表具体的任务。如果main线程中调用Runnable.run该任务就是main执行的
extends Thread 并重写run方法,
1 2 3 4 5 6 7 8 9 Thread t = new Thread () { public void run () { } }; t.start();
Runnable :当成参数传给thread,run方法默认会检查Runnable如果有就执行Runnable.run
1 2 3 4 5 6 7 8 9 10 11 12 Thread thread = new Thread (new Runnable () { public void run () { } });Thread thread = new Thread (() -> { }); thread.start();
FutureTask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class ThreadDemo { public static void main (String[] args) { Callable call = new MyCallable (); FutureTask<String> task = new FutureTask <>(call); Thread t = new Thread (task); t.start(); try { String s = task.get(); System.out.println(s); } catch (Exception e) { e.printStackTrace(); } }public class MyCallable implements Callable <String> { @Override public String call () throws Exception { return Thread.currentThread().getName() + "->" + "Hello World" ; } }
tasklist jps
taskkill
top -H -p pid jstack
查看进程中线程信息
每一个线程都有一个独立的栈,栈内每个函数都会有栈帧。main线程中的三个函数:
上下文切换 切换当前执行的线程
CPU时间片
垃圾回收
更高优先权
线程主动调用sleep、yield、wait、 join、 park、 synchronized、 lock
如何保存上下文信息?
线程内程序计数器 记录运行到哪里
栈帧记录变量信息
API
方法
说明
public void start ()
启动一个新线程,Java虚拟机调用此线程的 run 方法
public void run ()
线程启动后调用该方法
public void setName(String name)
给当前线程取名字
public void getName()
获取当前线程的名字 线程存在默认名称:子线程是 Thread-索引,主线程是 main
public static Thread currentThread()
获取当前线程对象,代码在哪个线程中执行
public static void sleep (long time)
让当前线程休眠多少毫秒再继续执行 Thread.sleep(0) : 让操作系统立刻重新进行一次 CPU 竞争
public static native void yield ()
提示线程调度器让出当前线程对 CPU 的使用
public final int getPriority()
返回此线程的优先级
public final void setPriority (int priority)
更改此线程的优先级,常用 1 5 10
public void interrupt()
中断这个线程,异常处理机制
public static boolean interrupted()
判断当前线程是否被打断,清除打断标记
public boolean isInterrupted()
判断当前线程是否被打断,不清除打断标记
public final void join ()
等待这个线程结束
public final void join(long millis)
等待这个线程死亡 millis 毫秒,0 意味着永远等待
public final native boolean isAlive()
线程是否存活(还没有运行完毕)
public final void setDaemon(boolean on)
将此线程标记为守护线程或用户线程
state
操作系统层面 :新建 就绪 运行 阻塞(io) 终止
java层面 :其中Runnable包含 就绪 运行 阻塞(io)
程状态
导致状态发生条件
NEW(新建)
线程刚被创建,但是并未启动,还没调用 start 方法,只有线程对象,没有线程特征
Runnable(可运行)
线程可以在 Java 虚拟机中运行的状态,可能正在运行自己代码,也可能没有,这取决于操作系统处理器,调用了 t.start() 方法:就绪(经典叫法)
Blocked(阻塞)
当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入 Blocked 状态;当该线程持有锁时,该线程将变成 Runnable 状态 synchronized
EntryList
Waiting(无限等待)
一个线程在等待另一个线程执行一个(唤醒)动作时,该线程进入 Waiting 状态. wait
(WaitSet) join
(原理wait) park
Timed Waiting (限期等待)
有几个方法有超时参数,调用将进入 Timed Waiting 状态,这一状态将一直保持到超时期满或者接收到唤醒通知。常用方法有 Thread.sleep(time) 、wait(time)
join(time)
parkUntil(time)
Teminated(结束)
run 方法正常退出而死亡,或者因为没有捕获的异常终止了 run 方法而死亡
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
interrupt API
public void interrupt()
:打断这个线程,异常处理机制
public static boolean interrupted()
:判断当前线程是否被打断,打断返回 true,清除打断标记
public boolean isInterrupted()
:判断当前线程是否被打断,不清除打断标记
线程处于阻塞 状态:如果线程当前处于阻塞状态,如调用了 Thread.sleep()
、Object.wait()
、join
BlockingQueue.take()
等阻塞方法,调用 interrupt()
方法会中断线程的阻塞状态,抛出 InterruptedException
异常,打断标记。因为线程都不在运行,所以需要抛异常来处理
1 2 3 4 5 6 7 8 9 10 11 12 13 public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (()->{ try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1" ); t1.start(); Thread.sleep(500 ); t1.interrupt(); System.out.println(" 打断状态: {}" + t1.isInterrupted()); }
线程处于非阻塞 状态:如果线程当前处于非阻塞状态,调用 interrupt()
方法会将线程的中断状态设置为 true
,但不会中断线程的执行。可以通过检查中断状态来决定是否终止线程的执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Thread.sleep(2000 ); myThread.interrupt();class MyThread extends Thread { @Override public void run () { while (!Thread.currentThread().isInterrupted()) { System.out.println("Thread is running." ); } System.out.println("Thread interrupted. Exiting..." ); } }
两阶段终止 功能:记录系统的利用率,但需要能停止下来。 如果在sleep时被打断,则手动标记一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class TPTInterrupt { private Thread thread; public void start () { thread = new Thread (() -> { while (true ) { Thread current = Thread.currentThread(); if (current.isInterrupted()) { log.debug("料理后事" ); break ; } try { Thread.sleep(1000 ); log.debug("将结果保存" ); } catch (InterruptedException e) { current.interrupt(); } } },"监控线程" ); thread.start(); } public void stop () { thread.interrupt(); } }
打断park park阻塞线程类似于一直sleep,但被打断不会清空标记。需要标记为假时才生效
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) throws Exception { Thread t1 = new Thread (() -> { System.out.println("park..." ); LockSupport.park(); System.out.println("unpark..." ); System.out.println("打断状态:" + Thread.currentThread().isInterrupted()); }, "t1" ); t1.start(); Thread.sleep(2000 ); t1.interrupt(); }
join
public final void join()
等待该线程执行完成,原理上可以使用信号量PV
join(long millis)
最大等待时间
守护线程 当其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。
t1.setDaemon(true);
垃圾回收器线程就是一种守护线程
Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等待它们处理完当前请求
共享模型之并发 访问共享变量时,代码的原子性(互斥)以及并发协调(同步)
共享问题
synchronized
线程安全分析
Monitor
wait/notify
线程状态转换
活跃性
Lock 加以改进
1 2 3 4 5 6 7 8 9 10 11 12 13 static int counter = 0 ; Thread t1 = new Thread (() -> { for (int i = 0 ; i < 5000 ; i++) { counter++; } }, "t1" ); Thread t2 = new Thread (() -> { for (int i = 0 ; i < 5000 ; i++) { counter--; } }, "t2" );
阻塞:synchronized、lock
非阻塞:原子变量
synchronized 对象锁,同一时刻只有一个线程获取到针对该对象的锁,获取失败进入等待队列
1 2 3 4 5 6 7 8 9 synchronized (对象) { 临界区 } 大括号相当于 获取锁 + 释放锁。 对象相当于locked 没有获取到时会阻塞释放资源 相当于Futex void lock () { while (xchg(&locked, 1 )) ; }void unlock () { xchg(&locked, 0 ); }
加在成员方法上,锁对象 synchronized(this)
1 2 3 4 5 public synchronized void methodName () { } 同一类中的多个函数,如果synchronized 了 也不能并行,因为对象被锁了而不是函数被锁了
加在静态方法上,锁类对象 synchronized(MyClass.class)
1 2 3 4 5 public class MyClass { public static synchronized void staticMethod () { } }
局部变量理论是线程安全的(每个栈都有栈帧),成员变量和静态变量不是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class ThreadSafe { public final void method1 (int loopNumber) { ArrayList<String> list = new ArrayList <>(); for (int i = 0 ; i < loopNumber; i++) { method2(list); method3(list); } } private void method2 (ArrayList<String> list) { list.add("1" ); } private void method3 (ArrayList<String> list) { list.remove(0 ); } }
但如果方法是public并且在子类中被修改了,就可能出错,所以private 或者 final的修饰符是有必要的,满足开闭 原则
1 2 3 4 5 6 7 8 class ThreadSafeSubClass extends ThreadSafe { @Override public void method3 (ArrayList<String> list) { new Thread (() -> { list.remove(0 ); }).start(); } }
常见类 线程安全的类: 单一方法是安全的,组合不一定(想要安全还要额外上锁)
java.lang.String、 java.lang.Integer 不可变对象
java.lang.StringBuffer
java.lang.Float
java.lang.Boolean
java.util.Vector
java.util.Hashtable
java.util.concurrent.ConcurrentHashMap
非线程安全的类:
java.lang.StringBuilder
java.util.ArrayList
java.util.LinkedList
java.util.HashMap
java.util.HashSet
例 MyServlet只有一个,共享的。所以userService也是一个共享的。count不是安全的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class MyServlet extends HttpServlet { private UserService userService = new UserServiceImpl (); public void doGet (HttpServletRequest request, HttpServletResponse response) { userService.update(...); } } public class UserServiceImpl implements UserService { private int count = 0 ; public void update () { count++; } }
单例中的成员变量都是共享的。 改成环绕通知中的局部变量就解决了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Aspect @Component public class MyAspect { private long start = 0L ; @Before("execution(* *(..))") public void before () { start = System.nanoTime(); } @After("execution(* *(..))") public void after () { long end = System.nanoTime(); System.out.println("cost time:" + (end-start)); } }
dao中的数据库连接,不能共享否则被别人close了,所以每个查询都要在局部变量中获取一个
1 2 3 4 5 6 7 8 9 10 public class UserDaoImpl implements UserDao { private Connection conn = null ; public void update () throws SQLException { String sql = "update user set password = ? where username = ?" ; conn = DriverManager.getConnection("" ,"" ,"" ); conn.close(); } }
转账 锁住类对象,简单的解决办法但效率不高
1 2 3 4 5 6 7 8 9 10 class Account { public void transfer (Account target, int amount) { synchronized (Account.class){ if (this .money > amount) { this .setMoney(this .getMoney() - amount); target.setMoney(target.getMoney() + amount); } } } }
monitor 对象头
Mark Word
Monitor Monitor是JVM中提供的一个对象,负责管理某个对象的锁。我们的对象通过MarkWord中的指针指向monitor对象(一对一) c++实现的
获取成功为Owner
失败加入EntryList(还可以先进行自旋 几次,如果还失败才加入,减少上下文切换 自适应);
在thread-2释放时唤醒一个(线程的阻塞和唤醒操作是在Java虚拟机内部 进行的,而不涉及到底层操作系统的系统调用)
字节码角度 1 2 3 4 synchronized (lock) { count++; }
除了正常处理外,19~23为异常处理,也会释放锁
优化 优化:轻量级、偏向锁
轻量级锁 :在竞争比较少的情况下,每次上锁太麻烦了;房门上挂书包 对使用者透明
偏向锁 :直接在房门上课上名字,专属于谁
轻量级锁 锁记录 :线程中负责记录 该线程锁住了哪些对象
加锁:如果对象没被锁(01),通过CAS 让对象头保留锁记录地址,锁记录保存原对象头信息
加锁失败:如果对象已经被锁了(00),锁膨胀 :申请一个monitor,对象头指向monitor,加入entrylist
解锁:CAS再交换回来,如果发现对象被重量级锁锁住了,就进入重量级锁解锁流程
偏向锁 轻量级锁问题:自己调用时,还需要指向CAS 操作(这次一定会失败),偏向锁优化掉这个操作
初始时默认状态就是该状态,但程序加载会有延时
可以手动禁用,或者hashCode()时会禁用(因为放不下,而在轻量级锁记录 重量级monitor会记录hash)
锁消除 锁压根不会发生冲突,则直接被优化掉
1 2 3 4 5 6 public void b () throws Exception { Object o = new Object (); synchronized (o) { x++; } }
wait / notify 介绍 等价于万能条件变量法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Thread threadA = new Thread (() -> { synchronized (lock) { System.out.println("Thread A is waiting" ); lock.wait(); System.out.println("Thread A is resumed" ); } });Thread threadB = new Thread (() -> { synchronized (lock) { System.out.println("Thread B is performing some task" ); lock.notify(); } });
在获取锁后,发现不满足情况,lock.wait()
释放锁并进入WaitSet
1 2 mutex_lock(&lk);if (!cond) cond_wait(&cv, &lk);
在被Owner lock.notify
后,重新进入EntryList
。notifyAll()
唤醒全部
1 2 cond_signal(&cv); broadcast(&cv);
和操作系统不同点 就是这里锁lk和唤醒信号cv都是lock对象
同样可能会存在错误叫醒的情况,while + 广播
1 2 3 4 5 6 7 8 9 synchronized (lock) { while (!cond) { lock.wait(); } }synchronized (lock) { lock.notifyAll(); }
wait sleep区别
wait 是Object方法;sleep是Thread方法
wait必须要先获取锁并且再释放锁,sleep不用且不会释放锁
同步模式之保护性暂停
一个线程等待另外一个线程结果
JDK 中,join 的实现、Future 的实现,采用的就是此模式
但如果是一直产生:消息队列(见生产者/消费者)
如果用join实现,必须要下载线程结束,并且变量要设置为全局的
基本实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class GuardedObject { private Object response; private final Object lock = new Object (); public Object get () { synchronized (lock) { while (response == null ) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return response; } } public void complete (Object response) { synchronized (lock) { this .response = response; lock.notifyAll(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) { GuardedObject guardedObject = new GuardedObject (); new Thread (() -> { List<String> response = download(); log.debug("download complete..." ); guardedObject.complete(response); }).start(); Object response = guardedObject.get(); }
带超时 直接wait(time) break不行,因为存在虚假唤醒。记录等待时间防止多等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public Object get (long millis) { synchronized (lock) { long begin = System.currentTimeMillis(); long timePassed = 0 ; while (response == null ) { long waitTime = millis - timePassed; log.debug("waitTime: {}" , waitTime); if (waitTime <= 0 ) { log.debug("break..." ); break ; } try { lock.wait(waitTime); } catch (InterruptedException e) { e.printStackTrace(); } timePassed = System.currentTimeMillis() - begin; log.debug("timePassed: {}, object is null {}" , timePassed, response == null ); } return response; } }
扩展多个 多加一个中间者,实现多对多,但其中每一对还是一一对应的。解耦产生和消费; PRC框架
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class Mailboxes { private static Map<Integer, GuardedObject> boxes = new Hashtable <>(); private static int id = 1 ; private static synchronized int generateId () { return id++; } public static GuardedObject getGuardedObject (int id) { return boxes.remove(id); } public static GuardedObject createGuardedObject () { GuardedObject go = new GuardedObject (generateId()); boxes.put(go.getId(), go); return go; } public static Set<Integer> getIds () { return boxes.keySet(); } }
Join原理 1 2 3 4 5 6 7 线程对象也是对象 A线程调用B.join()时,会先获取锁synchronized 然后执行B.wait(delay); B线程运行结束后,调用notifyAll唤醒所有等待public final synchronized void join (long millis) { wait(0 ); }
异步模式之生产者/消费者
生产者消费者不需要一一对应
JDK中的阻塞队列原理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class MessageQueue { private LinkedList<Message> queue; private int capacity; public MessageQueue (int capacity) { queue = new LinkedList <>(); this .capacity = capacity; } public Message get () { synchronized (queue){ while (queue.size() == 0 ){ try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.notifyAll(); return queue.getFirst(); } } public void add (Message m) { synchronized (queue){ while (queue.size() == capacity){ try { queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.notifyAll(); queue.add(m); return ; } } }
Park & Unpark wait状态,有点像值最大为1的信号量 但是是以线程 为单位 不需要获取锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import java.util.concurrent.locks.LockSupport;Thread t1 = new Thread (() -> { log.debug("start..." ); sleep(1 ); log.debug("park..." ); LockSupport.park(); log.debug("resume..." ); },"t1" ); t1.start(); sleep(2 ); log.debug("unpark..." ); LockSupport.unpark(t1);
不需要monitor,唤醒比较精确
可以先恢复再暂停
原理 每个线程一个parker对象
其中有一个_counter(干粮数量)=0或者1
unpark:_counter++ if(线程在等待) {唤醒, _counter=0}
park:_counter-- if ( _counter<0) {wait , _counter=0}
细粒度锁 睡觉和学习应该能并发,所以需要将锁细粒度化,而不是直接锁住this
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class BigRoom { private final Object studyRoom = new Object (); private final Object bedRoom = new Object (); public void sleep () { synchronized (bedRoom) { log.debug("sleeping 2 小时" ); Sleeper.sleep(2 ); } } public void study () { synchronized (studyRoom) { log.debug("study 1 小时" ); Sleeper.sleep(1 ); } } }
死锁 在一个线程需要获取多把锁时就可能导致
死锁检测 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 jps jconsole jstack pid Found one Java-level deadlock: ============================="Thread-1" : waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0 , a java.lang.Object), which is held by "Thread-0" "Thread-0" : waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0 , a java.lang.Object), which is held by "Thread-1" Java stack information for the threads listed above: ==================================================="Thread-1" : at thread.TestDeadLock.lambda$main$1 (TestDeadLock.java:28 ) - waiting to lock <0x000000076b5bf1c0 > (a java.lang.Object) - locked <0x000000076b5bf1d0 > (a java.lang.Object) at thread.TestDeadLock$$Lambda$2 /883049899. run(Unknown Source) at java.lang.Thread.run(Thread.java:745 )"Thread-0" : at thread.TestDeadLock.lambda$main$0 (TestDeadLock.java:15 ) - waiting to lock <0x000000076b5bf1d0 > (a java.lang.Object) - locked <0x000000076b5bf1c0 > (a java.lang.Object) at thread.TestDeadLock$$Lambda$1 /495053715. run(Unknown Source) at java.lang.Thread.run(Thread.java:745 ) Found 1 deadlock.
活锁 活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。开发中可以增加随机时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 new Thread (() -> { while (count > 0 ) { sleep(0.2 ); count--; log.debug("count: {}" , count); } }, "t1" ).start();new Thread (() -> { while (count < 20 ) { sleep(0.2 ); count++; log.debug("count: {}" , count); } }, "t2" ).start();
饥饿 某个线程始终不能运行,如设置了线程优先级,优先级低的可能难以运行
ReentrantLock
可中断
可以设置超时时间
可以设置为公平锁 先到先得而不是随机
支持多个条件变量 相当于不同条件变量进入不同WaitSet
现在就完全相当于万能条件变量法 等价于synchronized+wait notifyall 升级
都可以重入
java实现的
1 2 3 4 5 6 7 8 reentrantLock.lock();try { } finally { reentrantLock.unlock(); }
可中断 等待锁的过程中可以被叫醒
1 2 3 4 5 6 7 8 9 10 11 12 13 try { lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("等锁的过程中被打断" ); return ; } lock.lock(); t1.start(); sleep(1 ); t1.interrupt();
可超时 避免无限制的等待
1 2 3 4 5 6 7 8 9 10 11 if (!lock.tryLock()) { log.debug("获取立刻失败,返回" ); return ; }try { log.debug("获得了锁" ); } finally { lock.unlock(); } lock.tryLock(1 , TimeUnit.SECONDS)
解决哲学家 获取锁时,如果右手获取不到,需要立马不等并且左手要解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if (left.tryLock()) { try { if (right.tryLock()) { try { eat(); } finally { right.unlock(); } } } finally { left.unlock(); } }
公平锁 会降低并发度 默认为false lock = new ReentrantLock(true);
条件变量 之前等待队列只有一个,直接是lock对象
1 2 3 4 synchronized (lock) { System.out.println("Thread A is waiting" ); lock.wait(); }
在lock基础上,创建一个condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private ReentrantLock lock = new ReentrantLock ();private Condition condition = lock.newCondition();private boolean conditionMet = false ;public void await () throws InterruptedException { lock.lock(); try { while (!conditionMet) { condition.await(); } System.out.println("Condition is met. Resuming execution." ); } finally { lock.unlock(); } }public void signal () { lock.lock(); try { conditionMet = true ; condition.signal(); System.out.println("Condition is signaled." ); } finally { lock.unlock(); } }
同步模式之顺序控制 线程执行顺序
wait notify: 还需要一个额外变量标记代表cond
park unpark 非常简洁
打印指定形状 例如打印abcabc 和打印🐟一个原理
直接万能条件变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void print (int waitFlag, int nextFlag, String str) { synchronized (this ) { while (this .flag != waitFlag) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.print(str); flag = nextFlag; this .notifyAll(); } }
ReentrantLock + condition 每个线程都有等待的cond以及唤醒的cond 感觉没有必要 不如直接上面whlie+notifyAll
1 2 3 4 5 6 7 8 9 10 11 12 13 public void print (String str, Condition current, Condition next) { this .lock(); try { current.await(); log.debug(str); next.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { this .unlock(); } }
park unpark:每次unpark下一个想打印的线程,需要一个数组以获得下一个线程
1 2 3 LockSupport.park(); System.out.print(str); LockSupport.unpark(nextThread());
小结 分析多线程访问共享资源时,哪些代码片段属于临界区
使用 synchronized 互斥解决临界区的线程安全问题
掌握 synchronized 锁对象语法
掌握 synchronzied 加载成员方法this 和静态方法语法 this.getClass()
掌握 wait/notify 同步方法
使用 lock 互斥解决临界区的线程安全问题
掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
相当于synchronzied 的升级
学会分析变量的线程安全性、掌握常见线程安全类 的使用
了解线程活跃性问题:死锁、活锁、饥饿
应用方面
互斥:使用 synchronized 或 Lock 达到共享资源互斥效果
同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果
原理方面
monitor、synchronized 、wait/notify 原理
synchronized 进阶原理 轻量级 偏向锁 锁消除
park & unpark 原理
模式方面
同步模式之保护性暂停
异步模式之生产者消费者
同步模式之顺序控制
内存模型 引入 通过volatile解决由于缓存引发的可见性问题,以及重排序引发的有序性问题
JMM java memory model
原子性
可见性 不受缓存影响
有序性 不受cpu指令并行优化影响
JMM的主要内容包括:
主内存(Main Memory):主内存是所有线程共享的内存区域,用于存储共享变量。主内存中的数据对所有线程可见。
工作内存(Working Memory):每个线程都有自己的工作内存,用于存储线程执行时需要使用的数据。工作内存中包含了主内存中的部分数据副本。
内存间交互操作:JMM定义了一组规则,用于线程在主内存和工作内存之间进行数据交互。这些操作包括读取、写入和同步操作。
顺序一致性(Sequential Consistency):JMM保证线程的执行结果与顺序一致的执行结果相同。即,对于一个线程来说,它的操作将按照程序中的顺序执行,并且对其他线程可见。
可见性(Visibility):JMM保证一个线程对共享变量的修改对其他线程是可见的。这意味着一个线程对变量的修改,将会在之后的操作中对其他线程可见。
原子性(Atomicity):JMM提供了一些原子性的保证。例如,对volatile
变量的读写具有原子性,单个读写操作不会被线程中断。
重排序(Reordering):JMM允许编译器和处理器对指令进行优化和重排序,但要求保持程序的顺序一致性和线程的可见性。
可见性
实际上停不下来,为什么?
while中如果有sout,就可以停下来了 为什么?
1 2 3 4 5 6 7 8 9 10 11 12 static boolean run = true ;public static void main (String[] args) throws InterruptedException { Thread t = new Thread (() -> { while (run) { } }); t.start(); Thread.sleep(1 ); run = false ; }
线程中存储了run = true;
的副本
volatile :强制到主存中读取,修饰成员变量和静态成员变量。不能保证原子性,一个i++ 一个i– 还是会错,适合一个写其他读的情况 (轻量级的同步机制)
synchronized :也可以实现必须去主存读取,但复杂度高。可以实现原子性(代码块内也可能重排序)
改进两阶段 使用volatile标记
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class TPTVolatile { private Thread thread; private volatile boolean stop = false ; public void start () { thread = new Thread (() -> { while (true ) { Thread current = Thread.currentThread(); if (stop) { log.debug("料理后事" ); break ; } try { Thread.sleep(1000 ); log.debug("将结果保存" ); } catch (InterruptedException e) { } } },"监控线程" ); thread.start(); } public void stop () { stop = true ; thread.interrupt(); } }
balking 监控线程只有一个,但如果有人多次start()其实会调用多个。balking避免重复执行某个操作 任务调度
加一个volatile变量if判断 ? 不行 不能保证原子
加synchronized 可以实现,但每次都要synchronized同步比较慢(如用这个实现单例)
缩小synchronized 范围。 为什么不直接用单例 单例是保证只有一个实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class MonitorService { private volatile boolean starting; public void start () { log.info("尝试启动监控线程..." ); synchronized (this ) { if (starting) { return ; } starting = true ; } } }
有序性 流水线
在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序 和组合来实现指令级并行,单线程下正确,但多线程下有问题
1 2 3 4 5 6 7 int a = 10 ; int b = 20 ; System.out.println( a + b );int a = 10 ; int b = a - 5 ;
指令重排序问题 1 4为正常输出 但可能出现0(概率比较低)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 int num = 0 ;boolean ready = false ;public void actor1 (I_Result r) { if (ready) { r.r1 = num + num; } else { r.r1 = 1 ; } }public void actor2 (I_Result r) { num = 2 ; ready = true ; }
对变量ready添加volatile会禁用重排序
volatile原理 volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)
对 volatile 变量的写指令后会加入写屏障
对 volatile 变量的读指令前会加入读屏障
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void actor2 (I_Result r) { num = 2 ; ready = true ; }public void actor1 (I_Result r) { if (ready) { r.r1 = num + num; } else { r.r1 = 1 ; } }
可见性:
遇到写屏障(sfence),对所有共享变量的改动,都同步到主存当中
遇到读屏障(lfence),去主存加载最新数据
有序性:
double-check-locking 普通写法
1 2 3 4 5 6 7 8 9 10 11 12 public final class Singleton { private Singleton () { } private static Singleton INSTANCE = null ; public static synchronized Singleton getInstance () { if ( INSTANCE != null ){ return INSTANCE; } INSTANCE = new Singleton (); return INSTANCE; } }
double
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public final class Singleton { private Singleton () { } private static volatile Singleton INSTANCE = null ; public static Singleton getInstance () { if (INSTANCE != null ) { return INSTANCE; } synchronized (Singleton.class) { if (INSTANCE != null ) { return INSTANCE; } INSTANCE = new Singleton (); return INSTANCE; } } }
可见性进阶
synchronized中的变量
volatile修饰的变量
在线程开始前修改变量(可以理解为创建副本)
t1.join() 后,可以看到t1中的修改
t1打断t2, t2.interrupt(); t2可以看到t1的写
对变量默认值的写,其他线程可见
具有传递性 ,在写屏障前的全部修改都可见 y = 10 x = 1 (x是volatile)
单例习题 饿汉式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public final class Singleton implements Serializable { private Singleton () {} private static final Singleton INSTANCE = new Singleton (); public static Singleton getInstance () { return INSTANCE; } public Object readResolve () { return INSTANCE; } }
枚举类
1 2 3 4 5 6 7 8 9 enum Singleton { INSTANCE; }
类加载实现懒汉式单例:
1 2 3 4 5 6 7 8 9 10 11 public final class Singleton { private Singleton () { } private static class LazyHolder { static final Singleton INSTANCE = new Singleton (); } public static Singleton getInstance () { return LazyHolder.INSTANCE; } }
无锁并发
本章内容
CAS 与 volatile
原子整数
原子引用
原子累加器
Unsafe
CAS可以实现锁,0代表空闲1代表占用,下一章连接池中使用atomic数组实现多个连接池的锁
reentrantlock底层其实还是CAS,外带一个等待队列(park实现等待) 见原理部分
Atomic 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 AtomicInteger atomicInteger = new AtomicInteger (0 );int value = atomicInteger.get(); atomicInteger.set(10 );int oldValue = atomicInteger.getAndSet(20 );boolean result = atomicInteger.compareAndSet(20 , 30 );int newValue = atomicInteger.getAndIncrement(); newValue = atomicInteger.getAndDecrement(); newValue = atomicInteger.getAndAdd(5 ); newValue = atomicInteger.incrementAndGet(); newValue = atomicInteger.decrementAndGet(); newValue = atomicInteger.addAndGet(5 );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 while (true ) { int prev = balance.get(); int next = prev - amount; if (balance.compareAndSet(prev, next)) { break ; } }public final int getAndAddInt (Object this , long offset, int var4=1 ) { int old; do { old = this .getIntVolatile(this , offset); } while (!this .compareAndSwapInt(this , offset, old, old + 1 )); return old; }
CAS Compare And Swap
:先比较是否是旧值,旧值没被修改才swap(乐观锁)
核心一个函数:this和offset用于确定地址
1 public final native boolean compareAndSwapInt (Object this , long offset, int old, int new) ;
底层:lock cmpxchg
指令(X86 架构)
变量存储在一个volatile 值中,因为每次都要保证可见性
1 private volatile int value;
效率高 :不需要 锁的获取 以及 线程的上下文下切换,但需要更高的cpu资源,受限cpu内核数
原子整数
AtomicBoolean
AtomicInteger
AtomicLong
操作 原理都是while中尝试compareAndSwapInt
getAndIncrement(); incrementAndGet
getAndAdd(10);
getAndUpdate(p -> p * 2);
原子引用
AtomicReference
AtomicMarkableReference
AtomicStampedReference
对象不是基本类型,提供CAS对对象进行操作,compare比较的是地址
AtomicReference 保证多线程环境下取钱操作正常,并且不需要加锁
1 2 3 4 5 6 7 8 9 10 private AtomicReference<Double> balance = new AtomicReference <>(0.0 );public void withdraw (double amount) { Double oldValue; Double newValue; do { oldValue = balance.get(); newValue = oldValue - amount; } while (!balance.compareAndSet(oldValue, newValue)); }
ABA 有人修改了值,但又改回来了,如何察觉到被修改了呢?
AtomicStampedReference 添加版本号,每次操作版本号+1,除了值要匹配,版本号也要匹配
1 2 3 4 5 6 7 8 9 static AtomicStampedReference<String> ref = new AtomicStampedReference <>("A" , 0 );String prev = ref.getReference();int stamp = ref.getStamp(); ref.compareAndSet(prev, "C" , stamp, stamp + 1 )
AtomicMarkableReference 不关心换了几次,只关心有没有换。用一个bool 来判断
1 2 3 4 5 6 7 8 9 10 AtomicMarkableReference<T> ref = new AtomicMarkableReference <>(initialRef, initialMark=true );T currentRef = ref.getReference();boolean currentMark = ref.isMarked();boolean success = ref.compareAndSet(expectedRef, newRef, expectedMark, newMark);boolean [] mark = new boolean [1 ];int value = counter.get(mark); counter.compareAndSet(value, value + 1 , mark[0 ], !mark[0 ])
原子数组
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
前面的是保证引用的对象不变,现在需要保护引用对象的内部不被改变,例如数组对象的内容没有修改(多个引用对象),底层就是偏移量不同
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private static <T> void demo ( Supplier<T> arraySupplier, Function<T, Integer> lengthFun, BiConsumer<T, Integer> putConsumer, Consumer<T> printConsumer ) { List<Thread> ts = new ArrayList <>(); T array = arraySupplier.get(); int length = lengthFun.apply(array); for (int i = 0 ; i < length; i++) { ts.add(new Thread (() -> { for (int j = 0 ; j < 10000 ; j++) { putConsumer.accept(array, j%length); } })); } ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); }
1 2 3 4 5 6 7 8 9 10 11 12 13 demo( ()->new int [10 ], (array)->array.length, (array, index) -> array[index]++, array-> System.out.println(Arrays.toString(array)) ); demo( ()-> new AtomicIntegerArray (10 ), (array) -> array.length(), (array, index) -> array.getAndIncrement(index), array -> System.out.println(array) );
字段更新器
AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
某个对象内部的字段,保证原子操作。必须volatile
1 2 3 4 5 6 7 private volatile int field;public static void main (String[] args) { AtomicIntegerFieldUpdater fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field" ); Test5 test5 = new Test5 (); fieldUpdater.compareAndSet(test5, 0 , 10 ); }
LongAdder 更高级的自增
1 2 3 4 5 6 for (int i = 0 ; i < 5 ; i++) { demo(() -> new LongAdder (), adder -> adder.increment()); }for (int i = 0 ; i < 5 ; i++) { demo(() -> new AtomicLong (), adder -> adder.getAndIncrement()); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static <T> void demo (Supplier<T> adderSupplier, Consumer<T> action) { T adder = adderSupplier.get(); long start = System.nanoTime(); List<Thread> ts = new ArrayList <>(); for (int i = 0 ; i < 40 ; i++) { ts.add(new Thread (() -> { for (int j = 0 ; j < 500000 ; j++) { action.accept(adder); } })); } ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start)/1000_000 ); }
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
源码 1 2 3 4 5 6 transient volatile Cell[] cells;transient volatile long base;transient volatile int cellsBusy;
@sun.misc.Contended 防止cell伪共享
缓存行64 byte,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value) 一个缓存行可能有多个Cell,注解添加128B的pad防止在同一个缓存行
从 cpu 到
大约需要的时钟周期
寄存器
1 cycle (4GHz 的 CPU 约为0.25ns)
L1
3~4 cycle
L2
10~20 cycle
L3
40~45 cycle
内存
120~240 cycle
Unsafe 会操作内存,比较危险
获取unsafe
1 2 3 4 5 6 Unsafe.getUnsafe(); 被CallerSensitive修饰,能在引导类加载器加载的类中访问 抛出 SecurityException Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe" ); unsafeField.setAccessible(true );Unsafe unsafe = (Unsafe) unsafeField.get(null );
操作field
1 2 3 4 5 6 7 Teacher teacher = new Teacher ();long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id" ));long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name" )); unsafe.compareAndSwapInt(teacher, idOffset, 0 , 1 ); System.out.println(teacher);
模拟原子整数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class AtomicData { private volatile int data; static final Unsafe unsafe; static final long DATA_OFFSET; static { unsafe = UnsafeAccessor.getUnsafe(); try { DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data" )); } catch (NoSuchFieldException e) { throw new Error (e); } } public AtomicData (int data) { this .data = data; } public void decrease (int amount) { int oldValue; while (true ) { oldValue = data; if (unsafe.compareAndSwapInt(this , DATA_OFFSET, oldValue, oldValue - amount)) { return ; } } } public int getData () { return data; } }
不可变 日期转换的问题 1 2 3 4 5 6 SimpleDateFormat sdf = new SimpleDateFormat ("yyyy-MM-dd" ); sdf.parse("1951-04-21" ); synchronized (sdf){ sdf.parse("1951-04-21" ); }
内部属性不可变实现安全
1 DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd" );
不可变设计 1 2 3 4 5 6 7 8 9 10 11 12 public final class String implements java .io.Serializable, Comparable<String>, CharSequence { private final char value[]; private int hash; }this .value = Arrays.copyOf(value, value.length); this .value = Arrays.copyOfRange(value, offset, offset+count);
享元模式 minimizes memory usage by sharing
包装类 在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的 valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对 象:
1 2 3 4 5 6 7 public static Long valueOf (long l) { final int offset = 128 ; if (l >= -128 && l <= 127 ) { return LongCache.cache[(int )l + offset]; } return new Long (l); }
String 串池 BigDecimal BigInteger 模拟连接池! 有多个连接,所以需要array标记使用状态:AtomicArray
AtomicIntegerArray来标记连接状态,并且是线程安全的,注意需要使用cas修改标记 在所有的连接都占用时,wait等待以节约cpu
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class Pool { private final int poolSize; private Connection[] connections; private AtomicIntegerArray states; public Pool (int poolSize) { this .poolSize = poolSize; this .connections = new Connection [poolSize]; this .states = new AtomicIntegerArray (new int [poolSize]); for (int i = 0 ; i < poolSize; i++) { connections[i] = new MockConnection ("连接" + (i+1 )); } } public Connection borrow () { while (true ) { for (int i = 0 ; i < poolSize; i++) { if (states.get(i) == 0 ) { if (states.compareAndSet(i, 0 , 1 )) { log.debug("borrow {}" , connections[i]); return connections[i]; } } } synchronized (this ) { try { log.debug("wait..." ); this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } public void free (Connection conn) { for (int i = 0 ; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0 ); synchronized (this ) { log.debug("free {}" , conn); this .notifyAll(); } break ; } } } }class MockConnection implements Connection { }
以上实现没有考虑:
连接的动态增长与收缩
连接保活(可用性检测)
等待超时处理
分布式 hash
对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等 对于更通用的对象池,可以考虑使用apache commons pool,例如redis连接池可以参考jedis中关于连接池的实现
tomcat jdbc连接池比较简单易读
线程池
线程池 ThreadPollExecutor Fork/Join
JUC Lock Semaphore CountdownLatch CyclicBarrier…
第三方
自己线程池 无救急线程
高并发下,并不是越大越好,而是需要充分发挥已有线程的潜力
需要一个线程set
当一个线程结束后,查看有没有BlockingQueue有没有任务,有就run(实现复用)
BlockingQueue 实现保存暂时没有执行的任务列表,任务队列相当于生产者消费者模型 ,但使用lock实现
获取一个任务take
为了实现如果等一段时间内还没有任务结束线程(不是一直死等),需要添加超时的等待take -> poll(timeout, unit)
添加一个任务put
等待任务队列也不是无穷大,有一个capcity
当满了以后,有不同的策略
put
死等,阻塞主线程 fullWaitSet.await()
offer(task, timeout, unit)
带超时等待
添加一个拒绝策略,策略模式
具体实现的策略可以是死等、超时…
1 2 3 4 5 6 7 8 9 10 11 12 (queue, task)->{ queue.put(task); queue.offer(task, 1500 , TimeUnit.MILLISECONDS); log.debug("放弃{}" , task); throw new RuntimeException ("任务执行失败 " + task); task.run(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 class BlockingQueue <T> { private Deque<T> queue = new ArrayDeque <>(); private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); private int capcity; public BlockingQueue (int capcity) { this .capcity = capcity; } public T poll (long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0 ) { return null ; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public T take () { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; } finally { lock.unlock(); } } public void put (T task) { lock.lock(); try { while (queue.size() == capcity) { try { log.debug("等待加入任务队列 {} ..." , task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); } finally { lock.unlock(); } } public boolean offer (T task, long timeout, TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capcity) { try { if (nanos <= 0 ) { return false ; } log.debug("等待加入任务队列 {} ..." , task); nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); return true ; } finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capcity) { rejectPolicy.reject(this , task); } else { log.debug("加入任务队列 {}" , task); queue.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } }
ThreadPool 1 2 3 4 5 6 7 8 9 private BlockingQueue<Runnable> taskQueue;private HashSet<Worker> workers = new HashSet <>();private int coreSize;private RejectPolicy<Runnable> rejectPolicy;
创建任务:
执行与任务完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package threadpoolself;class ThreadPool { private BlockingQueue<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet <>(); private int coreSize; private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; public void execute (Runnable task) { synchronized (workers) { if (workers.size() < coreSize) { Worker worker = new Worker (task); log.debug("新增 worker{}, {}" , worker, task); workers.add(worker); worker.start(); } else { taskQueue.tryPut(rejectPolicy, task); } } } public ThreadPool (int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .timeout = timeout; this .timeUnit = timeUnit; this .taskQueue = new BlockingQueue <>(queueCapcity); this .rejectPolicy = rejectPolicy; } class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null ) { try { log.debug("正在执行...{}" , task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null ; } } synchronized (workers) { log.debug("worker 被移除{}" , this ); workers.remove(this ); } } } }
线程池 引入
int 的高 3 位来表示线程池状态,低 29 位表示线程数量,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
状态名
高3位
新任务
处理阻塞队列任务
说明
RUNNING
111
Y
Y
SHUTDOWN
000
N
Y
不会接收新任务,但会处理阻塞队列剩余任务shutdown
STOP
001
N
N
会中断正在执行的任务,并抛弃阻塞队列任务shutdownNow
TIDYING
010
任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED
011
终结状态
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize 核心线程数目 (最多保留的线程数)
maximumPoolSize 最大线程数目 减去核心线程就是救急线程数量,多出来的会被销毁
keepAliveTime 生存时间 - 针对救急线程
unit 时间单位 - 针对救急线程
workQueue 阻塞队列
threadFactory 线程工厂 - 可以为线程创建时起个好名字
handler 拒绝策略
当阻塞队列满时,会先创建救济线程,再考虑拒绝策略
拒绝策略
AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy 让调用者运行任务
DiscardPolicy 放弃本次任务
DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题
Netty 的实现,是创建一个新线程来执行任务
ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
newFixedThreadPool 1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
阻塞队列是无界的,可以放任意数量的任务
任务量已知,任务耗时
newCachedThreadPool 1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
全部都是救急线程(60s 后可以回收)
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
适合任务数比较密集,但每个任务执行时间较短的情况
newSingleThreadExecutor 1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
使用场景: 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
FinalizableDelegatedExecutorService
限制了一些方法的暴露
提交任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 void execute (Runnable command) ; <T> Future<T> submit (Callable<T> task) ; futer.get()获取结果(原理保护性暂停模式) 有异常返回异常 <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
其他 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void shutdown () ; - 会将队列中的任务返回 - 并用 interrupt 的方式中断正在执行的任务public List<Runnable> shutdownNow () boolean isShutdown () ;boolean isTerminated () ; 情,可以利用此方法等待boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException;
异步模式之工作线程 分工模式,不同的任务采用不同的线程池。如点餐线程池和做饭线程池
如果同一个线程池中,可能会没有做饭的导致点餐的一直在等待,导致饥饿。但jconsole死锁检测不到
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ExecutorService waiterPool = Executors.newFixedThreadPool(1 );ExecutorService cookPool = Executors.newFixedThreadPool(1 ); waiterPool.execute(() -> { log.debug("处理点餐..." ); Future<String> f = cookPool.submit(() -> { log.debug("做菜" ); return cooking(); }); try { log.debug("上菜: {}" , f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } });
线程池数量
过小会导致程序不能充分地利用系统资源、容易导致饥饿
过大会导致更多的线程上下文切换,占用更多内存
CPU 密集 型运算
通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因 导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型 运算
CPU 容易闲下来(IO RPC),你可以利用多线程提高它的利用率。 时间占比可以用工具估算
线程数 = 核数 * 期望 CPU 利用率(1.0) * 总时间(CPU计算时间+等待时间) / CPU 计算时间
任务调度 如何实现一些延时的任务,或者反复执行
Timmer 同一时间只能有一个任务在执行,出现异常后续不可以执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { Timer timer = new Timer (); TimerTask task1 = new TimerTask () { @Override public void run () { log.debug("task 1" ); sleep(2 ); } }; TimerTask task2 = new TimerTask () { @Override public void run () { log.debug("task 2" ); } }; timer.schedule(task1, 1000 ); timer.schedule(task2, 1000 ); }
ScheduledExecutorService 延时任务或者重复执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2 ); executor.schedule(() -> { System.out.println("任务1,执行时间:" + new Date ()); try { Thread.sleep(2000 ); } catch (InterruptedException e) { } }, 1000 , TimeUnit.MILLISECONDS); executor.schedule(() -> { System.out.println("任务2,执行时间:" + new Date ()); }, 1000 , TimeUnit.MILLISECONDS); executor.scheduleAtFixedRate(() -> { log.debug("running..." ); }, 2 , 1 , TimeUnit.SECONDS); executor.scheduleWithFixedDelay(() -> { log.debug("running..." ); }, 1 , 1 , TimeUnit.SECONDS);
异常处理
Tomcat线程池
LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
Acceptor 只负责【接收新的 socket 连接】
Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
一旦可读,封装一个任务对象(socketProcessor implement Runnable),提交给 Executor 线程池处理
Executor 线程池中的工作线程最终负责【处理请求】
线程池源码 Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同 如果总线程数达到 maximumPoolSize 这时不会立刻抛 RejectedExecutionException 异常 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super .execute(command); } catch (RejectedExecutionException rx) { if (super .getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super .getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException ("Queue capacity is full." ); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException (x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
配置 Connector 配置
Executor 线程配置,优先级高于上面的配置
此外,对救急线程的激活逻辑做了修改,先创建救急线程而不是加入队列
Fork/Join 大任务拆分为算法上相同的小任务,小任务分配到不同线程从而并行
1 2 3 4 public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool (4 ); System.out.println(pool.invoke(new AddTask1 (5 ))); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 class AddTask1 extends RecursiveTask <Integer> { int n; public AddTask1 (int n) { this .n = n; } @Override public String toString () { return "{" + n + '}' ; } @Override protected Integer compute () { if (n == 1 ) { log.debug("join() {}" , n); return n; } AddTask1 t1 = new AddTask1 (n - 1 ); t1.fork(); log.debug("fork() {} + {}" , n, t1); int result = n + t1.join(); log.debug("join() {} + {} = {}" , n, t1, result); return result; } }
拆分优化:拆分成 begin-mid,mid+1-end 能提高并行度
1 2 3 4 public AddTask3 (int begin, int end) { this .begin = begin; this .end = end; }
并发工具 AQS 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
用 state volatile属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁
getState - 获取 state 状态
setState - 设置 state 状态
compareAndSetState - cas 机制设置 state 状态
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
1 2 3 4 5 6 7 8 9 if (!tryAcquire(arg)) { }if (tryRelease(arg)) { }
需要实现以下方法: 不同的实现代表不同锁类型 AQS其他方法会调用下面的方法,详情见ReentrantLock
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
队列
入队需要把当前Node变成tail,CAS操作防止并发影响
1 2 3 4 5 do {Node prev = tail; } while (tail.compareAndSet(prev, node))
某个线程释放锁后,会唤醒Head的下一个,并尝试tryAcquire;成功后设置当前节点为head,并且出队列,失败继续等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; }final Node p = node.predecessor();if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; }
自定义非重入锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 final class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int acquires) { if (acquires == 1 ){ if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } } return false ; } @Override protected boolean tryRelease (int acquires) { if (acquires == 1 ) { if (getState() == 0 ) { throw new IllegalMonitorStateException (); } setExclusiveOwnerThread(null ); setState(0 ); return true ; } return false ; } protected Condition newCondition () { return new ConditionObject (); } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } }class MyLock implements Lock { static MySync sync = new MySync (); @Override public void lock () { sync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } }
ReentrantLock
获取锁 当前线程在获取失败后会park
有一个Node链表连接所有线程(有一个虚假head),前一个负责unpark后一个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
释放锁
设置state
unpark队列中离head最近的Thread
1 2 3 4 5 6 7 8 9 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
可重入原理 state:代表计数,获取时++,释放时–
可打断原理 获取锁时会进入park,如果被打断就立马抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
公平性原理 1 2 3 4 5 6 tryAcquire if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; }
条件变量 每一个ConditionObject有一个Node等待队列,nextWaiter串起来,firstWaiter为第一个
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; } await Node node = new Node (Thread.currentThread(), Node.CONDITION); int savedState = fullyRelease(node); LockSupport.park(this ); signal 从ConditionObject.firstWaiter转移到等待列表
await
signal
读写锁 让读-读
并发
1 2 3 private ReentrantReadWriteLock rw = new ReentrantReadWriteLock ();private ReentrantReadWriteLock.ReadLock r = rw.readLock();private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
1 2 3 4 5 6 7 8 DataContainer dataContainer = new DataContainer ();new Thread (() -> { dataContainer.read(); }, "t1" ).start();new Thread (() -> { dataContainer.write(); }, "t2" ).start();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock (); void processCachedData () { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true ; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
读写锁实现缓存一致性 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑 适合读多写少,如果写操作比较频繁,以上实现性能低 没有考虑缓存容量 没有考虑缓存过期 只适合单机 并发性还是低,目前只会用一把锁 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key) 乐观锁实现:用 CAS 去更新
原理: 写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位,Node链表还是只有一个
没有仔细读源码
StampedLock 进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
1 2 3 4 5 long stamp = lock.readLock(); lock.unlockRead(stamp);long stamp = lock.writeLock(); lock.unlockWrite(stamp);
1 2 3 4 5 6 long stamp = lock.tryOptimisticRead(); sleep(readTime); if (!lock.validate(stamp)){ }
Semaphore 1 2 3 4 new Semaphore (3 ); semaphore.acquire(); semaphore.release();
优化连接池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 this .semaphore = new Semaphore (poolSize);public Connection borrow () { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0 ; i < poolSize; i++) { if (states.get(i) == 0 ) { if (states.compareAndSet(i, 0 , 1 )) { log.debug("borrow {}" , connections[i]); return connections[i]; } } } return null ; }public void free (Connection conn) { for (int i = 0 ; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0 ); log.debug("free {}" , conn); semaphore.release(); break ; } } }
原理 原理:state记录数量,cas操作。减完小于零时进入队列 doAcquireSharedInterruptibly
1 2 3 4 5 6 7 8 9 final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
释放,检查唤醒后面的
1 2 3 4 5 if (tryReleaseShared(arg)) { doReleaseShared(); return true ; }return false ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } }private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
CountDownLatch 用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值new CountDownLatch(3)
,await()
用来等待计数归零,countDown()
用来让计数减一
原理可以看源码,非常短countDown:state– await:state不等于零就加入队列
可以用来实现等待线程完成,为什么不用join?线程池!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (3 ); ExecutorService service = Executors.newFixedThreadPool(4 ); service.submit(() -> { log.debug("begin..." ); sleep(1 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); service.submit(() -> { log.debug("begin..." ); sleep(1.5 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); service.submit(() -> { log.debug("begin..." ); sleep(2 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); service.submit(()->{ try { log.debug("waiting..." ); latch.await(); log.debug("wait end..." ); } catch (InterruptedException e) { e.printStackTrace(); } }); }
应用:
CyclicBarrier 人满发车,可重复使用,state变成0后,会修改为init
await: state– 不为零就加入队列;为零就唤醒所有人,并重置state
1 2 3 4 5 6 7 8 9 CyclicBarrier cb = new CyclicBarrier (2 ); t1: cb.await(); t2: cb.await(); new CyclicBarrier (2 , ()->{ "发车了" });
注意线程池数量要和CyclicBarrier一样,否则可能出现同时两次都是task1的await触发