java并发

效率提升:

  • 异步,防止阻塞IO
  • 充分发挥多核CPU

Java 线程

线程创建

  1. 重写thread的run方法
  2. 创建runnable 抽象出来任务
  3. FutureTask 带返回值 线程间通信

核心Thread是创建一个线程,其中run方法或者Runnable只是代表具体的任务。如果main线程中调用Runnable.run该任务就是main执行的

extends Thread 并重写run方法,

1
2
3
4
5
6
7
8
9
// 创建线程对象
Thread t = new Thread() {
public void run() {
// 要执行的任务
}
};

// 启动线程
t.start();

Runnable :当成参数传给thread,run方法默认会检查Runnable如果有就执行Runnable.run

1
2
3
4
5
6
7
8
9
10
11
12
Thread thread = new Thread(new Runnable() {
public void run() {
// 线程的任务逻辑
}
});

// lambda:单个抽象方法的接口
Thread thread = new Thread(() -> {
// 线程的任务逻辑
});

thread.start();

FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadDemo {
public static void main(String[] args) {
Callable call = new MyCallable();
FutureTask<String> task = new FutureTask<>(call);// 继承Runnable
Thread t = new Thread(task);
t.start();
try {
String s = task.get(); // 阻塞等待返回结果
System.out.println(s);
} catch (Exception e) {
e.printStackTrace();
}
}

public class MyCallable implements Callable<String> {
@Override//重写线程任务类方法
public String call() throws Exception {
return Thread.currentThread().getName() + "->" + "Hello World";
}
}

tasklist jps taskkill

top -H -p pid jstack 查看进程中线程信息

每一个线程都有一个独立的栈,栈内每个函数都会有栈帧。main线程中的三个函数:

image-20230607100221439

上下文切换

切换当前执行的线程

  • CPU时间片
  • 垃圾回收
  • 更高优先权
  • 线程主动调用sleep、yield、wait、 join、 park、 synchronized、 lock

如何保存上下文信息?

  • 线程内程序计数器 记录运行到哪里
  • 栈帧记录变量信息

API

方法 说明
public void start() 启动一个新线程,Java虚拟机调用此线程的 run 方法
public void run() 线程启动后调用该方法
public void setName(String name) 给当前线程取名字
public void getName() 获取当前线程的名字 线程存在默认名称:子线程是 Thread-索引,主线程是 main
public static Thread currentThread() 获取当前线程对象,代码在哪个线程中执行
public static void sleep(long time) 让当前线程休眠多少毫秒再继续执行 Thread.sleep(0) : 让操作系统立刻重新进行一次 CPU 竞争
public static native void yield() 提示线程调度器让出当前线程对 CPU 的使用
public final int getPriority() 返回此线程的优先级
public final void setPriority(int priority) 更改此线程的优先级,常用 1 5 10
public void interrupt() 中断这个线程,异常处理机制
public static boolean interrupted() 判断当前线程是否被打断,清除打断标记
public boolean isInterrupted() 判断当前线程是否被打断,不清除打断标记
public final void join() 等待这个线程结束
public final void join(long millis) 等待这个线程死亡 millis 毫秒,0 意味着永远等待
public final native boolean isAlive() 线程是否存活(还没有运行完毕)
public final void setDaemon(boolean on) 将此线程标记为守护线程或用户线程

state

  • 操作系统层面:新建 就绪 运行 阻塞(io) 终止
  • java层面:其中Runnable包含 就绪 运行 阻塞(io)

image-20230607112856036

程状态 导致状态发生条件
NEW(新建) 线程刚被创建,但是并未启动,还没调用 start 方法,只有线程对象,没有线程特征
Runnable(可运行) 线程可以在 Java 虚拟机中运行的状态,可能正在运行自己代码,也可能没有,这取决于操作系统处理器,调用了 t.start() 方法:就绪(经典叫法)
Blocked(阻塞) 当一个线程试图获取一个对象锁,而该对象锁被其他的线程持有,则该线程进入 Blocked 状态;当该线程持有锁时,该线程将变成 Runnable 状态 synchronized EntryList
Waiting(无限等待) 一个线程在等待另一个线程执行一个(唤醒)动作时,该线程进入 Waiting 状态. wait(WaitSet) join(原理wait) park
Timed Waiting (限期等待) 有几个方法有超时参数,调用将进入 Timed Waiting 状态,这一状态将一直保持到超时期满或者接收到唤醒通知。常用方法有 Thread.sleep(time) 、wait(time) join(time) parkUntil(time)
Teminated(结束) run 方法正常退出而死亡,或者因为没有捕获的异常终止了 run 方法而死亡
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}

interrupt

API
  • public void interrupt():打断这个线程,异常处理机制
  • public static boolean interrupted():判断当前线程是否被打断,打断返回 true,清除打断标记
  • public boolean isInterrupted():判断当前线程是否被打断,不清除打断标记
  1. 线程处于阻塞状态:如果线程当前处于阻塞状态,如调用了 Thread.sleep()Object.wait()join BlockingQueue.take() 等阻塞方法,调用 interrupt() 方法会中断线程的阻塞状态,抛出 InterruptedException 异常,打断标记。因为线程都不在运行,所以需要抛异常来处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public static void main(String[] args) throws InterruptedException {
    Thread t1 = new Thread(()->{
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }, "t1");
    t1.start();
    Thread.sleep(500);
    t1.interrupt();
    System.out.println(" 打断状态: {}" + t1.isInterrupted());// false
    }
  2. 线程处于非阻塞状态:如果线程当前处于非阻塞状态,调用 interrupt() 方法会将线程的中断状态设置为 true,但不会中断线程的执行。可以通过检查中断状态来决定是否终止线程的执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // 主线程休眠 2 秒后中断子线程
    Thread.sleep(2000);
    myThread.interrupt();

    class MyThread extends Thread {
    @Override
    public void run() {
    while (!Thread.currentThread().isInterrupted()) {
    // 线程执行的逻辑
    System.out.println("Thread is running.");
    }
    System.out.println("Thread interrupted. Exiting...");
    }
    }
两阶段终止

功能:记录系统的利用率,但需要能停止下来。
如果在sleep时被打断,则手动标记一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class TPTInterrupt {
private Thread thread;
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(current.isInterrupted()) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);
log.debug("将结果保存");
} catch (InterruptedException e) {
current.interrupt();
}
// 执行监控操作
}
},"监控线程");
thread.start();
}
public void stop() {
thread.interrupt();
}
}
打断park

park阻塞线程类似于一直sleep,但被打断不会清空标记。需要标记为假时才生效

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
System.out.println("park...");
LockSupport.park();
System.out.println("unpark...");
System.out.println("打断状态:" + Thread.currentThread().isInterrupted());//打断状态:true
}, "t1");
t1.start();
Thread.sleep(2000);
t1.interrupt();
}

