Java面试八股文-并发篇

1. 线程基础

1.1 什么是线程?线程和进程的区别?

线程(Thread)是进程中的一个执行单元,是操作系统进行CPU调度的基本单位。一个进程可以包含多个线程,这些线程共享进程的资源,但各自拥有独立的执行栈和程序计数器。

进程(Process)是操作系统进行资源分配的基本单位,每个进程拥有独立的内存空间、文件描述符等资源。

线程和进程的区别

特性 进程 线程
资源分配 独立的内存空间、文件描述符等 共享进程的资源
调度单位 不是CPU调度的基本单位 CPU调度的基本单位
创建开销 较大,需要分配资源 较小,共享进程资源
切换开销 较大,需要保存和恢复整个进程的状态 较小,只需要保存和恢复线程的状态
通信方式 复杂,需要使用IPC机制(如管道、共享内存等) 简单,可以直接共享进程内的变量
安全性 一个进程崩溃不会影响其他进程 一个线程崩溃可能会影响整个进程
数量限制 系统能运行的进程数量较少 系统能运行的线程数量较多

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 进程示例:每个Java程序都是一个进程
public class ProcessExample {
public static void main(String[] args) {
System.out.println("This is a Java process");
// 启动一个新进程
try {
Process process = Runtime.getRuntime().exec("notepad.exe");
process.waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
}

// 线程示例:在一个进程中创建多个线程
public class ThreadExample {
public static void main(String[] args) {
// 创建并启动多个线程
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread 1: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread thread2 = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread 2: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

thread1.start();
thread2.start();
}
}

1.2 如何创建线程?

Java中创建线程的方式有三种:

1.2.1 继承Thread类

步骤

  1. 继承Thread类
  2. 重写run()方法
  3. 创建线程对象并调用start()方法启动线程

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyThread extends Thread {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
MyThread thread1 = new MyThread();
thread1.setName("Thread-1");
thread1.start();

MyThread thread2 = new MyThread();
thread2.setName("Thread-2");
thread2.start();
}
}

1.2.2 实现Runnable接口

步骤

  1. 实现Runnable接口
  2. 实现run()方法
  3. 创建Runnable对象
  4. 将Runnable对象作为参数传递给Thread构造函数
  5. 调用start()方法启动线程

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
MyRunnable runnable = new MyRunnable();

Thread thread1 = new Thread(runnable, "Thread-1");
thread1.start();

Thread thread2 = new Thread(runnable, "Thread-2");
thread2.start();
}
}

1.2.3 实现Callable接口

步骤

  1. 实现Callable接口
  2. 实现call()方法(可以返回结果)
  3. 创建Callable对象
  4. 将Callable对象包装成FutureTask
  5. 将FutureTask作为参数传递给Thread构造函数
  6. 调用start()方法启动线程
  7. 调用FutureTask的get()方法获取结果

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i <= 100; i++) {
sum += i;
}
return sum;
}

public static void main(String[] args) {
MyCallable callable = new MyCallable();
FutureTask<Integer> futureTask = new FutureTask<>(callable);

Thread thread = new Thread(futureTask, "Callable-Thread");
thread.start();

try {
// 获取线程执行结果
Integer result = futureTask.get();
System.out.println("Sum of 1-100: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}
}

三种方式的对比

方式 优点 缺点
继承Thread类 实现简单,直接使用this获取当前线程 不能继承其他类,单继承限制
实现Runnable接口 可以继承其他类,更灵活 无法直接获取线程执行结果
实现Callable接口 可以返回执行结果,可以抛出异常 实现较复杂,需要FutureTask包装

1.3 线程的生命周期?

线程的生命周期包括以下五个状态:

1.3.1 新建(New)

状态说明:线程被创建但尚未启动,此时线程对象已经创建,但还没有调用start()方法。

示例

1
2
3
4
Thread thread = new Thread(() -> {
System.out.println("Thread running");
});
// 此时线程处于新建状态

1.3.2 就绪(Runnable)

状态说明:线程调用了start()方法后进入就绪状态,此时线程可以运行,但尚未获取CPU时间片。

示例

1
2
3
4
Thread thread = new Thread(() -> {
System.out.println("Thread running");
});
thread.start(); // 线程进入就绪状态

1.3.3 运行(Running)

状态说明:线程获取了CPU时间片,正在执行run()方法中的代码。

示例

1
2
3
4
5
6
7
8
9
Thread thread = new Thread(() -> {
System.out.println("Thread running"); // 执行此处代码时,线程处于运行状态
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();

1.3.4 阻塞(Blocked)

状态说明:线程因某些原因暂停执行,放弃CPU时间片,直到阻塞原因消除后重新进入就绪状态。

阻塞的原因

  • 等待获取锁:线程试图进入synchronized块或方法,但锁被其他线程持有
  • 等待IO操作:线程等待IO操作完成
  • 等待其他线程:线程调用了wait()方法,等待其他线程的通知
  • 睡眠:线程调用了sleep()方法

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 等待获取锁导致的阻塞
Object lock = new Object();

Thread thread1 = new Thread(() -> {
synchronized (lock) {
System.out.println("Thread1 got the lock");
try {
Thread.sleep(2000); // 持有锁并睡眠
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});

Thread thread2 = new Thread(() -> {
System.out.println("Thread2 waiting for lock");
synchronized (lock) { // 尝试获取锁,此时会阻塞
System.out.println("Thread2 got the lock");
}
});

thread1.start();
try {
Thread.sleep(500); // 确保thread1先获取锁
} catch (InterruptedException e) {
e.printStackTrace();
}
thread2.start();

1.3.5 终止(Terminated)

状态说明:线程执行完毕或被终止,线程的生命周期结束。

终止的原因

  • 正常终止:run()方法执行完毕
  • 异常终止:run()方法抛出未捕获的异常
  • 强制终止:调用了stop()方法(已废弃)或interrupt()方法

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Thread thread = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("Thread running: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
break; // 响应中断,退出循环
}
}
System.out.println("Thread finished"); // 执行完毕,线程终止
});

thread.start();
try {
Thread.sleep(300); // 等待一段时间后中断线程
thread.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}

线程状态转换图

1
2
3
新建(New) → 就绪(Runnable) → 运行(Running) → 终止(Terminated)
↑ ↓
← 阻塞(Blocked)

2. 线程同步

2.1 什么是线程同步?为什么需要线程同步?

线程同步是指多个线程在访问共享资源时,按照一定的顺序执行,避免数据竞争和不一致的问题。需要线程同步的原因是:当多个线程同时访问共享资源时,可能会导致数据不一致或其他并发问题。

2.2 Java中的线程同步方式有哪些?

Java中的线程同步方式包括:

  • synchronized关键字:修饰方法或代码块
  • ReentrantLock:可重入锁
  • volatile关键字:保证变量的可见性
  • Atomic类:原子操作类
  • 线程安全的集合类:如ConcurrentHashMap
  • 信号量(Semaphore):控制并发访问的数量
  • 倒计时门闩(CountDownLatch):等待多个线程完成
  • 循环屏障(CyclicBarrier):等待所有线程到达某个点
  • 读写锁(ReadWriteLock):允许多个读操作同时进行

2.3 synchronized和ReentrantLock的区别?

synchronized和ReentrantLock是Java中用于实现线程同步的两种主要方式,它们的区别如下:

特性 synchronized ReentrantLock
实现方式 关键字,由JVM实现 类,由Java代码实现
锁类型 只能是非公平锁 可以是公平锁或非公平锁
锁状态 无法获取锁的状态 可以通过isLocked()等方法获取锁的状态
响应中断 不支持 支持,可通过lockInterruptibly()方法
尝试获取锁 不支持 支持,可通过tryLock()方法
释放锁 自动释放 必须手动释放(在finally块中)
条件变量 不支持 支持,可通过newCondition()方法
锁的范围 方法或代码块 代码块

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// synchronized示例
public class SynchronizedExample {
private int count = 0;

// 修饰方法
public synchronized void increment() {
count++;
}

// 修饰代码块
public void decrement() {
synchronized (this) {
count--;
}
}

public int getCount() {
return count;
}
}

// ReentrantLock示例
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
private int count = 0;
private ReentrantLock lock = new ReentrantLock(); // 非公平锁
// private ReentrantLock lock = new ReentrantLock(true); // 公平锁

public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock(); // 必须手动释放锁
}
}

public void decrement() {
lock.lock();
try {
count--;
} finally {
lock.unlock();
}
}

// 尝试获取锁
public boolean tryIncrement() {
if (lock.tryLock()) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}

public int getCount() {
return count;
}
}

