JUC 提供了一套锁对象,分别是:Lock 接口、AbstractQueuedSynchronizaer(队列同步器)、ReentrantLock、ReentrantReadWriteLock、Condition、LockSupport。
Lock 接口
Lock 接口定义了锁工具常用的方法,ReentrantLock、ReentrantReadWriteLock 都是 Lock 的具体实现。
- lock()用于加锁
- unlock()释放锁
- lockInterruptibly()可中断加锁,线程持有锁后可被终端,中断后抛出一个异常以通知线程
- tryLock(), 尝试加锁,并返回获取锁的结果,后续需要根据返回结果自行实现阻塞。
- newConditioin()获取 Condition 对象,Condition 对象以在 Lock 锁的同步块内进行阻塞(await)和通知其他线程执行操作(singal),功能同 Ojbect 的 wait/notify 方法。
AbstractQueuedSynchronizaer 同步器
AbstractQueueSynchronizaer(抽象队列同步器)是一个抽象类,内置 FIFO 队列,是 JUC 的并发包实现的核心,它的设计者希望它能能成为大部分并发需求实现的基础。很多同步器都是继承 AbstractQueueSynchronizaer 实现的,重入锁、读写锁中锁都是继承 AQS。
ReentrantLock 中锁
ReentrantLock 的非公平锁
ReentrantLock 的公平锁
使用 AQS
使用同步器时使用以下三个方法改变状态:
- getState() 获取当前同步状态
- setState() 设置当前同步状态
- compareAndSetState() 使用 CAS 设置当前状态,该方法能保证以原子操作设置状态
使用同步器应该重写的方法:
方法 | 说明 |
---|---|
boolean tryAcquire(int arg) | 独占方式获取同步状态,CAS 更新状态,arg 为锁状态,例如加锁状态是 1,那么调用参数应该传参 1,下同 |
boolean tryRealease(int arg) | 独占释放同步,释放后等待获取同步的线程将有机会获取锁 🔒 |
int tryAcquireShared(int arg) | 共享获取同步状态,返回值大于 0 表示成功,反之失败 |
boolean tryReleaseShared(int arg) | 共享释放同步状态 |
boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程占用 |
同步器提供的模板方法:
这些方法可以直接使用,并且这些方法和 Lock 接口中的方法很像。
方法 | 说明 |
---|---|
void acquire(int arg) | 独占获取同步状态,如果当前线程获取同步状态成功,则有该方法返回,否则进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与 acquire()相同但该方法会相应中断,当前线程为获取到同步状态就进入同步队列,如果当前线程中断则抛出 InterruptedException并返回 |
voidtryAcquireNanos(int arg, long nanos) | 在 acquireInterruptibly(int arg)方法上增加了超时等待功能,并且有返回值,未超时且获取到同步状态返回 true,超时返回 false |
void acquireShared(int arg) | 共享式获取同步状态如果当前线程未获取到同步状态,将会进入同步队列等待,与独占获取的主要区别是同一时刻可以由多个线程获取到状态 |
void acquireSharedInterruptibly(int arg) | 与 acquireShare(int arg)方法相同,不用点是该方法相应中断 |
boolean acquireSharedNanos(int arg, long nanos) | 在 acquireSharedInterruptibly(int arg)基础上增加了超时等待 |
boolean release(int arg) | 独占方式释放同步状态,释放同步状态后,阻塞队列的第一个线程将被唤醒 |
boolean releaseShared(int arg) | 共享式释放锁 |
词汇解释:
- 超时等待:在执行操作是等待固定的时间,如果超出了这个等待时间就不等待了,然后继续往下执行,通常超时等待方法都有返回值表面请求的结果(如
boolean acquireSharedNanos(int arg, long nanos)
,也有没有返回值的:Object.wait(long timeout)、LockSupport.park(long nanos) - “如果获取到锁,从方法中返回”:意思是不在请求方法中阻塞了,该方法执行完毕或 return 返回值了
- 独占、共享:独占同一时刻被以可线程访问或占用,共享式多个
使用 AQS 实现非重入互斥锁
1// 非重入互斥锁
2// 锁定义:采用同步非阻塞方式(CAS+volatile),状态0代表可获取锁,状态1代表不可获取锁
3public class Mutex implements Lock {
4
5 // 继承并覆盖AQS
6 private static class Sync extends AbstractQueuedSynchronizer{
7
8 @Override
9 protected boolean tryAcquire(int arg) {
10 // CAS,期望值为0,更新值为1
11 if(compareAndSetState(0,1)){
12 // CAS更新成功,设置当前线程为独占
13 setExclusiveOwnerThread(Thread.currentThread());
14 //返回true表锁当前线程获取到了锁
15 return true;
16 }
17 // false未获取到锁
18 return false;
19 }
20
21 @Override
22 protected boolean tryRelease(int arg) {
23 // 非0装才允许释放锁,否则抛异常
24 if(getState() == 0) throw new IllegalMonitorStateException();
25 setExclusiveOwnerThread(null);
26 // 更新状态,0:锁为可获取状态
27 setState(0);
28 // 锁释放成功
29 return true;
30 }
31
32 // 状态为1说明有个线程正在持有锁
33 @Override
34 protected boolean isHeldExclusively() {
35 return getState()==1;
36 }
37
38 // 返回条件对象,用与在同步块内线程通信(wait/signal)
39 Condition newCondition(){
40 return new ConditionObject();
41 }
42 }
43
44 private final Sync sync = new Sync();
45
46 // 所有加锁传入的状态都是1,然后调用Sync的方法
47 @Override
48 public void lock() {
49 sync.acquire(1);
50 }
51
52 @Override
53 public void lockInterruptibly() throws InterruptedException {
54 sync.acquireInterruptibly(1);
55 }
56
57 @Override
58 public boolean tryLock() {
59 return sync.tryAcquire(1);
60 }
61
62 @Override
63 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
64 return sync.tryAcquireNanos(1,unit.toNanos(time));
65 }
66
67 @Override
68 public void unlock() {
69 sync.release(1);
70 }
71
72 @Override
73 public Condition newCondition() {
74 return sync.newCondition();
75 }
76}
测试
1public class TestMutex {
2 Mutex mutex = new Mutex();
3
4 void m1(){
5 try {
6 mutex.lock();
7 System.out.println("m1......");
8 Thread.sleep(2000);
9 } catch (InterruptedException e) {
10 e.printStackTrace();
11 }finally {
12 mutex.unlock();
13 }
14 }
15
16 void m2(){
17 try {
18 mutex.lock();
19 System.out.println("m2......");
20 Thread.sleep(1000);
21 }catch (InterruptedException e){
22 e.printStackTrace();
23 }finally {
24 mutex.unlock();
25 }
26
27 }
28
29 public static void main(String[] args) {
30 TestMutex o = new TestMutex();
31
32 // 执行m1
33 new Thread(o::m1,"t1").start();
34 new Thread(o::m1,"t2").start();
35
36 // 执行m2
37 new Thread(o::m2,"t3").start();
38
39 }
40}
ReentrantLock、ReentrantReadWriteLock
ReentrantLock 是可重入锁,实现了公平锁和非公平锁(默认),除了能提供 synchronized 相同的功能,还提供了尝试加锁,超时加锁,加锁中断等特性。
ReentrantReadWriteLock 是对 ReentrantLock 的再次改进,因为并不所由操作都需要互斥访问的,例如在无写的情况下多线程进行读,这不会使数据变脏,而在写到来时才需要同步,ReentrantReadWriteLock 应用而生,读写锁的特点是读是共享的,写是独占的,且读操作都在一个写操作完成后进行,因此在多读的场景下能提高同步性能。
ReentranLock 的使用
独占加锁,多个线程公用一个 ReentrantLock 对象,对同步代码加锁释放锁。
注意:所有的加锁都要手动释放!
使用模板:
1Reentrant lock = new ReentrantLock();
2...
3
4lock.lock()
5try{
6 ...
7 同步代码
8 ...
9}finally{
10 lock.unlock();
11}
使用“尝试获取锁”,使用尝试获取锁得到返回值后仍要需要我们自行控制同步。
1...
2try{
3 while(!tryLock()){
4 //未获取到锁空转阻塞
5 }
6 同步代码...
7}finally{
8 lock.unlock();
9}
使用 Condition,Condition 可在同步代码中进一步进行同步操作,如释放锁(await)通知其他线程竞争锁(signal、signalAll)。
1Lock lock = new ReentrantLock();
2Conditiion con = lock.newCondition();
3 ...
4
5lock.lock()
6
7try{
8 // 阻塞当前线程,让出锁
9 con.await();
10 // 唤醒一个线程执行
11 con.signal();
12}finally{
13 lock.unlock();
14}
ReentranReadWriteLock 的使用
1// 互斥锁和读写锁测试
2public class TestReadWriteLock {
3 static Lock lock = new ReentrantLock();
4 private static int value;
5
6 static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
7 static Lock readLock = readWriteLock.readLock();
8 static Lock writeLock = readWriteLock.writeLock();
9
10 public static void read(Lock lock) {
11 try {
12 lock.lock();
13 Thread.sleep(1000);
14 System.out.println("read over! value = "+value);
15 //模拟读取操作
16 } catch (InterruptedException e) {
17 e.printStackTrace();
18 } finally {
19 lock.unlock();
20 }
21 }
22
23 public static void write(Lock lock, int v) {
24 try {
25 lock.lock();
26 Thread.sleep(1000);
27 value = v;
28 System.out.println("write over! value = " + value);
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 } finally {
32 lock.unlock();
33 }
34 }
35
36 public static void main(String[] args) {
37
38 /*// 使用互斥锁
39 Runnable readR = ()-> read(lock);
40 Runnable writeR = ()->write(lock, new Random().nextInt());*/
41
42 // 使用读写锁
43 Runnable readR = ()-> read(readLock);
44 Runnable writeR = ()->write(writeLock, new Random().nextInt());
45
46 for(int i=0; i<2; i++) new Thread(writeR).start();
47 for(int i=0; i<20; i++) new Thread(readR).start();
48 for(int i=0; i<2; i++) new Thread(writeR).start();
49 }
50}
51
52// output
53read over! value = 0
54read over! value = 0
55read over! value = 0
56read over! value = 0
57read over! value = 0
58write over! value = 295078983
59write over! value = 2106533192
60read over! value = 2106533192
61read over! value = 2106533192
62read over! value = 2106533192
63read over! value = 2106533192
64read over! value = 2106533192
65read over! value = 2106533192
66read over! value = 2106533192
67read over! value = 2106533192
68read over! value = 2106533192
69write over! value = -1287679733
70read over! value = -1287679733
71read over! value = -1287679733
72read over! value = -1287679733
73read over! value = -1287679733
74write over! value = -791313481
LockSupport 使用
LockSupport 可以用来阻塞当前线程(park)或者唤醒线程(unpark). 使用示例如下。
1public class TestLockSupport {
2 public static void main(String[] args) {
3 // 两个线程打印 aaabbbaaa
4
5 Thread t1 = new Thread(() -> {
6 for (int i = 0; i < 6; i++) {
7
8 System.out.printf("a");
9 if (i == 2) {
10 // 打印三个字母后阻塞
11 LockSupport.park();
12 }
13 }
14 });
15 t1.start();
16
17 new Thread(()->{
18 for (int i = 0; i < 3; i++) {
19 System.out.printf("b");
20 }
21 // 释放锁
22 LockSupport.unpark(t1);
23 }).start();
24 }
25}
并发容器
ConcurrentHashMap
为什么要使用 ConcurrentHashMap?
- 线程安全。在并发环境下如果使用 HashMap 可能导致程序死循环,HashMap 在进行 put 操作时会时会使 Entry 形成环,Entry 的 next 引用永不为空就会产生死循环获取 Entry。
- 效率更高。HashTable 对 put、get、remove 方法等方法都加了 synchronizaed 锁,意味着同一时间只能由一个线程访问 HashTable 实例的方法,效率很低,而 ConcurrentHashMap 使用分段锁的方式,不同段可由不同的线程访问,提高了并发度提高了访问效率。
队列容器
并发容器队列分为有界对列和无界队列,有无届指限制添加到队列的元素数量,无界就是插入没有限制。
非阻塞队列和阻塞队列,非阻塞队列对插入和获取进行同步,而阻塞队列除了在插入和获取进行同步外,在没有元素时或插入满时会阻塞当前线程。
阻塞队列不可用时的处理方式,对于不同的插入和移除方法有不同的处理。
处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time,unit) |
检查 | element() | peek() | 不可用 | 不可用 |
Java 中的阻塞队列
- ArrayBlockingQueue:基于数组的有界阻塞队列,默认非公平,支持公平方式
- LinkedBlockingQueue:链式无界阻塞队列
- PriorityBlockingQueue:具有优先级的无界阻塞队列, 使用元素的 compareTo 或专用的 Comparator 进行比较,不保证同级元素的公平性
- DelayQueue:使用优先级队列实现的无界阻塞队列,可以使用该队列设计缓存系统、或者进行任务调度,存储的元素需要实现 Delayed 接口
- SynchronousQueue:不存储元素的阻塞队列
- LinkedTransferQueue:链表结构组成的无界阻塞队列,其 transfer 方法可以使正在等待接受元素的 take/poll 方法理解使用其传入的元素
- LinkedBlockingDequeue:链表结构双向阻塞队列
非并发容器同步实现
早期的 Vector 和 HashTable 都是使用 sychronized 进行同步,Concurrent 包下的同步多是 CAS+volatile 实现,那么非并发的容器怎么实现同步?一种方式是使用 Collections 里的包装其,另外一种是自己实现。
1 // 方式1
2 List<String> list = new ArrayList<>();
3
4 List<String> syncList = Collections.synchronizedList(list);
其他类型容器的同步方式相同。
1public class Test {
2 static List<Integer> list = new LinkedList<>();
3
4 public static void main(String[] args) {
5 for (int i = 0; i < 1000; i++) {
6 list.add(i);
7 }
8
9 try {
10 TimeUnit.SECONDS.sleep(1);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14
15 // 方式2: 使用list自身为锁,此时list自身互斥访问性能差
16 // 方式3: 使用ReentrantLock或者声明一个Object为lock都可以实现同步访问
17 for (int i = 0; i < 100; i++) {
18 new Thread(()->{
19 synchronized (list){
20 for (int j = 0; j < 10; j++) {
21 Integer remove = list.remove(0);
22 System.out.println(remove);
23 }
24 }
25 }).start();
26 }
27 }
28}
并发容器总结
非并发容器在多线程环境下存在线程安全问题,会导致数据脏读、抛异常或者死锁,多线程下切记使用并发容器。
对于 Map/Set
线程安全的环境下(如单线程),常用 HashMap、TreeMap(带排序功能)、LinkedHashMap;在多线程环境下常用 ConcurrentHashMap、ConcurrentSkipListMap,少用或不用 HashTable、synchronizedMap,它们都是使用 synchronized 实现同步,两者本质区别不大。
对于队列/列表
单线程下,多读少删少插入用 ArrayList,多删多插入用 LinkedList。
并发环境下,尽快少用 Vector, synchronizedList,synchronized 实现同步效率低。多读少写可以使用 CopyOnWriteList,另外根据使用队列是否需要设置大小选择有界或无界队列,然后选择阻塞类型。
(完)