join

  • public final void join()等待该线程执行完成,原理上可以使用信号量PV
  • join(long millis) 最大等待时间

守护线程

当其它非守护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。

t1.setDaemon(true);

  • 垃圾回收器线程就是一种守护线程
  • Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等待它们处理完当前请求

共享模型之并发

访问共享变量时,代码的原子性(互斥)以及并发协调(同步)

  • 共享问题
  • synchronized
  • 线程安全分析
  • Monitor
  • wait/notify
  • 线程状态转换
  • 活跃性
  • Lock 加以改进
1
2
3
4
5
6
7
8
9
10
11
12
13
static int counter = 0;  // 共享资源

Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter++;
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
counter--;
}
}, "t2");

  • 阻塞:synchronized、lock
  • 非阻塞:原子变量

synchronized

对象锁,同一时刻只有一个线程获取到针对该对象的锁,获取失败进入等待队列

1
2
3
4
5
6
7
8
9
synchronized(对象) // 线程1, 线程2(blocked)
{
临界区
}

大括号相当于 获取锁 + 释放锁。 对象相当于locked
没有获取到时会阻塞释放资源 相当于Futex
void lock() { while (xchg(&locked, 1)) ; }
void unlock() { xchg(&locked, 0); }

加在成员方法上,锁对象 synchronized(this)

1
2
3
4
5
public synchronized void methodName() {
// 同步的代码块
}

同一类中的多个函数,如果synchronized了 也不能并行,因为对象被锁了而不是函数被锁了

加在静态方法上,锁类对象 synchronized(MyClass.class)

1
2
3
4
5
public class MyClass {
public static synchronized void staticMethod() {
// 同步的静态方法
}
}

局部变量理论是线程安全的(每个栈都有栈帧),成员变量和静态变量不是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ThreadSafe {
public final void method1(int loopNumber) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
method2(list);
method3(list);
}
}
private void method2(ArrayList<String> list) {
list.add("1");
}
private void method3(ArrayList<String> list) {
list.remove(0);
}
}

但如果方法是public并且在子类中被修改了,就可能出错,所以private 或者 final的修饰符是有必要的,满足开原则

1
2
3
4
5
6
7
8
class ThreadSafeSubClass extends ThreadSafe{
@Override
public void method3(ArrayList<String> list) {
new Thread(() -> {
list.remove(0);
}).start();
}
}

常见类

线程安全的类: 单一方法是安全的,组合不一定(想要安全还要额外上锁)

  • java.lang.String、 java.lang.Integer 不可变对象
  • java.lang.StringBuffer
  • java.lang.Float
  • java.lang.Boolean
  • java.util.Vector
  • java.util.Hashtable
  • java.util.concurrent.ConcurrentHashMap

非线程安全的类:

  • java.lang.StringBuilder
  • java.util.ArrayList
  • java.util.LinkedList
  • java.util.HashMap
  • java.util.HashSet

MyServlet只有一个,共享的。所以userService也是一个共享的。count不是安全的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyServlet extends HttpServlet {
// 是否安全?
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
// 记录调用次数
private int count = 0;

public void update() {
// ...
count++;
}
}

单例中的成员变量都是共享的。 改成环绕通知中的局部变量就解决了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Aspect
@Component
public class MyAspect {
// 是否安全?
private long start = 0L;

@Before("execution(* *(..))")
public void before() {
start = System.nanoTime();
}

@After("execution(* *(..))")
public void after() {
long end = System.nanoTime();
System.out.println("cost time:" + (end-start));
}
}

dao中的数据库连接,不能共享否则被别人close了,所以每个查询都要在局部变量中获取一个

1
2
3
4
5
6
7
8
9
10
public class UserDaoImpl implements UserDao {
// 是否安全
private Connection conn = null;
public void update() throws SQLException {
String sql = "update user set password = ? where username = ?";
conn = DriverManager.getConnection("","","");
// ...
conn.close();
}
}

转账 锁住类对象,简单的解决办法但效率不高

1
2
3
4
5
6
7
8
9
10
class Account {
public void transfer(Account target, int amount) {
synchronized(Account.class){
if (this.money > amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
}

monitor

对象头

image-20230609132432143

Mark Word

image-20230609132408757

Monitor

Monitor是JVM中提供的一个对象,负责管理某个对象的锁。我们的对象通过MarkWord中的指针指向monitor对象(一对一) c++实现的

  • 获取成功为Owner
  • 失败加入EntryList(还可以先进行自旋几次,如果还失败才加入,减少上下文切换 自适应);
  • 在thread-2释放时唤醒一个(线程的阻塞和唤醒操作是在Java虚拟机内部进行的,而不涉及到底层操作系统的系统调用)

image-20230609133445269

字节码角度

1
2
3
4
synchronized(lock) 
{
count++;
}

除了正常处理外,19~23为异常处理,也会释放锁

image-20230609134409660

优化

优化:轻量级、偏向锁

  • 轻量级锁:在竞争比较少的情况下,每次上锁太麻烦了;房门上挂书包 对使用者透明
  • 偏向锁:直接在房门上课上名字,专属于谁
轻量级锁

锁记录:线程中负责记录 该线程锁住了哪些对象

  • 加锁:如果对象没被锁(01),通过CAS让对象头保留锁记录地址,锁记录保存原对象头信息

    image-20230610113617600
  • 加锁失败:如果对象已经被锁了(00),锁膨胀:申请一个monitor,对象头指向monitor,加入entrylist

  • 解锁:CAS再交换回来,如果发现对象被重量级锁锁住了,就进入重量级锁解锁流程

image-20230610114230079
偏向锁

轻量级锁问题:自己调用时,还需要指向CAS操作(这次一定会失败),偏向锁优化掉这个操作

image-20230610123937238

  • 把线程的ID放入MarkWord,以后轻量级锁前先查看线程ID是否是自己,自己就不用CAS了

  • 如果threadID不是自己,升级为轻量级锁,解锁后变成normal

    • (如果超过20次,会发生批量重新偏向,全部直接偏向t2)
    • (如果超过40次,直接所有新建对象不可偏向)
  • 偏向锁释放时,并不会清空threadID

image-20230610124244401
  • 初始时默认状态就是该状态,但程序加载会有延时
  • 可以手动禁用,或者hashCode()时会禁用(因为放不下,而在轻量级锁记录 重量级monitor会记录hash)

锁消除

锁压根不会发生冲突,则直接被优化掉

1
2
3
4
5
6
public void b() throws Exception {
Object o = new Object();
synchronized (o) {
x++;
}
}

wait / notify

介绍

等价于万能条件变量法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Thread threadA = new Thread(() -> {
synchronized (lock) {
System.out.println("Thread A is waiting");
lock.wait(); // 线程A等待
System.out.println("Thread A is resumed");
}
});

Thread threadB = new Thread(() -> {
synchronized (lock) {
System.out.println("Thread B is performing some task");
lock.notify(); // 唤醒等待的线程A
}
});

image-20230610133052012

  • 在获取锁后,发现不满足情况,lock.wait()释放锁并进入WaitSet

    1
    2
    mutex_lock(&lk);
    if (!cond) cond_wait(&cv, &lk); // 睡眠前释放锁;在唤醒后,会重新尝试获取锁
  • 在被Owner lock.notify后,重新进入EntryListnotifyAll()唤醒全部

    1
    2
    cond_signal(&cv);
    broadcast(&cv);
  • 操作系统不同点就是这里锁lk和唤醒信号cv都是lock对象

同样可能会存在错误叫醒的情况,while + 广播

1
2
3
4
5
6
7
8
9
synchronized(lock) {
while(!cond) {
lock.wait();
}
}

synchronized(lock) {
lock.notifyAll();
}

wait sleep区别

  • wait 是Object方法;sleep是Thread方法
  • wait必须要先获取锁并且再释放锁,sleep不用且不会释放锁

同步模式之保护性暂停

  • 一个线程等待另外一个线程结果
  • JDK 中,join 的实现、Future 的实现,采用的就是此模式
  • 但如果是一直产生:消息队列(见生产者/消费者)

如果用join实现,必须要下载线程结束,并且变量要设置为全局的

基本实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class GuardedObject {
private Object response;
private final Object lock = new Object();

public Object get() {
synchronized (lock) {
// 条件不满足则等待
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
// 子线程执行下载 耗时
List<String> response = download();
log.debug("download complete...");
guardedObject.complete(response);

}).start();

// 主线程阻塞等待
Object response = guardedObject.get();

}
带超时

直接wait(time) break不行,因为存在虚假唤醒。记录等待时间防止多等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}", timePassed, response == null);
}
return response;
}
}
扩展多个