// 测试
public class Test {
public static void main(String[] args) throws InterruptedException {
// 测试synchronized
SynchronizedExample syncExample = new SynchronizedExample();
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
syncExample.increment();
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
syncExample.decrement();
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("Synchronized count: " + syncExample.getCount());

// 测试ReentrantLock
ReentrantLockExample lockExample = new ReentrantLockExample();
Thread thread3 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
lockExample.increment();
}
});
Thread thread4 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
lockExample.decrement();
}
});
thread3.start();
thread4.start();
thread3.join();
thread4.join();
System.out.println("ReentrantLock count: " + lockExample.getCount());
}
}

使用场景

  • synchronized:适用于简单的同步场景,代码简洁,易于使用
  • ReentrantLock:适用于复杂的同步场景,需要更多的控制(如公平锁、尝试获取锁、响应中断等)

2.4 什么是死锁?如何避免死锁?

死锁是指两个或多个线程互相等待对方释放资源,导致所有线程都无法继续执行的情况。

死锁的四个必要条件

  1. 互斥条件:资源不能被多个线程同时使用
  2. 持有并等待条件:线程持有资源的同时等待其他资源
  3. 不可剥夺条件:资源不能被强制剥夺
  4. 循环等待条件:线程之间形成循环等待关系

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class DeadlockExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();

public static void main(String[] args) {
// 线程1:先获取lock1,再获取lock2
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("Thread 1: Holding lock1");
try {
Thread.sleep(100); // 让线程2有时间获取lock2
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 1: Waiting for lock2");
synchronized (lock2) {
System.out.println("Thread 1: Holding lock1 and lock2");
}
}
});

// 线程2:先获取lock2,再获取lock1
Thread thread2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("Thread 2: Holding lock2");
try {
Thread.sleep(100); // 让线程1有时间获取lock1
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 2: Waiting for lock1");
synchronized (lock1) {
System.out.println("Thread 2: Holding lock2 and lock1");
}
}
});

thread1.start();
thread2.start();
}
}

避免死锁的方法

  1. 按顺序获取锁:所有线程按照相同的顺序获取锁,避免循环等待

    1
    2
    3
    4
    5
    6
    7
    8
    // 正确的做法:按顺序获取锁
    public void method() {
    synchronized (lock1) {
    synchronized (lock2) {
    // 操作
    }
    }
    }
  2. 使用超时机制:使用tryLock()方法尝试获取锁,并设置超时时间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    ReentrantLock lock1 = new ReentrantLock();
    ReentrantLock lock2 = new ReentrantLock();

    public boolean method() {
    try {
    if (lock1.tryLock(1, TimeUnit.SECONDS)) {
    try {
    if (lock2.tryLock(1, TimeUnit.SECONDS)) {
    try {
    // 操作
    return true;
    } finally {
    lock2.unlock();
    }
    }
    } finally {
    lock1.unlock();
    }
    }
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return false;
    }
  3. 减少锁的范围:只在必要的代码块上加锁,减少持有锁的时间

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 不好的做法:锁的范围过大
    public synchronized void method() {
    // 非临界区代码
    // 临界区代码
    // 非临界区代码
    }

    // 好的做法:只锁临界区
    public void method() {
    // 非临界区代码
    synchronized (this) {
    // 临界区代码
    }
    // 非临界区代码
    }
  4. 使用无锁数据结构:如ConcurrentHashMapAtomic类等

    1
    2
    3
    4
    5
    6
    // 使用AtomicInteger代替synchronized
    private AtomicInteger count = new AtomicInteger(0);

    public void increment() {
    count.incrementAndGet();
    }
  5. 使用Lock的tryLock()方法:尝试获取锁,如果获取不到就放弃

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    ReentrantLock lock = new ReentrantLock();

    public void method() {
    if (lock.tryLock()) {
    try {
    // 操作
    } finally {
    lock.unlock();
    }
    } else {
    // 处理获取不到锁的情况
    }
    }
  6. 使用线程池:通过线程池管理线程,避免创建过多线程

  7. 使用信号量:控制对资源的访问数量

死锁检测

  • 使用jstack命令查看线程堆栈信息
  • 使用jconsoleVisualVM等工具监控线程状态

3. 线程池

3.1 什么是线程池?为什么使用线程池?

线程池是一种线程管理机制,它预先创建一组线程,用于执行提交的任务。线程池可以有效地管理线程资源,提高系统性能和稳定性。

1. 线程池的概念

  • 线程池是由多个线程组成的集合,这些线程可以被重复使用
  • 线程池维护着一个任务队列,用于存储待执行的任务
  • 线程池负责管理线程的创建、调度和销毁

2. 使用线程池的原因

(1) 减少线程创建和销毁的开销

  • 线程的创建和销毁是比较昂贵的操作,需要分配和释放系统资源
  • 线程池预先创建线程,避免了频繁创建和销毁线程的开销

(2) 控制并发线程的数量

  • 过多的线程会导致系统资源竞争,降低系统性能
  • 线程池可以限制并发线程的数量,避免资源耗尽

(3) 提高线程的复用性

  • 线程池中的线程可以重复执行多个任务,提高了线程的利用率
  • 避免了为每个任务创建新线程的开销

(4) 便于管理和监控线程

  • 线程池提供了统一的线程管理机制
  • 可以监控线程池的状态,如线程数量、任务执行情况等
  • 可以设置线程池的参数,如核心线程数、最大线程数等

(5) 提高系统的响应速度

  • 线程池中的线程是预先创建的,当有任务提交时,可以立即执行
  • 避免了线程创建的延迟,提高了系统的响应速度

3. 线程池的应用场景

  • Web服务器:处理大量的HTTP请求
  • 数据库连接池:管理数据库连接
  • 消息队列:处理消息的消费
  • 批量任务处理:处理大量的计算任务
  • 实时系统:需要快速响应的系统

4. 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {
public static void main(String[] args) {
// 创建一个固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);

// 提交10个任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executorService.submit(() -> {
System.out.println("Task " + taskId + " is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskId + " is completed");
});
}

// 关闭线程池
executorService.shutdown();
}
}

5. 线程池的优点总结

  • 性能优化:减少线程创建和销毁的开销
  • 资源管理:控制并发线程的数量,避免资源耗尽
  • 提高响应速度:线程预先创建,立即执行任务
  • 可管理性:统一管理线程,便于监控和调优
  • 稳定性:避免因线程过多导致的系统不稳定

6. 注意事项

  • 线程池的大小应该根据系统资源和任务特性来设置
  • 线程池需要及时关闭,避免资源泄漏
  • 任务执行时间不宜过长,否则会阻塞线程池
  • 应该合理处理任务执行过程中的异常

3.2 Java中的线程池有哪些?

Java中提供了多种类型的线程池,以满足不同的应用场景。这些线程池都是通过Executors类创建的,主要包括以下几种:

1. FixedThreadPool(固定大小线程池)

特点

  • 线程池大小固定,核心线程数和最大线程数相等
  • 当线程池中的线程都在工作时,新提交的任务会进入队列等待
  • 队列使用无界队列(LinkedBlockingQueue)