多加一个中间者,实现多对多,但其中每一对还是一一对应的。解耦产生和消费; PRC框架

image-20230610155101901

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 产生唯一 id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}

Join原理

1
2
3
4
5
6
7
线程对象也是对象
A线程调用B.join()时,会先获取锁synchronized 然后执行B.wait(delay);
B线程运行结束后,调用notifyAll唤醒所有等待

public final synchronized void join(long millis){
wait(0);
}

异步模式之生产者/消费者

  • 生产者消费者不需要一一对应
  • JDK中的阻塞队列原理

image-20230610155149434

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class MessageQueue {
private LinkedList<Message> queue;
private int capacity;

public MessageQueue(int capacity){
queue = new LinkedList<>();
this.capacity = capacity;
}

public Message get(){
synchronized(queue){
while (queue.size() == 0){
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.notifyAll();
return queue.getFirst();
}

}

public void add(Message m){
synchronized(queue){
while (queue.size() == capacity){
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.notifyAll();
queue.add(m);
return ;
}
}
}

Park & Unpark

wait状态,有点像值最大为1的信号量 但是是以线程为单位 不需要获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.locks.LockSupport;

Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park(); // 暂停自己
log.debug("resume...");
},"t1");
t1.start();

sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1); // 恢复某个线程运行,可以在线程暂停前使用
  • 不需要monitor,唤醒比较精确
  • 可以先恢复再暂停

原理

每个线程一个parker对象

其中有一个_counter(干粮数量)=0或者1

  • unpark:_counter++ if(线程在等待) {唤醒, _counter=0}
  • park:_counter-- if ( _counter<0) {wait , _counter=0}

细粒度锁

睡觉和学习应该能并发,所以需要将锁细粒度化,而不是直接锁住this

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}

死锁

在一个线程需要获取多把锁时就可能导致

死锁检测
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
jps // 输出java进程 pid

jconsole // 切换到线程,然后检测死锁
jstack pid // 显示进程运行信息 可以检测死锁 但只能检测java自己的 imple-db项目中锁是用map定义的就无法检测

Found one Java-level deadlock:
=============================
"Thread-1":
waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
which is held by "Thread-1"
Java stack information for the threads listed above:
===================================================
"Thread-1":
at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
- locked <0x000000076b5bf1d0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
"Thread-0":
at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
- waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
- locked <0x000000076b5bf1c0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock.

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束。开发中可以增加随机时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();

饥饿

某个线程始终不能运行,如设置了线程优先级,优先级低的可能难以运行

ReentrantLock

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁 先到先得而不是随机
  • 支持多个条件变量 相当于不同条件变量进入不同WaitSet 现在就完全相当于万能条件变量法 等价于synchronized+wait notifyall 升级

都可以重入

java实现的

1
2
3
4
5
6
7
8
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}

可中断

等待锁的过程中可以被叫醒

1
2
3
4
5
6
7
8
9
10
11
12
13
try {
lock.lockInterruptibly(); // 如果别人获取了锁,我在等待过程可以被打断
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}


lock.lock();
t1.start();
sleep(1);
t1.interrupt();

可超时

避免无限制的等待

1
2
3
4
5
6
7
8
9
10
11
if (!lock.tryLock()) { // 查看当前状态下是否能够获取锁 立即返回true false
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}

lock.tryLock(1, TimeUnit.SECONDS) // 尝试1s
解决哲学家

获取锁时,如果右手获取不到,需要立马不等并且左手要解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 尝试获得左手筷子
if (left.tryLock()) {
try {
// 尝试获得右手筷子
if (right.tryLock()) {
try {
eat();
} finally {
right.unlock();
}
}
} finally {
left.unlock();
}
}

公平锁

会降低并发度 默认为false lock = new ReentrantLock(true);

条件变量

之前等待队列只有一个,直接是lock对象

1
2
3
4
synchronized (lock) {
System.out.println("Thread A is waiting");
lock.wait(); // 线程A等待
}

在lock基础上,创建一个condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private ReentrantLock lock = new ReentrantLock();
// 创建一个condition
private Condition condition = lock.newCondition();
private boolean conditionMet = false;

public void await() throws InterruptedException {
lock.lock();
try {
while (!conditionMet) {
condition.await(); // 在该condition上等待 进入该队列
}
System.out.println("Condition is met. Resuming execution.");
} finally {
lock.unlock();
}
}

public void signal() {
lock.lock();
try {
conditionMet = true;
condition.signal(); // signalAll()
System.out.println("Condition is signaled.");
} finally {
lock.unlock();
}
}

同步模式之顺序控制

线程执行顺序

  • wait notify: 还需要一个额外变量标记代表cond
  • park unpark 非常简洁

打印指定形状

例如打印abcabc 和打印🐟一个原理

  1. 直接万能条件变量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public void print(int waitFlag, int nextFlag, String str) {
    synchronized (this) {
    while (this.flag != waitFlag) {
    try {
    this.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    System.out.print(str);
    flag = nextFlag;
    this.notifyAll();
    }
    }
  2. ReentrantLock + condition 每个线程都有等待的cond以及唤醒的cond 感觉没有必要 不如直接上面whlie+notifyAll

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public void print(String str, Condition current, Condition next) {
    this.lock();
    try {
    current.await();
    log.debug(str);
    next.signal();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    this.unlock();
    }
    }

  3. park unpark:每次unpark下一个想打印的线程,需要一个数组以获得下一个线程

    1
    2
    3
    LockSupport.park();
    System.out.print(str);
    LockSupport.unpark(nextThread());

小结

分析多线程访问共享资源时,哪些代码片段属于临界区

使用 synchronized 互斥解决临界区的线程安全问题

  • 掌握 synchronized 锁对象语法
  • 掌握 synchronzied 加载成员方法this 和静态方法语法 this.getClass()
  • 掌握 wait/notify 同步方法

使用 lock 互斥解决临界区的线程安全问题

  • 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
  • 相当于synchronzied 的升级

学会分析变量的线程安全性、掌握常见线程安全类的使用

了解线程活跃性问题:死锁、活锁、饥饿

应用方面

  • 互斥:使用 synchronized 或 Lock 达到共享资源互斥效果
  • 同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果

原理方面

  • monitor、synchronized 、wait/notify 原理
  • synchronized 进阶原理 轻量级 偏向锁 锁消除
  • park & unpark 原理

模式方面

  • 同步模式之保护性暂停
  • 异步模式之生产者消费者
  • 同步模式之顺序控制

内存模型

引入

通过volatile解决由于缓存引发的可见性问题,以及重排序引发的有序性问题

JMM java memory model

  • 原子性
  • 可见性 不受缓存影响
  • 有序性 不受cpu指令并行优化影响

JMM的主要内容包括:

  1. 主内存(Main Memory):主内存是所有线程共享的内存区域,用于存储共享变量。主内存中的数据对所有线程可见。
  2. 工作内存(Working Memory):每个线程都有自己的工作内存,用于存储线程执行时需要使用的数据。工作内存中包含了主内存中的部分数据副本。
  3. 内存间交互操作:JMM定义了一组规则,用于线程在主内存和工作内存之间进行数据交互。这些操作包括读取、写入和同步操作。
  4. 顺序一致性(Sequential Consistency):JMM保证线程的执行结果与顺序一致的执行结果相同。即,对于一个线程来说,它的操作将按照程序中的顺序执行,并且对其他线程可见。
  5. 可见性(Visibility):JMM保证一个线程对共享变量的修改对其他线程是可见的。这意味着一个线程对变量的修改,将会在之后的操作中对其他线程可见。
  6. 原子性(Atomicity):JMM提供了一些原子性的保证。例如,对volatile变量的读写具有原子性,单个读写操作不会被线程中断。
  7. 重排序(Reordering):JMM允许编译器和处理器对指令进行优化和重排序,但要求保持程序的顺序一致性和线程的可见性。

可见性

  • 实际上停不下来,为什么?
  • while中如果有sout,就可以停下来了 为什么?
1
2
3
4
5
6
7
8
9
10
11
12
static boolean run = true;

public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while (run) {
// ....
}
});
t.start();
Thread.sleep(1);
run = false; // 线程t会在下一次循环中停下来
}

线程中存储了run = true; 的副本

image-20230612163339523

  • volatile:强制到主存中读取,修饰成员变量和静态成员变量。不能保证原子性,一个i++ 一个i– 还是会错,适合一个写其他读的情况 (轻量级的同步机制)
  • synchronized:也可以实现必须去主存读取,但复杂度高。可以实现原子性(代码块内也可能重排序)
    • 进入时从主存读取最新
    • 退出时将修改刷新到主存

改进两阶段

使用volatile标记

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class TPTVolatile {
private Thread thread;
private volatile boolean stop = false;
public void start(){
thread = new Thread(() -> {
while(true) {
Thread current = Thread.currentThread();
if(stop) {
log.debug("料理后事");
break;
}
try {
Thread.sleep(1000);
log.debug("将结果保存");
} catch (InterruptedException e) {
}
// 执行监控操作
}
},"监控线程");
thread.start();
}
public void stop() {
stop = true;
thread.interrupt(); // 立即打断 不用等待sleep
}
}

balking

监控线程只有一个,但如果有人多次start()其实会调用多个。balking避免重复执行某个操作 任务调度

  • 加一个volatile变量if判断 ? 不行 不能保证原子
  • 加synchronized 可以实现,但每次都要synchronized同步比较慢(如用这个实现单例)
  • 缩小synchronized 范围。 为什么不直接用单例 单例是保证只有一个实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MonitorService {
// 用来表示是否已经有线程已经在执行启动了
private volatile boolean starting; // 如果只在synchronized内读写可以不加volatile

public void start() {
log.info("尝试启动监控线程...");
synchronized (this) {
if (starting) {
return;
}
starting = true;
}

// 真正启动监控线程...
}
}

有序性

流水线

image-20230612172143500

在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行,单线程下正确,但多线程下有问题

1
2
3
4
5
6
7
// 可以重排的例子
int a = 10; // 指令1
int b = 20; // 指令2
System.out.println( a + b );
// 不能重排的例子
int a = 10; // 指令1
int b = a - 5; // 指令2

指令重排序问题

1 4为正常输出 但可能出现0(概率比较低)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int num = 0;
boolean ready = false;
// 线程1 执行此方法
public void actor1(I_Result r) {
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
// 线程2 执行此方法
public void actor2(I_Result r) {
num = 2; // 这两条可能被交换位置 或者num = 2并没有被写入主存中
ready = true;
}

对变量ready添加volatile会禁用重排序

volatile原理

volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence)

  • 对 volatile 变量的写指令后会加入写屏障
  • 对 volatile 变量的读指令前会加入读屏障
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void actor2(I_Result r) {
num = 2;
ready = true; // ready 是 volatile 赋值带写屏障
// 写屏障
}