适用场景

  • 适用于处理稳定的并发任务,如Web服务器处理HTTP请求
  • 适用于需要控制并发线程数量的场景

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
public static void main(String[] args) {
// 创建固定大小为5的线程池
ExecutorService executorService = Executors.newFixedThreadPool(5);

for (int i = 0; i < 10; i++) {
final int taskId = i;
executorService.submit(() -> {
System.out.println("Task " + taskId + " is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

executorService.shutdown();
}
}

2. CachedThreadPool(可缓存线程池)

特点

  • 线程池大小不固定,核心线程数为0,最大线程数为Integer.MAX_VALUE
  • 当有新任务提交时,如果没有空闲线程,会创建新线程
  • 线程空闲时间超过60秒会被回收
  • 队列使用SynchronousQueue,不存储任务

适用场景

  • 适用于处理短期、突发的任务
  • 适用于任务执行时间短、任务数量波动大的场景

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
public static void main(String[] args) {
// 创建可缓存的线程池
ExecutorService executorService = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {
final int taskId = i;
executorService.submit(() -> {
System.out.println("Task " + taskId + " is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

executorService.shutdown();
}
}

3. ScheduledThreadPool(定时任务线程池)

特点

  • 用于执行定时任务或周期性任务
  • 核心线程数固定,最大线程数为Integer.MAX_VALUE
  • 队列使用DelayedWorkQueue

适用场景

  • 适用于需要定期执行任务的场景,如定时备份、定时清理等
  • 适用于需要延迟执行任务的场景

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
public static void main(String[] args) {
// 创建核心线程数为3的定时任务线程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

// 延迟1秒执行任务
scheduledExecutorService.schedule(() -> {
System.out.println("Delayed task executed at: " + System.currentTimeMillis());
}, 1, TimeUnit.SECONDS);

// 延迟2秒后,每3秒执行一次任务
scheduledExecutorService.scheduleAtFixedRate(() -> {
System.out.println("Periodic task executed at: " + System.currentTimeMillis());
}, 2, 3, TimeUnit.SECONDS);

// 延迟2秒后,每次执行完任务后等待3秒再执行
scheduledExecutorService.scheduleWithFixedDelay(() -> {
System.out.println("Delayed periodic task executed at: " + System.currentTimeMillis());
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 2, 3, TimeUnit.SECONDS);

// 注意:定时任务线程池需要手动关闭
// scheduledExecutorService.shutdown();
}
}

4. SingleThreadPool(单线程线程池)

特点

  • 线程池只有一个线程
  • 所有任务按照提交顺序执行
  • 队列使用无界队列(LinkedBlockingQueue)

适用场景

  • 适用于需要保证任务顺序执行的场景
  • 适用于需要串行处理任务的场景
  • 适用于需要线程安全的单线程操作

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadPoolExample {
public static void main(String[] args) {
// 创建单线程线程池
ExecutorService executorService = Executors.newSingleThreadExecutor();

for (int i = 0; i < 10; i++) {
final int taskId = i;
executorService.submit(() -> {
System.out.println("Task " + taskId + " is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

executorService.shutdown();
}
}

5. ForkJoinPool(分叉/合并线程池)

特点

  • 适用于处理可以分解为子任务的大规模任务
  • 采用工作窃取算法,提高线程利用率
  • 核心线程数默认为CPU核心数

适用场景

  • 适用于处理大规模的计算任务,如排序、搜索等
  • 适用于需要递归分解任务的场景

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinPoolExample {
public static void main(String[] args) {
// 创建ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool();

// 计算1-10000的和
SumTask task = new SumTask(1, 10000);
Integer result = forkJoinPool.invoke(task);
System.out.println("Sum: " + result);

forkJoinPool.shutdown();
}

static class SumTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 1000;
private int start;
private int end;

public SumTask(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 直接计算
int sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
// 分解任务
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(start, mid);
SumTask rightTask = new SumTask(mid + 1, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 合并结果
return leftTask.join() + rightTask.join();
}
}
}
}

6. 线程池的选择建议

线程池类型 核心线程数 最大线程数 队列类型 适用场景
FixedThreadPool 固定 与核心线程数相同 LinkedBlockingQueue 稳定的并发任务
CachedThreadPool 0 Integer.MAX_VALUE SynchronousQueue 短期突发任务
ScheduledThreadPool 固定 Integer.MAX_VALUE DelayedWorkQueue 定时或周期性任务
SingleThreadPool 1 1 LinkedBlockingQueue 顺序执行任务
ForkJoinPool CPU核心数 CPU核心数 WorkQueue 大规模计算任务

7. 注意事项

  • Executors创建的线程池存在一些问题,如无界队列可能导致内存溢出
  • 对于生产环境,建议使用ThreadPoolExecutor手动创建线程池,以更好地控制线程池参数
  • 线程池的大小应该根据系统资源和任务特性来设置
  • 线程池需要及时关闭,避免资源泄漏

3.3 线程池的核心参数有哪些?

线程池的核心参数是通过ThreadPoolExecutor类的构造函数来设置的,这些参数决定了线程池的行为和性能。以下是线程池的核心参数:

1. corePoolSize(核心线程数)

作用

  • 线程池中的核心线程数量,这些线程会一直存在,即使处于空闲状态
  • 当提交任务时,如果当前线程数小于核心线程数,会创建新的核心线程来执行任务

取值建议

  • 对于CPU密集型任务(如计算),核心线程数建议设置为CPU核心数
  • 对于IO密集型任务(如网络IO、磁盘IO),核心线程数可以设置为CPU核心数的2-4倍

2. maximumPoolSize(最大线程数)

作用

  • 线程池可以创建的最大线程数量
  • 当核心线程数已满,且任务队列也已满时,会创建非核心线程来执行任务
  • 非核心线程在空闲时间超过keepAliveTime后会被回收

取值建议

  • 对于CPU密集型任务,最大线程数建议设置为CPU核心数
  • 对于IO密集型任务,最大线程数可以设置为CPU核心数的2-4倍
  • 最大线程数应该大于或等于核心线程数

3. keepAliveTime(线程空闲时间)

作用

  • 非核心线程的空闲时间阈值
  • 当非核心线程的空闲时间超过这个值时,会被回收
  • 可以通过allowCoreThreadTimeOut(true)使核心线程也遵循这个规则

取值建议

  • 对于执行时间短的任务,keepAliveTime可以设置得短一些
  • 对于执行时间长的任务,keepAliveTime可以设置得长一些
  • 通常设置为60秒

4. workQueue(任务队列)

作用

  • 用于存储待执行的任务
  • 当核心线程数已满时,新提交的任务会进入任务队列

常见的队列类型

  • LinkedBlockingQueue:无界队列,可存储无限个任务,可能导致内存溢出
  • ArrayBlockingQueue:有界队列,可限制任务数量
  • SynchronousQueue:同步队列,不存储任务,直接传递给线程
  • PriorityBlockingQueue:优先级队列,根据任务优先级执行

取值建议

  • 对于固定大小的线程池,建议使用有界队列,避免任务堆积导致内存溢出
  • 对于可缓存的线程池,建议使用SynchronousQueue

5. threadFactory(线程工厂)

作用

  • 用于创建线程的工厂
  • 可以自定义线程的名称、优先级、 daemon状态等

默认实现

  • Executors.defaultThreadFactory():创建的线程具有默认的名称和优先级

自定义线程工厂

  • 可以通过实现ThreadFactory接口来自定义线程工厂

6. handler(拒绝策略)

作用

  • 当线程池和任务队列都已满时,用于处理新提交的任务

常见的拒绝策略

  • AbortPolicy:默认策略,抛出RejectedExecutionException异常
  • CallerRunsPolicy:由提交任务的线程执行任务
  • DiscardPolicy:直接丢弃任务
  • DiscardOldestPolicy:丢弃队列中最旧的任务,然后尝试提交新任务

取值建议

  • 对于关键任务,建议使用AbortPolicy,及时发现问题
  • 对于非关键任务,建议使用DiscardPolicy或DiscardOldestPolicy
  • 对于需要保证任务执行的场景,建议使用CallerRunsPolicy

7. 示例:手动创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolParametersExample {
public static void main(String[] args) {
// 手动创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize:核心线程数
10, // maximumPoolSize:最大线程数
60, // keepAliveTime:线程空闲时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(100), // 任务队列:有界队列,容量为100
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.AbortPolicy() // 拒绝策略
);

// 提交任务
for (int i = 0; i < 150; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

// 关闭线程池
executor.shutdown();
}
}

8. 线程池参数的调优建议

(1) 核心线程数和最大线程数

  • CPU密集型任务:核心线程数 = CPU核心数
  • IO密集型任务:核心线程数 = CPU核心数 * 2

(2) 任务队列

  • 建议使用有界队列,避免任务堆积导致内存溢出
  • 队列大小应该根据任务执行时间和系统资源来设置

(3) 拒绝策略

  • 根据任务的重要性选择合适的拒绝策略
  • 对于关键任务,应该使用能够及时发现问题的策略

(4) 线程工厂

  • 自定义线程工厂,设置有意义的线程名称,便于调试和监控

(5) keepAliveTime

  • 根据任务的执行时间和频率来设置
  • 对于短期任务,可以设置较短的keepAliveTime

9. 注意事项

  • 线程池的参数设置应该根据具体的应用场景来调整
  • 应该监控线程池的状态,如线程数量、任务队列大小等
  • 线程池需要及时关闭,避免资源泄漏
  • 避免使用Executors创建的线程池,因为它们可能存在资源耗尽的风险

3.4 线程池的工作原理?

线程池的工作原理是指线程池如何处理提交的任务,以及如何管理线程的生命周期。以下是线程池的详细工作原理:

1. 线程池的工作流程

当提交一个任务到线程池时,线程池会按照以下步骤处理:

步骤1:检查核心线程数

  • 如果当前线程数小于核心线程数(corePoolSize),创建新的核心线程来执行任务
  • 核心线程会一直存在,即使处于空闲状态(除非设置了allowCoreThreadTimeOut(true)

步骤2:检查任务队列

  • 如果当前线程数达到核心线程数,将任务加入任务队列(workQueue)
  • 任务队列用于存储待执行的任务

步骤3:检查最大线程数

  • 如果任务队列已满,且当前线程数小于最大线程数(maximumPoolSize),创建非核心线程来执行任务
  • 非核心线程在空闲时间超过keepAliveTime后会被回收

步骤4:执行拒绝策略

  • 如果任务队列已满,且当前线程数达到最大线程数,执行拒绝策略(handler)
  • 拒绝策略决定如何处理无法执行的任务

2. 线程池的状态

线程池有以下几种状态:

  • RUNNING:线程池处于运行状态,接受新任务并处理队列中的任务
  • SHUTDOWN:线程池处于关闭状态,不接受新任务,但处理队列中的任务
  • STOP:线程池处于停止状态,不接受新任务,不处理队列中的任务,中断正在执行的任务
  • TIDYING:所有任务已完成,线程数为0,准备进入TERMINATED状态
  • TERMINATED:线程池已终止,所有任务已完成,所有线程已退出

3. 线程池的状态转换

  • RUNNING → SHUTDOWN:调用shutdown()方法
  • RUNNING → STOP:调用shutdownNow()方法
  • SHUTDOWN → TIDYING:队列和线程池都为空
  • STOP → TIDYING:线程池为空
  • TIDYING → TERMINATED:调用terminated()钩子方法完成后

4. 线程池的执行机制

(1) 任务提交

  • 通过execute()方法提交任务
  • 通过submit()方法提交任务(返回Future对象)

(2) 任务执行

  • 线程池中的线程从任务队列中获取任务执行
  • 执行完成后,线程会继续从队列中获取任务,直到队列为空

(3) 线程管理

  • 核心线程:一直存在,除非设置了allowCoreThreadTimeOut(true)
  • 非核心线程:空闲时间超过keepAliveTime后被回收

5. 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolWorkingPrincipleExample {
public static void main(String[] args) {
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60, // 线程空闲时间
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3) // 任务队列容量为3
);

// 提交任务
for (int i = 1; i <= 8; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("Task " + taskId + " is running on thread: " + Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务执行时间
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskId + " is completed");
});
System.out.println("Task " + taskId + " is submitted");
}

// 关闭线程池
executor.shutdown();
}
}

执行过程分析

  • 任务1、2:创建核心线程执行
  • 任务3、4、5:进入任务队列
  • 任务6、7:创建非核心线程执行(因为队列已满,且未达到最大线程数)
  • 任务8:执行拒绝策略(因为队列已满,且已达到最大线程数)

6. 线程池的工作原理总结

  1. 任务提交:通过execute()submit()方法提交任务
  2. 核心线程处理:如果核心线程未满,创建核心线程执行任务
  3. 队列存储:如果核心线程已满,将任务加入队列
  4. 非核心线程处理:如果队列已满,创建非核心线程执行任务
  5. 拒绝策略:如果线程数达到最大值,执行拒绝策略
  6. 线程回收:非核心线程空闲时间超过keepAliveTime后被回收
  7. 线程池关闭:调用shutdown()shutdownNow()方法关闭线程池

7. 线程池的性能优化

(1) 合理设置线程池参数

  • 根据任务类型和系统资源设置核心线程数和最大线程数
  • 使用有界队列,避免任务堆积
  • 选择合适的拒绝策略

(2) 监控线程池状态

  • 监控线程池的线程数量、任务队列大小、任务执行情况等
  • 根据监控结果调整线程池参数

(3) 避免任务执行时间过长

  • 任务执行时间过长会阻塞线程池
  • 对于长时间运行的任务,考虑使用单独的线程池

(4) 合理处理异常

  • 任务执行过程中的异常应该被捕获并处理
  • 避免异常导致线程池中的线程意外终止

8. 注意事项

  • 线程池的参数设置应该根据具体的应用场景来调整
  • 应该监控线程池的状态,及时发现和解决问题
  • 线程池需要及时关闭,避免资源泄漏
  • 避免使用Executors创建的线程池,因为它们可能存在资源耗尽的风险

4. 并发工具类

4.1 什么是volatile关键字?它的作用是什么?

volatile是Java中的一个关键字,用于修饰变量。它的主要作用是保证变量的可见性和禁止指令重排序,从而确保多线程环境下的正确执行。

1. volatile的作用

(1) 保证变量的可见性

  • 当一个线程修改了volatile变量的值,其他线程能够立即看到这个修改
  • 这是因为volatile变量的修改会直接写入主内存,而不是缓存在线程的本地内存中
  • 其他线程读取volatile变量时,会直接从主内存读取,而不是从本地内存读取

(2) 禁止指令重排序

  • 编译器和处理器为了提高性能,会对指令进行重排序
  • volatile变量会禁止指令重排序,保证代码的执行顺序与程序的逻辑顺序一致
  • 这对于依赖于执行顺序的代码(如双重检查锁定单例模式)非常重要

2. volatile的实现原理

(1) 内存屏障

  • volatile变量的读写操作会插入内存屏障
  • 内存屏障会禁止指令重排序,确保内存操作的顺序
  • 内存屏障会强制将本地内存中的修改刷新到主内存,以及从主内存读取最新值

(2) happens-before关系

  • 对volatile变量的写操作 happens-before 后续对该变量的读操作
  • 这保证了volatile变量的修改对其他线程的可见性

3. volatile的使用场景

(1) 状态标志

  • 用于表示某个状态,如线程是否应该停止
  • 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class VolatileFlagExample {
private volatile boolean stopFlag = false;

public void run() {
while (!stopFlag) {
// 执行任务
System.out.println("Running...");
}
System.out.println("Stopped");
}

public void stop() {
stopFlag = true;
}
}

(2) 双重检查锁定单例模式

  • 用于确保单例对象的正确创建
  • 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Singleton {
private static volatile Singleton instance;

private Singleton() {}

public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}

(3) 独立观察

  • 用于观察某个独立的状态变化
  • 示例:
1
2
3
4
5
6
7
8
9
10
11
public class VolatileObserverExample {
private volatile int count = 0;

public void increment() {
count++;
}

public int getCount() {
return count;
}
}

4. volatile与synchronized的区别

特性 volatile synchronized
作用 保证可见性和禁止指令重排序 保证原子性、可见性和有序性
适用范围 变量 代码块或方法
性能 开销小 开销大
原子性 不保证原子性 保证原子性
阻塞 非阻塞 阻塞

5. volatile的局限性

(1) 不保证原子性

  • volatile变量的读写操作是原子的,但复合操作(如i++)不是原子的
  • 示例:
1
2
3
4
5
6
7
8
9
10
11
12
public class VolatileAtomicityExample {
private volatile int count = 0;

// 这个方法不是线程安全的,因为i++不是原子操作
public void increment() {
count++; // 等价于 count = count + 1,包含读取、计算、写入三个操作
}

public int getCount() {
return count;
}
}

(2) 不适合复杂的同步场景

  • 对于需要原子性的复合操作,应该使用synchronized或原子类
  • 示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
private AtomicInteger count = new AtomicInteger(0);

// 这个方法是线程安全的
public void increment() {
count.incrementAndGet();
}

public int getCount() {
return count.get();
}
}

6. 示例:volatile的可见性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class VolatileVisibilityExample {
private static volatile boolean flag = false;

public static void main(String[] args) {
// 线程1:等待flag变为true
new Thread(() -> {
System.out.println("Thread 1: Waiting for flag to be true");
while (!flag) {
// 空循环
}
System.out.println("Thread 1: Flag is now true");
}).start();

// 线程2:修改flag为true
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 2: Setting flag to true");
flag = true;
}).start();
}
}

7. 示例:volatile禁止指令重排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class VolatileReorderingExample {
private static int a = 0, b = 0;
private static int x = 0, y = 0;

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100000; i++) {
a = 0; b = 0;
x = 0; y = 0;

Thread t1 = new Thread(() -> {
a = 1;
x = b;
});

Thread t2 = new Thread(() -> {
b = 1;
y = a;
});

t1.start();
t2.start();
t1.join();
t2.join();

// 如果发生指令重排序,可能会出现x=0且y=0的情况
if (x == 0 && y == 0) {
System.out.println("Reordering detected: x = " + x + ", y = " + y);
break;
}
}
}
}

8. 注意事项

  • volatile适用于简单的状态标志和独立观察场景
  • 对于需要原子性的复合操作,应该使用synchronized或原子类
  • volatile不能替代synchronized,它们有不同的适用场景
  • 合理使用volatile可以提高程序的性能,避免不必要的同步开销

4.2 什么是原子类?Java中的原子类有哪些?

原子类是Java中提供的线程安全的类,用于执行原子操作。原子操作是指不可被中断的操作,要么全部执行成功,要么全部不执行,不会出现中间状态。

1. 原子类的概念

  • 原子类基于CAS(Compare And Swap)操作实现
  • 原子类提供了线程安全的操作,无需使用synchronized关键字
  • 原子类的性能通常比synchronized高

2. Java中的原子类

Java中的原子类主要位于java.util.concurrent.atomic包中,包括以下几类:

(1) 基本类型原子类

  • AtomicInteger:原子整数
  • AtomicLong:原子长整数
  • AtomicBoolean:原子布尔值

(2) 引用类型原子类

  • AtomicReference:原子引用
  • AtomicStampedReference:带版本号的原子引用,解决ABA问题
  • AtomicMarkableReference:带标记的原子引用

(3) 数组类型原子类

  • AtomicIntegerArray:原子整数数组
  • AtomicLongArray:原子长整数数组
  • AtomicReferenceArray:原子引用数组

(4) 字段更新器

  • AtomicIntegerFieldUpdater:原子更新整数字段
  • AtomicLongFieldUpdater:原子更新长整数字段
  • AtomicReferenceFieldUpdater:原子更新引用字段

3. 原子类的使用示例

(1) AtomicInteger示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerExample {
private static AtomicInteger count = new AtomicInteger(0);

public static void main(String[] args) {
// 自增1
int result1 = count.incrementAndGet();
System.out.println("After incrementAndGet: " + result1); // 输出: 1

// 自减1
int result2 = count.decrementAndGet();
System.out.println("After decrementAndGet: " + result2); // 输出: 0

// 先获取值,再自增1
int result3 = count.getAndIncrement();
System.out.println("After getAndIncrement: " + result3); // 输出: 0
System.out.println("Current value: " + count.get()); // 输出: 1

// 先获取值,再自减1
int result4 = count.getAndDecrement();
System.out.println("After getAndDecrement: " + result4); // 输出: 1
System.out.println("Current value: " + count.get()); // 输出: 0

// 设置新值
count.set(10);
System.out.println("After set: " + count.get()); // 输出: 10

// 先获取值,再设置新值
int result5 = count.getAndSet(20);
System.out.println("After getAndSet: " + result5); // 输出: 10
System.out.println("Current value: " + count.get()); // 输出: 20

// 比较并交换
boolean success = count.compareAndSet(20, 30);
System.out.println("CompareAndSet success: " + success); // 输出: true
System.out.println("Current value: " + count.get()); // 输出: 30

// 比较并交换失败
success = count.compareAndSet(20, 40);
System.out.println("CompareAndSet success: " + success); // 输出: false
System.out.println("Current value: " + count.get()); // 输出: 30
}
}

(2) AtomicReference示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceExample {
static class User {
private String name;
private int age;

public User(String name, int age) {
this.name = name;
this.age = age;
}

@Override
public String toString() {
return "User{name='" + name + "', age=" + age + "}";
}
}

public static void main(String[] args) {
AtomicReference<User> atomicReference = new AtomicReference<>();

// 设置值
User user1 = new User("Alice", 25);
atomicReference.set(user1);
System.out.println("Current user: " + atomicReference.get()); // 输出: User{name='Alice', age=25}

// 比较并交换
User user2 = new User("Bob", 30);
boolean success = atomicReference.compareAndSet(user1, user2);
System.out.println("CompareAndSet success: " + success); // 输出: true
System.out.println("Current user: " + atomicReference.get()); // 输出: User{name='Bob', age=30}

// 比较并交换失败
User user3 = new User("Charlie", 35);
success = atomicReference.compareAndSet(user1, user3);
System.out.println("CompareAndSet success: " + success); // 输出: false
System.out.println("Current user: " + atomicReference.get()); // 输出: User{name='Bob', age=30}
}
}

(3) AtomicStampedReference示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.atomic.AtomicStampedReference;

public class AtomicStampedReferenceExample {
public static void main(String[] args) {
// 初始值为100,版本号为0
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 0);

// 获取当前值和版本号
int currentValue = atomicStampedReference.getReference();
int currentStamp = atomicStampedReference.getStamp();
System.out.println("Current value: " + currentValue + ", stamp: " + currentStamp); // 输出: Current value: 100, stamp: 0

// 比较并交换(值和版本号都要匹配)
boolean success = atomicStampedReference.compareAndSet(100, 200, 0, 1);
System.out.println("CompareAndSet success: " + success); // 输出: true
System.out.println("Current value: " + atomicStampedReference.getReference() + ", stamp: " + atomicStampedReference.getStamp()); // 输出: Current value: 200, stamp: 1

// 尝试更新,但版本号不匹配
success = atomicStampedReference.compareAndSet(200, 300, 0, 2);
System.out.println("CompareAndSet success: " + success); // 输出: false
System.out.println("Current value: " + atomicStampedReference.getReference() + ", stamp: " + atomicStampedReference.getStamp()); // 输出: Current value: 200, stamp: 1

// 正确的版本号
success = atomicStampedReference.compareAndSet(200, 300, 1, 2);
System.out.println("CompareAndSet success: " + success); // 输出: true
System.out.println("Current value: " + atomicStampedReference.getReference() + ", stamp: " + atomicStampedReference.getStamp()); // 输出: Current value: 300, stamp: 2
}
}