public void actor1(I_Result r) {
// 读屏障
// ready 是 volatile 读取值带读屏障
if(ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
  • 可见性:
    • 遇到写屏障(sfence),对所有共享变量的改动,都同步到主存当中
    • 遇到读屏障(lfence),去主存加载最新数据
  • 有序性:
    • 指令不能跨越屏障

double-check-locking

普通写法

1
2
3
4
5
6
7
8
9
10
11
12
public final class Singleton {
private Singleton() { }
private static Singleton INSTANCE = null;
// 分析这里的线程安全, 并说明有什么缺点
public static synchronized Singleton getInstance() {
if( INSTANCE != null ){
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}

double

  • 需要把同步的范围缩小,只在第一次加锁

  • 并且通过volatile保证读取最新值保证可见性,并且有序性防止指令重排导致 还没初始化完毕的对象

    1
    2
    3
    4
    5
    new 的过程
    17 表示创建对象,将对象引用入栈 // new Singleton
    20 表示复制一份对象引用 // 引用地址
    21 表示利用一个对象引用,调用构造方法
    24 表示利用一个对象引用,赋值给 static INSTANCE // 可能先执行 需要加volatile写屏障
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final class Singleton {
private Singleton() { }
// 问题1:解释为什么要加 volatile ?
// volatile会在synchronized读取,synchronized外面需要volatile保证从内存读取
private static volatile Singleton INSTANCE = null;

// 问题2:对比实现3, 说出这样做的意义
public static Singleton getInstance() {
if (INSTANCE != null) {
return INSTANCE;
}
synchronized (Singleton.class) {
// 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗 EntryList都会进到这里
if (INSTANCE != null) { // t2
return INSTANCE;
}
INSTANCE = new Singleton();
return INSTANCE;
}
}
}

可见性进阶

  1. synchronized中的变量
  2. volatile修饰的变量
  3. 在线程开始前修改变量(可以理解为创建副本)
  4. t1.join() 后,可以看到t1中的修改
  5. t1打断t2, t2.interrupt(); t2可以看到t1的写
  6. 对变量默认值的写,其他线程可见
  7. 具有传递性 ,在写屏障前的全部修改都可见 y = 10 x = 1 (x是volatile)

单例习题

饿汉式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 问题1:为什么加 final 防止被修改
// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例
public final class Singleton implements Serializable {
// 问题3:为什么设置为私有? 是否能防止反射创建新的实例? 不能
private Singleton() {}
// 问题4:这样初始化是否能保证单例对象创建时的线程安全? jvm实现
private static final Singleton INSTANCE = new Singleton();
// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由
public static Singleton getInstance() {
return INSTANCE;
}

// 避免反序列化
public Object readResolve() {
return INSTANCE;
}
}

枚举类

1
2
3
4
5
6
7
8
9
// 问题1:枚举单例是如何限制实例个数的      
// 问题2:枚举单例在创建时是否有并发问题 jvm避免
// 问题3:枚举单例能否被反射破坏单例 不能
// 问题4:枚举单例能否被反序列化破坏单例 不会
// 问题5:枚举单例属于懒汉式还是饿汉式 饿汉式 static 静态成员变量
// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做 构造方法
enum Singleton {
INSTANCE;
}

类加载实现懒汉式单例:

1
2
3
4
5
6
7
8
9
10
11
public final class Singleton {
private Singleton() { }
// 问题1:属于懒汉式还是饿汉式
private static class LazyHolder {
static final Singleton INSTANCE = new Singleton();
}
// 问题2:在创建时是否有并发问题 不会 JVM保证只有一个
public static Singleton getInstance() {
return LazyHolder.INSTANCE;
}
}

无锁并发

  • 本章内容
  • CAS 与 volatile
  • 原子整数
  • 原子引用
  • 原子累加器
  • Unsafe
  1. CAS可以实现锁,0代表空闲1代表占用,下一章连接池中使用atomic数组实现多个连接池的锁
  2. reentrantlock底层其实还是CAS,外带一个等待队列(park实现等待) 见原理部分

Atomic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
AtomicInteger atomicInteger = new AtomicInteger(0);

// 获取当前值
int value = atomicInteger.get();

// 设置新的值
atomicInteger.set(10);

// 获取当前值,并设置新的值
int oldValue = atomicInteger.getAndSet(20);

// 比较当前值是否等于期望值,如果等于则设置新的值
boolean result = atomicInteger.compareAndSet(20, 30);
// public final native boolean compareAndSwapInt(Object this, long offset, int old, int new);

// 获取当前值,并将其加1
int newValue = atomicInteger.getAndIncrement();

// 获取当前值,并将其减1
newValue = atomicInteger.getAndDecrement();

// 获取当前值,并将其加上delta
newValue = atomicInteger.getAndAdd(5);

// 将当前值加1,并返回新的值
newValue = atomicInteger.incrementAndGet();

// 将当前值减1,并返回新的值
newValue = atomicInteger.decrementAndGet();

// 将当前值加上delta,并返回新的值
newValue = atomicInteger.addAndGet(5);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// CAS修改过程
while (true) {
// 比如拿到了旧值 1000
int prev = balance.get();
// 在这个基础上 1000-10 = 990
int next = prev - amount;
if (balance.compareAndSet(prev, next)) { //compareAndSwapInt(this, offset, prev, next)
break;
}
}

// 包括getAndIncrement也是这个原理
public final int getAndAddInt(Object this, long offset, int var4=1) {
int old;
do {
old = this.getIntVolatile(this, offset);
} while(!this.compareAndSwapInt(this, offset, old, old + 1));

return old;
}

CAS

Compare And Swap:先比较是否是旧值,旧值没被修改才swap(乐观锁)

核心一个函数:this和offset用于确定地址

1
public final native boolean compareAndSwapInt(Object this, long offset, int old, int new);

底层:lock cmpxchg 指令(X86 架构)

变量存储在一个volatile值中,因为每次都要保证可见性

1
private volatile int value;

效率高:不需要 锁的获取 以及 线程的上下文下切换,但需要更高的cpu资源,受限cpu内核数

原子整数

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

操作 原理都是while中尝试compareAndSwapInt

  • getAndIncrement(); incrementAndGet
  • getAndAdd(10);
  • getAndUpdate(p -> p * 2);

原子引用

  • AtomicReference
  • AtomicMarkableReference
  • AtomicStampedReference

对象不是基本类型,提供CAS对对象进行操作,compare比较的是地址

AtomicReference

保证多线程环境下取钱操作正常,并且不需要加锁

1
2
3
4
5
6
7
8
9
10
private AtomicReference<Double> balance = new AtomicReference<>(0.0);

public void withdraw(double amount) {
Double oldValue;
Double newValue;
do {
oldValue = balance.get();
newValue = oldValue - amount;
} while (!balance.compareAndSet(oldValue, newValue));
}

ABA

有人修改了值,但又改回来了,如何察觉到被修改了呢?

AtomicStampedReference

添加版本号,每次操作版本号+1,除了值要匹配,版本号也要匹配

1
2
3
4
5
6
7
8
9
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);

// 获取值 A
String prev = ref.getReference();
// 获取版本号
int stamp = ref.getStamp();


ref.compareAndSet(prev, "C", stamp, stamp + 1)

AtomicMarkableReference

不关心换了几次,只关心有没有换。用一个bool 来判断

1
2
3
4
5
6
7
8
9
10
AtomicMarkableReference<T> ref = new AtomicMarkableReference<>(initialRef, initialMark=true);

T currentRef = ref.getReference();
boolean currentMark = ref.isMarked();

boolean success = ref.compareAndSet(expectedRef, newRef, expectedMark, newMark);

boolean[] mark = new boolean[1];
int value = counter.get(mark);
counter.compareAndSet(value, value + 1, mark[0], !mark[0])

原子数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

前面的是保证引用的对象不变,现在需要保护引用对象的内部不被改变,例如数组对象的内容没有修改(多个引用对象),底层就是偏移量不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
参数1,提供数组、可以是线程不安全数组或线程安全数组
参数2,获取数组长度的方法
参数3,自增方法,回传 array, index
参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
private static <T> void demo(
Supplier<T> arraySupplier,
Function<T, Integer> lengthFun,
BiConsumer<T, Integer> putConsumer,
Consumer<T> printConsumer ) {
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
int length = lengthFun.apply(array);
for (int i = 0; i < length; i++) {
// 每个线程对数组作 10000 次操作
ts.add(new Thread(() -> {
for (int j = 0; j < 10000; j++) {
putConsumer.accept(array, j%length);
}
}));
}
ts.forEach(t -> t.start()); // 启动所有线程
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}); // 等所有线程结束
printConsumer.accept(array);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
demo(
()->new int[10],
(array)->array.length,
(array, index) -> array[index]++,
array-> System.out.println(Arrays.toString(array))
);

demo(
()-> new AtomicIntegerArray(10),
(array) -> array.length(),
(array, index) -> array.getAndIncrement(index),
array -> System.out.println(array)
);

字段更新器

  • AtomicReferenceFieldUpdater // 域 字段
  • AtomicIntegerFieldUpdater
  • AtomicLongFieldUpdater

某个对象内部的字段,保证原子操作。必须volatile

1
2
3
4
5
6
7
private volatile int field;

public static void main(String[] args) {
AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");
Test5 test5 = new Test5();
fieldUpdater.compareAndSet(test5, 0, 10);
}

LongAdder

更高级的自增

1
2
3
4
5
6
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 4 个线程,每人累加 50 万
for (int i = 0; i < 40; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start)/1000_000);
}