4. 原子类的实现原理

(1) CAS操作

  • CAS(Compare And Swap)是一种原子操作,它比较内存中的值与预期值,如果相等则更新为新值
  • CAS操作由硬件支持,是原子的
  • CAS操作的三个参数:内存地址、预期值、新值

(2) ABA问题

  • ABA问题是指一个值从A变为B,再变回A,导致CAS操作误判
  • 解决方法:使用AtomicStampedReference,通过版本号来避免ABA问题

5. 原子类的优缺点

优点

  • 线程安全,无需使用synchronized
  • 性能高,避免了线程上下文切换的开销
  • 提供了丰富的原子操作

缺点

  • 只能保证单个变量的原子性
  • 对于复杂的复合操作,需要使用其他同步机制
  • 可能会导致ABA问题(使用AtomicStampedReference可以解决)

6. 原子类的适用场景

  • 计数器:如统计网站访问量、请求次数等
  • 状态标志:如线程的运行状态
  • 并发安全的单例模式:使用AtomicReference实现
  • 无锁数据结构:如无锁队列、无锁栈等

7. 注意事项

  • 原子类适用于简单的并发场景
  • 对于复杂的并发操作,可能需要使用其他同步机制
  • 合理使用原子类可以提高程序的性能
  • 注意ABA问题,必要时使用AtomicStampedReference

4.3 什么是ThreadLocal?它的作用是什么?

ThreadLocal是Java中提供的一个线程本地变量工具类,它允许每个线程拥有自己独立的变量副本,从而避免线程安全问题和简化线程间的数据传递。

1. ThreadLocal的概念

  • ThreadLocal为每个线程创建一个独立的变量副本
  • 线程可以通过ThreadLocal获取和设置自己的变量副本
  • 不同线程之间的变量副本互不影响

2. ThreadLocal的作用

(1) 为每个线程提供独立的变量副本

  • 每个线程都有自己的变量副本,线程间不会互相干扰
  • 避免了线程安全问题

(2) 简化线程间的数据传递

  • 可以在不同方法之间传递数据,而不需要通过方法参数
  • 适用于在同一个线程内的不同组件之间共享数据

(3) 避免参数传递的繁琐

  • 当多个方法需要使用同一个变量时,可以使用ThreadLocal来存储,避免参数传递

3. ThreadLocal的使用示例

(1) 基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ThreadLocalExample {
// 创建ThreadLocal对象
private static ThreadLocal<String> threadLocal = new ThreadLocal<>();

public static void main(String[] args) {
// 线程1
new Thread(() -> {
threadLocal.set("Thread 1 value");
System.out.println("Thread 1: " + threadLocal.get());
// 清理资源
threadLocal.remove();
}).start();

// 线程2
new Thread(() -> {
threadLocal.set("Thread 2 value");
System.out.println("Thread 2: " + threadLocal.get());
// 清理资源
threadLocal.remove();
}).start();
}
}

(2) 在Web应用中的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class UserContext {
private static ThreadLocal<User> userThreadLocal = new ThreadLocal<>();

public static void setUser(User user) {
userThreadLocal.set(user);
}

public static User getUser() {
return userThreadLocal.get();
}

public static void removeUser() {
userThreadLocal.remove();
}
}

// 在过滤器中设置用户信息
public class UserFilter implements Filter {
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
// 从请求中获取用户信息
User user = getUserFromRequest(request);
// 设置到ThreadLocal中
UserContext.setUser(user);
try {
chain.doFilter(request, response);
} finally {
// 清理资源
UserContext.removeUser();
}
}
}

// 在Service中获取用户信息
@Service
public class UserService {
public void process() {
// 从ThreadLocal中获取用户信息
User user = UserContext.getUser();
System.out.println("Processing for user: " + user.getUsername());
}
}

4. ThreadLocal的实现原理

(1) 内部结构

  • ThreadLocal内部维护了一个ThreadLocalMap,每个线程都有自己的ThreadLocalMap
  • ThreadLocalMap是一个特殊的Map,使用ThreadLocal作为key,存储线程的变量副本