性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。

源码

1
2
3
4
5
6
// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁 CAS实现
transient volatile int cellsBusy;
@sun.misc.Contended

防止cell伪共享

缓存行64 byte,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value) 一个缓存行可能有多个Cell,注解添加128B的pad防止在同一个缓存行

image-20230630145548007

从 cpu 到 大约需要的时钟周期
寄存器 1 cycle (4GHz 的 CPU 约为0.25ns)
L1 3~4 cycle
L2 10~20 cycle
L3 40~45 cycle
内存 120~240 cycle

Unsafe

会操作内存,比较危险

获取unsafe

1
2
3
4
5
6
Unsafe.getUnsafe();  被CallerSensitive修饰,能在引导类加载器加载的类中访问 抛出 SecurityException 

// 反射获取
Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
unsafeField.setAccessible(true);
Unsafe unsafe = (Unsafe) unsafeField.get(null);

操作field

1
2
3
4
5
6
7
Teacher teacher = new Teacher();

long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));

unsafe.compareAndSwapInt(teacher, idOffset, 0 , 1);
System.out.println(teacher);

模拟原子整数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class AtomicData {
private volatile int data;
static final Unsafe unsafe;
static final long DATA_OFFSET;
static {
unsafe = UnsafeAccessor.getUnsafe();
try {
// data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性
DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public AtomicData(int data) {
this.data = data;
}
public void decrease(int amount) {
int oldValue;
while(true) {
// 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解
oldValue = data;
// cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 false
if (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {
return;
}
}
}
public int getData() {
return data;
}
}

不可变

日期转换的问题

1
2
3
4
5
6
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
sdf.parse("1951-04-21"); // 不是线程安全的 多个线程调用会出错

synchronized (sdf){ // 可以解决 但太慢了
sdf.parse("1951-04-21");
}

内部属性不可变实现安全

1
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");

不可变设计

1
2
3
4
5
6
7
8
9
10
11
12
public final class String
implements java.io.Serializable, Comparable<String>, CharSequence {
/** The value is used for character storage. */
private final char value[];
/** Cache the hash code for the string */
private int hash; // Default to 0
// ...
}

// 保护性拷贝,防止内容被修改
this.value = Arrays.copyOf(value, value.length);
this.value = Arrays.copyOfRange(value, offset, offset+count);

享元模式

minimizes memory usage by sharing

包装类

在JDK中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf 方法,例如 Long 的
valueOf 会缓存 -128~127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对
象:

1
2
3
4
5
6
7
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}

String 串池

BigDecimal BigInteger

模拟连接池!

有多个连接,所以需要array标记使用状态:AtomicArray

AtomicIntegerArray来标记连接状态,并且是线程安全的,注意需要使用cas修改标记
在所有的连接都占用时,wait等待以节约cpu

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private AtomicIntegerArray states;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("连接" + (i+1));
}
}
// 5. 借连接
public Connection borrow() {
while(true) {
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 如果没有空闲连接,当前线程进入等待
synchronized (this) {
try {
log.debug("wait...");
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
synchronized (this) {
log.debug("free {}", conn);
this.notifyAll();
}
break;
}
}
}
}
class MockConnection implements Connection {
// 实现略
}

以上实现没有考虑:

  • 连接的动态增长与收缩
  • 连接保活(可用性检测)
  • 等待超时处理
  • 分布式 hash

对于关系型数据库,有比较成熟的连接池实现,例如c3p0, druid等 对于更通用的对象池,可以考虑使用apache
commons pool,例如redis连接池可以参考jedis中关于连接池的实现

tomcat jdbc连接池比较简单易读

线程池

  • 线程池 ThreadPollExecutor Fork/Join
  • JUC Lock Semaphore CountdownLatch CyclicBarrier…
  • 第三方

自己线程池

无救急线程

高并发下,并不是越大越好,而是需要充分发挥已有线程的潜力

image-20230702093206100

  • 需要一个线程set
  • 当一个线程结束后,查看有没有BlockingQueue有没有任务,有就run(实现复用)

BlockingQueue