(2) 核心方法

  • **set(T value)**:设置当前线程的变量副本
  • **get()**:获取当前线程的变量副本
  • **remove()**:移除当前线程的变量副本

(3) 工作原理

  • 当调用set()方法时,ThreadLocal会获取当前线程的ThreadLocalMap,并将变量存储到其中
  • 当调用get()方法时,ThreadLocal会获取当前线程的ThreadLocalMap,并从中获取变量
  • 当调用remove()方法时,ThreadLocal会从当前线程的ThreadLocalMap中移除变量

5. ThreadLocal的内存泄漏问题

(1) 内存泄漏的原因

  • ThreadLocalMap中的Entry使用ThreadLocal作为key,并且是弱引用(WeakReference)
  • 当ThreadLocal对象被回收后,Entry的key会变为null
  • 如果线程没有结束,ThreadLocalMap中的Entry会一直存在,导致内存泄漏

(2) 如何避免内存泄漏

  • 使用完毕后调用remove()方法清理资源
  • 在使用ThreadLocal的地方,确保在finally块中调用remove()方法

6. ThreadLocal的优缺点

优点

  • 线程安全,无需使用synchronized
  • 简化线程间的数据传递
  • 避免参数传递的繁琐

缺点

  • 可能导致内存泄漏(需要手动调用remove()方法)
  • 不适合在多线程间共享数据
  • 可能会导致代码可读性下降

7. ThreadLocal的适用场景

  • Web应用:存储用户会话信息
  • 数据库连接:为每个线程提供独立的数据库连接
  • 事务管理:存储事务上下文
  • 日期格式化:为每个线程提供独立的SimpleDateFormat实例

8. 注意事项

  • 使用完毕后一定要调用remove()方法清理资源,避免内存泄漏
  • 不要在ThreadLocal中存储大对象,以免占用过多内存
  • 注意ThreadLocal的作用域,避免在不需要的地方使用
  • 理解ThreadLocal的实现原理,避免滥用

4.4 什么是CountDownLatch和CyclicBarrier?它们的区别是什么?

CountDownLatchCyclicBarrier都是Java中提供的并发工具类,用于协调多个线程的执行。它们的主要作用是让线程等待其他线程完成或到达某个点。

1. CountDownLatch

(1) 概念

  • CountDownLatch是一个倒计时门闩,用于等待多个线程完成
  • 它通过一个计数器来实现,当计数器减到0时,等待的线程被唤醒

(2) 核心方法

  • **CountDownLatch(int count)**:构造方法,设置计数器的初始值
  • **await()**:等待计数器减到0
  • **countDown()**:计数器减1

(3) 使用场景

  • 等待多个线程完成后再继续执行
  • 主线程等待所有子线程完成后再结束

(4) 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
// 创建CountDownLatch,设置计数器为3
CountDownLatch latch = new CountDownLatch(3);

// 创建3个线程
for (int i = 1; i <= 3; i++) {
final int taskId = i;
new Thread(() -> {
System.out.println("Task " + taskId + " is running");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + taskId + " is completed");
// 计数器减1
latch.countDown();
}).start();
}

System.out.println("Main thread is waiting for all tasks to complete");
// 等待计数器减到0
latch.await();
System.out.println("All tasks are completed, main thread continues");
}
}

2. CyclicBarrier

(1) 概念

  • CyclicBarrier是一个循环屏障,用于等待所有线程到达某个点
  • 当所有线程到达后,屏障被打开,所有线程继续执行
  • 它的计数器可以重置,因此可以重复使用

(2) 核心方法

  • **CyclicBarrier(int parties)**:构造方法,设置参与的线程数
  • **CyclicBarrier(int parties, Runnable barrierAction)**:构造方法,设置参与的线程数和屏障打开时执行的任务
  • **await()**:等待所有线程到达
  • **reset()**:重置计数器

(3) 使用场景

  • 多个线程需要在某个点同步后再继续执行
  • 多个线程需要分阶段执行任务,每个阶段完成后再进入下一阶段

(4) 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
public static void main(String[] args) {
// 创建CyclicBarrier,设置参与的线程数为3,屏障打开时执行的任务
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads have arrived at the barrier");
});

// 创建3个线程
for (int i = 1; i <= 3; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("Thread " + threadId + " is running");
Thread.sleep(1000);
System.out.println("Thread " + threadId + " has arrived at the barrier");
// 等待所有线程到达
barrier.await();
System.out.println("Thread " + threadId + " continues after the barrier");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}

3. CountDownLatch和CyclicBarrier的区别

特性 CountDownLatch CyclicBarrier
计数器使用 计数器只能使用一次 计数器可以重置,重复使用
等待方式 等待其他线程完成 等待所有线程到达某个点
实现原理 基于AQS(AbstractQueuedSynchronizer) 基于ReentrantLock和Condition
使用场景 主线程等待所有子线程完成 多个线程相互等待,同步后继续执行
线程角色 有等待线程和计数线程之分 所有线程都是等待线程
异常处理 计数线程异常不影响其他线程 一个线程异常会导致所有线程异常
屏障动作 无屏障动作 可以设置屏障打开时执行的任务

4. 适用场景对比

CountDownLatch适用场景

  • 主线程等待多个子线程完成初始化
  • 多个线程等待某个资源准备就绪
  • 实现多个线程的协调执行

CyclicBarrier适用场景

  • 多个线程分阶段执行任务,每个阶段完成后同步
  • 多线程计算,最后合并结果
  • 模拟并发测试

5. 注意事项

CountDownLatch

  • 计数器必须减到0,否则等待的线程会一直阻塞
  • 计数器不能重置,只能使用一次
  • 适用于一次性的同步场景

CyclicBarrier

  • 所有线程必须调用await()方法,否则其他线程会一直阻塞
  • 可以通过reset()方法重置计数器,重复使用
  • 适用于需要多次同步的场景
  • 一个线程异常会导致所有线程异常,需要注意异常处理

5. 并发编程的其他问题

5.1 什么是乐观锁和悲观锁?

乐观锁悲观锁是并发编程中两种不同的锁策略,用于解决多线程并发访问共享资源时的冲突问题。

1. 乐观锁

(1) 概念

  • 乐观锁假设不会发生冲突,因此在访问资源时不会立即获取锁
  • 只有在更新资源时才检查是否发生冲突
  • 如果发生冲突,则重试或采取其他策略

(2) 实现方式

  • 版本号机制:为数据添加版本号,每次更新时版本号加1,更新前检查版本号是否一致
  • CAS操作:使用Compare And Swap操作,比较内存中的值与预期值,如果相等则更新

(3) 适用场景

  • 读操作远多于写操作的场景
  • 冲突概率较低的场景
  • 对性能要求较高的场景

(4) 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 使用版本号机制实现乐观锁
public class OptimisticLockExample {
private int value;
private int version;

public synchronized boolean update(int newValue, int expectedVersion) {
if (version == expectedVersion) {
value = newValue;
version++;
return true;
}
return false;
}

public synchronized int getValue() {
return value;
}

public synchronized int getVersion() {
return version;
}
}

// 使用CAS操作实现乐观锁
import java.util.concurrent.atomic.AtomicInteger;

public class CASExample {
private AtomicInteger value = new AtomicInteger(0);

public void increment() {
int current;
do {
current = value.get();
} while (!value.compareAndSet(current, current + 1));
}

public int getValue() {
return value.get();
}
}

2. 悲观锁

(1) 概念

  • 悲观锁假设会发生冲突,因此在访问资源前先获取锁
  • 确保同一时间只有一个线程能访问资源
  • 其他线程需要等待锁释放后才能访问

(2) 实现方式

  • synchronized关键字:Java内置的锁机制
  • ReentrantLock:可重入锁,提供更多的锁操作
  • 读写锁:ReadWriteLock,支持读写分离

(3) 适用场景

  • 写操作远多于读操作的场景
  • 冲突概率较高的场景
  • 对数据一致性要求较高的场景

(4) 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 使用synchronized实现悲观锁
public class PessimisticLockExample {
private int value;

public synchronized void increment() {
value++;
}

public synchronized int getValue() {
return value;
}
}

// 使用ReentrantLock实现悲观锁
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
private int value;
private ReentrantLock lock = new ReentrantLock();

public void increment() {
lock.lock();
try {
value++;
} finally {
lock.unlock();
}
}

public int getValue() {
lock.lock();
try {
return value;
} finally {
lock.unlock();
}
}
}

3. 乐观锁和悲观锁的区别

特性 乐观锁 悲观锁
锁策略 假设不会发生冲突 假设会发生冲突
实现方式 版本号机制、CAS操作 synchronized、ReentrantLock
适用场景 读多写少,冲突概率低 写多读少,冲突概率高
性能 无锁操作,性能高 有锁操作,性能较低
复杂度 实现复杂,需要处理冲突 实现简单,直接获取锁
死锁 不会发生死锁 可能发生死锁