实现保存暂时没有执行的任务列表,任务队列相当于生产者消费者模型,但使用lock实现

  • 获取一个任务take

    • 为了实现如果等一段时间内还没有任务结束线程(不是一直死等),需要添加超时的等待take -> poll(timeout, unit)
  • 添加一个任务put

    • 等待任务队列也不是无穷大,有一个capcity

    • 当满了以后,有不同的策略

      1. put 死等,阻塞主线程 fullWaitSet.await()

      2. offer(task, timeout, unit) 带超时等待

      3. 添加一个拒绝策略,策略模式

        • 具体实现的策略可以是死等、超时…

          1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          (queue, task)->{
          // 1. 死等
          queue.put(task);
          // 2) 带超时等待
          queue.offer(task, 1500, TimeUnit.MILLISECONDS);
          // 3) 让调用者放弃任务执行
          log.debug("放弃{}", task);
          // 4) 让调用者抛出异常
          throw new RuntimeException("任务执行失败 " + task);
          // 5) 让调用者自己执行任务
          task.run();
          }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class BlockingQueue<T> {
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 带超时阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转换为 纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回值是剩余时间
if (nanos <= 0) {
return null;
}
nanos = emptyWaitSet.awaitNanos(nanos);//自动返回剩余时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时时间阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capcity) {
try {
if(nanos <= 0) {
return false;
}
log.debug("等待加入任务队列 {} ...", task);
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满
if(queue.size() == capcity) {
rejectPolicy.reject(this, task);
} else { // 有空闲
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}

ThreadPool

1
2
3
4
5
6
7
8
9
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;

// 等待队列满了时,拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
创建任务:
  • 任务数量小于coreSize时,创建一个Worker并加入线程集合中

  • 否则加入taskQueue

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public void execute(Runnable task) {
    // 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
    // 如果任务数超过 coreSize 时,加入任务队列暂存
    synchronized (workers) { // 保证线程安全
    if(workers.size() < coreSize) {
    Worker worker = new Worker(task);
    log.debug("新增 worker{}, {}", worker, task);
    workers.add(worker);
    worker.start();
    } else {
    // taskQueue.put(task);
    // 1) 死等
    // 2) 带超时等待
    // 3) 让调用者放弃任务执行
    // 4) 让调用者抛出异常
    // 5) 让调用者自己执行任务
    taskQueue.tryPut(rejectPolicy, task);
    }
    }
    }
执行与任务完成:
  • 当任务完成后,Worker需要从taskQueue取出下一个任务,实现Worker的复用

    • 一直等待 或者 等一段时间后结束线程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    class Worker extends Thread{
    private Runnable task;
    public void run() {
    // 执行任务
    // 1) 当 task 不为空,执行任务
    // 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
    // while(task != null || (task = taskQueue.take()) != null) { // 会一直等awiati
    while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
    try {
    log.debug("正在执行...{}", task);
    task.run();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    task = null;
    }
    }
    synchronized (workers) {
    log.debug("worker 被移除{}", this);
    workers.remove(this);
    }
    }
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package threadpoolself;

class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet<>();
// 核心线程数
private int coreSize;
// 获取任务时的超时时间
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行
// 如果任务数超过 coreSize 时,加入任务队列暂存
synchronized (workers) {
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("新增 worker{}, {}", worker, task);
workers.add(worker);
worker.start();
} else {
// taskQueue.put(task);
// 1) 死等
// 2) 带超时等待
// 3) 让调用者放弃任务执行
// 4) 让调用者抛出异常
// 5) 让调用者自己执行任务
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,
RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapcity);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行任务
// 1) 当 task 不为空,执行任务
// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行
// while(task != null || (task = taskQueue.take()) != null) {
while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
log.debug("正在执行...{}", task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
synchronized (workers) {
log.debug("worker 被移除{}", this);
workers.remove(this);
}
}
}
}

线程池

引入

image-20230702105624890

int 的高 3 位来表示线程池状态,低 29 位表示线程数量,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

状态名 高3位 新任务 处理阻塞队列任务 说明
RUNNING 111 Y Y
SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余任务shutdown
STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列任务shutdownNow
TIDYING 010 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 终结状态
1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目 减去核心线程就是救急线程数量,多出来的会被销毁
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

当阻塞队列满时,会先创建救济线程,再考虑拒绝策略

拒绝策略

image-20230702111206853

  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略

  • CallerRunsPolicy 让调用者运行任务

  • DiscardPolicy 放弃本次任务

  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之

  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方
    便定位问题

  • Netty 的实现,是创建一个新线程来执行任务

  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略

  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务
  • 任务量已知,任务耗时

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
  • 全部都是救急线程(60s 后可以回收)
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
  • 适合任务数比较密集,但每个任务执行时间较短的情况

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

使用场景:
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

FinalizableDelegatedExecutorService限制了一些方法的暴露

提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task); futer.get()获取结果(原理保护性暂停模式) 有异常返回异常

// 提交 tasks 中所有任务, 会等最后一个完成
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;


// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 非阻塞
void shutdown();

- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
public List<Runnable> shutdownNow()

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

异步模式之工作线程

分工模式,不同的任务采用不同的线程池。如点餐线程池和做饭线程池

如果同一个线程池中,可能会没有做饭的导致点餐的一直在等待,导致饥饿。但jconsole死锁检测不到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
线程池数量
  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿

  • 过大会导致更多的线程上下文切换,占用更多内存

  • CPU 密集型运算

    ​ 通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因
    导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

  • I/O 密集型运算

    ​ CPU 容易闲下来(IO RPC),你可以利用多线程提高它的利用率。 时间占比可以用工具估算

    ​ 线程数 = 核数 * 期望 CPU 利用率(1.0) * 总时间(CPU计算时间+等待时间) / CPU 计算时间

任务调度

如何实现一些延时的任务,或者反复执行

Timmer

同一时间只能有一个任务在执行,出现异常后续不可以执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task 1");
sleep(2);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
ScheduledExecutorService

延时任务或者重复执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,希望它们都在 1s 后执行
executor.schedule(() -> {
System.out.println("任务1,执行时间:" + new Date());
try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 1000, TimeUnit.MILLISECONDS);
executor.schedule(() -> {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);


// 延时2s后 每隔1s反复执行,延时包括执行时间,任务开始间隔时间 = max(执行时间,等待时间)
executor.scheduleAtFixedRate(() -> {
log.debug("running...");
}, 2, 1, TimeUnit.SECONDS);

// 延时从上一个任务结束开始算 任务开始间隔时间 = 执行时间+等待时间
executor.scheduleWithFixedDelay(() -> {
log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);

异常处理

  • 方法1:主动捉异常

  • 方法2:使用submit中的 Future,f.get()会返回异常信息

Tomcat线程池

image-20230703111719298

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor implement Runnable),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

线程池源码

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
如果总线程数达到 maximumPoolSize
这时不会立刻抛 RejectedExecutionException 异常
而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}

配置

Connector 配置

image-20230703112719086

Executor 线程配置,优先级高于上面的配置

image-20230703112911572

此外,对救急线程的激活逻辑做了修改,先创建救急线程而不是加入队列

image-20230703113021772

Fork/Join

大任务拆分为算法上相同的小任务,小任务分配到不同线程从而并行

1
2
3
4
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask1(5)));
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class AddTask1 extends RecursiveTask<Integer> {
int n;
public AddTask1(int n) {
this.n = n;
}
@Override
public String toString() {
return "{" + n + '}';
}
@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}
// 将任务进行拆分(fork)
AddTask1 t1 = new AddTask1(n - 1);
t1.fork(); // 启动另一个线程执行该任务