4. 乐观锁和悲观锁的选择

选择乐观锁的情况

  • 读操作远多于写操作
  • 冲突概率较低
  • 对性能要求较高
  • 数据一致性要求不是特别严格

选择悲观锁的情况

  • 写操作远多于读操作
  • 冲突概率较高
  • 对数据一致性要求较高
  • 实现简单,易于理解

5. 注意事项

  • 乐观锁需要处理冲突,可能导致重试次数过多,影响性能
  • 悲观锁可能导致线程阻塞,影响并发性能
  • 在高并发场景下,应根据具体情况选择合适的锁策略
  • 可以结合使用两种锁策略,例如:读操作使用乐观锁,写操作使用悲观锁

5.2 什么是CAS操作?它的优缺点是什么?

CAS(Compare And Swap)是一种原子操作,它比较内存中的值与预期值,如果相等则更新为新值。CAS操作是实现乐观锁的核心技术,也是许多并发工具类的基础。

1. CAS操作的工作原理

  • 操作过程:CAS操作包含三个参数:内存地址(V)、预期值(A)和新值(B)。当且仅当内存地址V的值等于预期值A时,才将内存地址V的值更新为B,否则不做任何操作。
  • 原子性:CAS操作是由硬件支持的原子操作,确保了操作的原子性。
  • 返回值:CAS操作返回一个布尔值,表示操作是否成功。

2. CAS操作的实现

在Java中,CAS操作主要通过Unsafe类实现,该类提供了底层的CAS操作方法。例如,AtomicInteger类就是使用CAS操作来实现原子更新的。

3. CAS操作的示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.atomic.AtomicInteger;

public class CASExample {
private static AtomicInteger count = new AtomicInteger(0);

public static void main(String[] args) {
// 尝试将count从0更新为1
boolean success = count.compareAndSet(0, 1);
System.out.println("First CAS: " + success + ", count = " + count.get()); // 输出: First CAS: true, count = 1

// 尝试将count从0更新为2(失败,因为count现在是1)
success = count.compareAndSet(0, 2);
System.out.println("Second CAS: " + success + ", count = " + count.get()); // 输出: Second CAS: false, count = 1

// 尝试将count从1更新为2(成功)
success = count.compareAndSet(1, 2);
System.out.println("Third CAS: " + success + ", count = " + count.get()); // 输出: Third CAS: true, count = 2
}
}

4. CAS操作的优点

  • 无锁操作:CAS操作不需要获取锁,避免了线程上下文切换的开销,性能更高。
  • 非阻塞:CAS操作不会阻塞线程,即使操作失败也会立即返回,不会导致线程挂起。
  • 原子性:CAS操作是原子的,确保了操作的正确性。
  • 适用范围广:CAS操作是许多并发工具类的基础,如原子类、ConcurrentHashMap等。

5. CAS操作的缺点

  • ABA问题

    • 概念:当一个值从A变为B,再变回A时,CAS操作会误判为没有发生变化。
    • 解决方法:使用版本号或时间戳,如AtomicStampedReference。
  • 自旋时间过长

    • 概念:当CAS操作失败时,线程会不断重试,导致CPU资源浪费。
    • 解决方法:设置重试次数上限,或使用Backoff策略。
  • 只能保证单个变量的原子性

    • 概念:CAS操作只能保证单个变量的原子性,无法保证多个变量的原子性。
    • 解决方法:使用锁或其他同步机制。
  • 开销

    • 概念:CAS操作需要不断比较和尝试,可能会导致缓存一致性流量增加。
    • 解决方法:合理使用CAS操作,避免频繁的CAS操作。

6. ABA问题的解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.concurrent.atomic.AtomicStampedReference;

public class ABASolutionExample {
public static void main(String[] args) {
// 创建AtomicStampedReference,初始值为100,版本号为0
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100, 0);

// 线程1:模拟ABA问题
new Thread(() -> {
int[] stampHolder = new int[1];
int currentValue = atomicStampedReference.get(stampHolder);
int currentStamp = stampHolder[0];
System.out.println("Thread 1: Current value = " + currentValue + ", stamp = " + currentStamp);

// 模拟ABA问题:先将值从100改为200
atomicStampedReference.compareAndSet(100, 200, currentStamp, currentStamp + 1);
System.out.println("Thread 1: After first update - value = " + atomicStampedReference.getReference() + ", stamp = " + atomicStampedReference.getStamp());

// 再将值从200改回100
int[] stampHolder2 = new int[1];
currentValue = atomicStampedReference.get(stampHolder2);
currentStamp = stampHolder2[0];
atomicStampedReference.compareAndSet(200, 100, currentStamp, currentStamp + 1);
System.out.println("Thread 1: After second update - value = " + atomicStampedReference.getReference() + ", stamp = " + atomicStampedReference.getStamp());
}).start();

// 线程2:尝试更新值,使用版本号避免ABA问题
new Thread(() -> {
try {
Thread.sleep(1000); // 等待线程1完成ABA操作
} catch (InterruptedException e) {
e.printStackTrace();
}

int[] stampHolder = new int[1];
int currentValue = atomicStampedReference.get(stampHolder);
int currentStamp = stampHolder[0];
System.out.println("Thread 2: Current value = " + currentValue + ", stamp = " + currentStamp);

// 尝试更新值,由于版本号不匹配,操作会失败
boolean success = atomicStampedReference.compareAndSet(100, 300, 0, 1);
System.out.println("Thread 2: CAS success = " + success + ", value = " + atomicStampedReference.getReference() + ", stamp = " + atomicStampedReference.getStamp());
}).start();
}
}

7. CAS操作的适用场景

  • 计数器:如AtomicInteger、AtomicLong等。
  • 并发安全的单例模式:使用AtomicReference实现。
  • 无锁数据结构:如ConcurrentLinkedQueue、ConcurrentHashMap等。
  • 乐观锁:实现版本号机制。

8. 注意事项

  • 合理使用CAS操作,避免频繁的CAS操作导致性能下降。
  • 注意ABA问题,必要时使用AtomicStampedReference。
  • 对于复杂的并发操作,可能需要使用锁或其他同步机制。
  • 理解CAS操作的原理,避免滥用。

5.3 什么是线程安全?如何实现线程安全?

线程安全是指多个线程同时访问共享资源时,不会导致数据不一致、数据丢失或其他问题,保证程序的正确性和可靠性。

1. 线程安全的概念

  • 数据一致性:多个线程访问共享资源时,数据的状态是一致的,不会出现部分更新的情况。
  • 原子性:操作要么全部执行成功,要么全部不执行,不会出现中间状态。
  • 可见性:一个线程对共享资源的修改,其他线程能够立即看到。
  • 有序性:程序的执行顺序与代码的逻辑顺序一致,不会出现指令重排序。

2. 实现线程安全的方法

(1) 使用synchronized关键字

  • 概念:synchronized是Java内置的同步机制,用于保证方法或代码块的原子性。
  • 原理:synchronized通过获取对象的锁来实现同步,同一时间只有一个线程能够获取锁并执行代码。
  • 使用方式
    • 修饰方法:public synchronized void method() { ... }
    • 修饰代码块:synchronized (object) { ... }
  • 优点:简单易用,无需手动释放锁。
  • 缺点:性能较低,不支持超时和中断。

(2) 使用ReentrantLock

  • 概念:ReentrantLock是Java提供的可重入锁,实现了Lock接口。
  • 原理:ReentrantLock通过AQS(AbstractQueuedSynchronizer)实现,提供了更灵活的锁操作。
  • 使用方式
    1
    2
    3
    4
    5
    6
    7
    ReentrantLock lock = new ReentrantLock();
    lock.lock();
    try {
    // 临界区代码
    } finally {
    lock.unlock();
    }
  • 优点:支持超时、中断,提供了公平锁和非公平锁的选择。
  • 缺点:需要手动释放锁,代码复杂度较高。

(3) 使用volatile关键字

  • 概念:volatile是Java中的关键字,用于保证变量的可见性和禁止指令重排序。
  • 原理:volatile变量的修改会直接写入主内存,其他线程读取时会直接从主内存读取。
  • 使用方式private volatile int count = 0;
  • 优点:性能高,无需获取锁。
  • 缺点:不保证原子性,只适用于简单的状态标志。

(4) 使用原子类

  • 概念:原子类是Java提供的线程安全的类,用于执行原子操作。
  • 原理:原子类基于CAS(Compare And Swap)操作实现,保证操作的原子性。
  • 使用方式AtomicInteger count = new AtomicInteger(0);
  • 优点:线程安全,性能高。
  • 缺点:只能保证单个变量的原子性。

(5) 使用线程安全的集合类

  • 概念:Java提供了线程安全的集合类,如ConcurrentHashMap、CopyOnWriteArrayList等。
  • 原理:线程安全的集合类通过内部的同步机制保证线程安全。
  • 使用方式ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
  • 优点:使用方便,无需手动同步。
  • 缺点:性能可能比非线程安全的集合类低。

(6) 使用线程本地变量

  • 概念:ThreadLocal是Java提供的线程本地变量工具类,每个线程都有自己的变量副本。
  • 原理:ThreadLocal为每个线程创建一个独立的变量副本,线程间互不影响。
  • 使用方式private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
  • 优点:线程安全,无需同步。
  • 缺点:可能导致内存泄漏,需要手动清理。

3. 线程安全实现方法的对比

方法 优点 缺点 适用场景
synchronized 简单易用,无需手动释放锁 性能较低,不支持超时和中断 简单的同步场景
ReentrantLock 支持超时、中断,提供公平锁选择 需要手动释放锁,代码复杂度高 复杂的同步场景
volatile 性能高,无需获取锁 不保证原子性 简单的状态标志
原子类 线程安全,性能高 只能保证单个变量的原子性 计数器、状态标志
线程安全的集合类 使用方便,无需手动同步 性能可能较低 多线程环境下的集合操作
线程本地变量 线程安全,无需同步 可能导致内存泄漏 线程间数据隔离

4. 线程安全的最佳实践

  • 最小化同步范围:只同步必要的代码块,减少锁的竞争。
  • 选择合适的同步机制:根据具体场景选择合适的线程安全实现方法。
  • 使用不可变对象:不可变对象天然是线程安全的。
  • 避免共享可变状态:尽量减少共享可变状态的使用。
  • 使用线程安全的集合类:在多线程环境下使用线程安全的集合类。
  • 合理使用锁:避免死锁和活锁,合理设置锁的粒度。

5. 示例

(1) 使用synchronized实现线程安全

1
2
3
4
5
6
7
8
9
10
11
public class SynchronizedExample {
private int count = 0;

public synchronized void increment() {
count++;
}

public synchronized int getCount() {
return count;
}
}

(2) 使用ReentrantLock实现线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockExample {
private int count = 0;
private ReentrantLock lock = new ReentrantLock();

public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}

public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}

(3) 使用原子类实现线程安全

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicExample {
private AtomicInteger count = new AtomicInteger(0);

public void increment() {
count.incrementAndGet();
}

public int getCount() {
return count.get();
}
}

6. 注意事项

  • 线程安全的实现需要根据具体场景选择合适的方法。
  • 避免过度同步,以免影响性能。
  • 注意锁的粒度,避免锁竞争。
  • 注意线程安全的集合类的使用场景。
  • 合理使用线程本地变量,避免内存泄漏。

5.4 什么是生产者-消费者模式?如何实现?

生产者-消费者模式是一种设计模式,它将生产和消费分离,通过缓冲区进行通信,从而实现解耦和提高系统的并发性能。

1. 生产者-消费者模式的概念

  • 生产者:负责生成数据的线程或进程
  • 消费者:负责处理数据的线程或进程
  • 缓冲区:用于存储生产者生成的数据,供消费者使用
  • 解耦:生产者和消费者之间通过缓冲区进行通信,彼此不直接依赖
  • 平衡:缓冲区可以平衡生产者和消费者的速度差异

2. 生产者-消费者模式的实现方式

(1) 使用wait()和notify()方法

  • 原理:使用Object类的wait()和notify()方法实现线程间的通信

  • 实现步骤

    1. 缓冲区满时,生产者线程等待
    2. 缓冲区空时,消费者线程等待
    3. 生产者生产数据后,通知消费者线程
    4. 消费者消费数据后,通知生产者线程
  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class ProducerConsumerExample {
private final int MAX_SIZE = 10;
private final List<Integer> buffer = new ArrayList<>();

public synchronized void produce(int value) throws InterruptedException {
while (buffer.size() == MAX_SIZE) {
System.out.println("Buffer is full, producer is waiting");
wait(); // 缓冲区满,生产者等待
}
buffer.add(value);
System.out.println("Produced: " + value);
notify(); // 通知消费者
}

public synchronized int consume() throws InterruptedException {
while (buffer.isEmpty()) {
System.out.println("Buffer is empty, consumer is waiting");
wait(); // 缓冲区空,消费者等待
}
int value = buffer.remove(0);
System.out.println("Consumed: " + value);
notify(); // 通知生产者
return value;
}

public static void main(String[] args) {
ProducerConsumerExample example = new ProducerConsumerExample();

// 生产者线程
new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
example.produce(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费者线程
new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
example.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

(2) 使用BlockingQueue

  • 原理:BlockingQueue是Java提供的阻塞队列,内置了线程安全的操作

  • 实现步骤

    1. 生产者使用put()方法向队列中添加数据(如果队列满,会阻塞)
    2. 消费者使用take()方法从队列中获取数据(如果队列空,会阻塞)
  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueExample {
private final int MAX_SIZE = 10;
private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);

public void produce(int value) throws InterruptedException {
queue.put(value); // 队列满时阻塞
System.out.println("Produced: " + value);
}

public int consume() throws InterruptedException {
int value = queue.take(); // 队列空时阻塞
System.out.println("Consumed: " + value);
return value;
}

public static void main(String[] args) {
BlockingQueueExample example = new BlockingQueueExample();

// 生产者线程
new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
example.produce(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费者线程
new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
example.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

(3) 使用信号量

  • 原理:使用Semaphore来控制对缓冲区的访问

  • 实现步骤

    1. 使用一个信号量控制空闲空间的数量
    2. 使用另一个信号量控制已使用空间的数量
    3. 生产者获取空闲空间信号量,生产数据后释放已使用空间信号量
    4. 消费者获取已使用空间信号量,消费数据后释放空闲空间信号量
  • 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreExample {
private final int MAX_SIZE = 10;
private final int[] buffer = new int[MAX_SIZE];
private final AtomicInteger in = new AtomicInteger(0);
private final AtomicInteger out = new AtomicInteger(0);
private final Semaphore empty = new Semaphore(MAX_SIZE); // 空闲空间信号量
private final Semaphore full = new Semaphore(0); // 已使用空间信号量

public void produce(int value) throws InterruptedException {
empty.acquire(); // 获取空闲空间
buffer[in.get() % MAX_SIZE] = value;
System.out.println("Produced: " + value + " at position " + (in.get() % MAX_SIZE));
in.incrementAndGet();
full.release(); // 释放已使用空间
}

public int consume() throws InterruptedException {
full.acquire(); // 获取已使用空间
int value = buffer[out.get() % MAX_SIZE];
System.out.println("Consumed: " + value + " from position " + (out.get() % MAX_SIZE));
out.incrementAndGet();
empty.release(); // 释放空闲空间
return value;
}

public static void main(String[] args) {
SemaphoreExample example = new SemaphoreExample();

// 生产者线程
new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
example.produce(i);
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

// 消费者线程
new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
example.consume();
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

3. 生产者-消费者模式的优缺点

优点

  • 解耦:生产者和消费者之间通过缓冲区进行通信,彼此不直接依赖
  • 平衡:缓冲区可以平衡生产者和消费者的速度差异
  • 并发:生产者和消费者可以并行执行,提高系统的并发性能
  • 可扩展性:可以根据需要增加生产者或消费者的数量

缺点

  • 缓冲区溢出:如果生产者的速度远快于消费者,可能导致缓冲区溢出
  • 死锁:如果实现不当,可能导致死锁
  • 资源消耗:缓冲区需要占用一定的内存资源

4. 生产者-消费者模式的适用场景

  • 消息队列:如Kafka、RabbitMQ等
  • 线程池:线程池中的任务队列
  • GUI应用:UI线程和工作线程之间的通信
  • 网络编程:服务器处理客户端请求
  • 批处理系统:数据处理 pipeline

5. 注意事项

  • 缓冲区大小:缓冲区的大小应该根据实际情况设置,避免过大或过小
  • 线程安全:确保缓冲区的操作是线程安全的
  • 异常处理:处理可能的异常,避免程序崩溃
  • 资源管理:及时释放资源,避免资源泄漏
  • 性能优化:根据实际情况选择合适的实现方式

6. 生产者-消费者模式的最佳实践

  • 使用BlockingQueue:在Java中,推荐使用BlockingQueue来实现生产者-消费者模式,它内置了线程安全的操作,使用方便
  • 合理设置缓冲区大小:根据生产和消费的速度差异,设置合适的缓冲区大小
  • 使用线程池:使用线程池来管理生产者和消费者线程,提高线程的利用率
  • 监控和调优:监控系统的运行状态,根据实际情况进行调优