log.debug("fork() {} + {}", n, t1);
// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}

拆分优化:拆分成 begin-mid,mid+1-end 能提高并行度

1
2
3
4
public AddTask3(int begin, int end) {
this.begin = begin;
this.end = end;
}

并发工具

AQS

全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架

特点:

  • state volatile属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取
    锁和释放锁
    • getState - 获取 state 状态
    • setState - 设置 state 状态
    • compareAndSetState - cas 机制设置 state 状态
    • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
  • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
1
2
3
4
5
6
7
8
9
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队, 可以选择阻塞当前线程 park unpark
}

// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}

需要实现以下方法: 不同的实现代表不同锁类型 AQS其他方法会调用下面的方法,详情见ReentrantLock

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

队列

image-20230704100702089

入队需要把当前Node变成tail,CAS操作防止并发影响

1
2
3
4
5
do {
// 原来的 tail
Node prev = tail;
// 用 cas 在原来 tail 的基础上改为 node
} while(tail.compareAndSet(prev, node))

某个线程释放锁后,会唤醒Head的下一个,并尝试tryAcquire;成功后设置当前节点为head,并且出队列,失败继续等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 释放锁
if (tryRelease(arg)) { // 修改state 自己定义的方法
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // unpark下一个thread,优先叫醒head下一个;只是unpark,后面还要获取锁
return true;
}

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 老二被叫醒 尝试获取锁,成功了才被删除
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}

自定义非重入锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
final class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
if (acquires == 1){
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int acquires) {
if(acquires == 1) {
if(getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
return false;
}
protected Condition newCondition() {
return new ConditionObject();
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}

class MyLock implements Lock {
static MySync sync = new MySync();
@Override
// 尝试,不成功,进入等待队列
public void lock() {
sync.acquire(1);
}
@Override
// 尝试,不成功,进入等待队列,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
// 尝试一次,不成功返回,不进入队列
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
// 尝试,不成功,进入等待队列,有时限
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
// 释放锁
public void unlock() {
sync.release(1);
}
@Override
// 生成条件变量
public Condition newCondition() {
return sync.newCondition();
}
}

ReentrantLock

image-20230703151551509

获取锁

当前线程在获取失败后会park

有一个Node链表连接所有线程(有一个虚假head),前一个负责unpark后一个

image-20230703154753396

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // add加入到链表中
selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 老二被叫醒 尝试获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && // pre的state设置为-1
parkAndCheckInterrupt()) // park 并返回等待过程中有没有被打断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

释放锁

  • 设置state
  • unpark队列中离head最近的Thread

image-20230703155757360

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) { // 修改state 自己定义的方法
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // unpark下一个thread,优先叫醒head下一个;只是unpark,后面还要获取锁
return true;
}
return false;
}

可重入原理

state:代表计数,获取时++,释放时–

可打断原理

获取锁时会进入park,如果被打断就立马抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); // 被打断后 立马抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 如果被打断 返回true
return Thread.interrupted();
}

公平性原理

1
2
3
4
5
6
tryAcquire
if (!hasQueuedPredecessors() && // 添加这一句 队列中等待有时,新来的不可以tryAcquire
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}

条件变量

每一个ConditionObject有一个Node等待队列,nextWaiter串起来,firstWaiter为第一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}

await
// 添加一个节点,CONDITION 并加入到Waiter队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);

// 释放掉当前lock上的锁,并唤醒等待队列
int savedState = fullyRelease(node);

// park自己
LockSupport.park(this);

signal
从ConditionObject.firstWaiter转移到等待列表

await

image-20230703173859988

signal

image-20230703173947214

读写锁

读-读并发

1
2
3
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
1
2
3
4
5
6
7
8
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();

new Thread(() -> {
dataContainer.write();
}, "t2").start();
  • 写锁才有条件变量
  • 不支持升级但支持降级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class CachedData {
Object data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock();
}
}
// 自己用完数据, 释放读锁
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

读写锁实现缓存一致性

以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
适合读多写少,如果写操作比较频繁,以上实现性能低
没有考虑缓存容量
没有考虑缓存过期
只适合单机
并发性还是低,目前只会用一把锁
更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
乐观锁实现:用 CAS 去更新

原理:

写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位,Node链表还是只有一个

image-20230704112731294

没有仔细读源码

StampedLock

进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用

1
2
3
4
5
long stamp = lock.readLock();
lock.unlockRead(stamp);

long stamp = lock.writeLock();
lock.unlockWrite(stamp);
1
2
3
4
5
6
long stamp = lock.tryOptimisticRead();  // 无锁
sleep(readTime);
// 验戳 如果有人修改了,那么会失败
if(!lock.validate(stamp)){
// 锁升级
}

Semaphore

1
2
3
4
new Semaphore(3);

semaphore.acquire();
semaphore.release();

优化连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
this.semaphore = new Semaphore(poolSize);

public Connection borrow() {// t1, t2, t3
// 获取许可
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 不会执行到这里
return null;
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug("free {}", conn);
semaphore.release();
break;
}
}
}

原理

原理:state记录数量,cas操作。减完小于零时进入队列 doAcquireSharedInterruptibly

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doAcquireSharedInterruptibly(int arg) 
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

释放,检查唤醒后面的

1
2
3
4
5
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

CountDownLatch

用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值new CountDownLatch(3)await() 用来等待计数归零,countDown() 用来让计数减一

原理可以看源码,非常短countDown:state– await:state不等于零就加入队列

可以用来实现等待线程完成,为什么不用join?线程池!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
ExecutorService service = Executors.newFixedThreadPool(4);
service.submit(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
sleep(1.5);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
});
service.submit(()->{
try {
log.debug("waiting...");
latch.await();
log.debug("wait end...");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

应用:

  • 等待线程完成,如用户加载
  • 等待远程调用完成

CyclicBarrier

人满发车,可重复使用,state变成0后,会修改为init

await: state– 不为零就加入队列;为零就唤醒所有人,并重置state

1
2
3
4
5
6
7
8
9
CyclicBarrier cb = new CyclicBarrier(2); 

t1: cb.await(); // 当个数不足时,等
t2: cb.await(); // 2 秒后,线程个数够2,继续运行

// 发车后执行的操作
new CyclicBarrier(2, ()->{
"发车了"
});

注意线程池数量要和CyclicBarrier一样,否则可能出现同时两次都是task1的await触发