早些时候找了下java中锁分类, 今天我们聊下java中的锁
场景
在计算机科学中,锁是在执行多线程时用于强行限制资源访问的同步机制,即用于在并发控制中保证对互斥要求的满足。—> 来自wikipedia
sychronized vs Lock
类型 | sychronized | Lock |
---|---|---|
解决的问题 | 对访问资源进行限制 | 同sychronized |
实现 | 内置关键字 | 基于AQS,实现接口 |
锁的获取与释放 可见性 | 隐式(手动加锁,隐式释放) | 显式(需要手动获取与释放) |
是否支持中断 | 不支持 | 支持 |
demo
实现一个独占式锁
1 | public class Mutex implements Lock { |
原理
其实就是基于AQS 内置的一部分参数,然后实现tryAcquire,acquire等接口。 具体的可以看下这篇关于AQS的总结. 理解AQS原理其实就基本把JUC中的锁摸的差不多了。
接下来我们主要的聊下公平锁,非公平锁,重入锁,重入读写锁。
公平锁 vs 非公平锁
这两个是两种实现方式,一般锁内置的实现都是会有实现这两种。比如重入锁,重入读写锁内部都有公平与非公平方式的实现
“公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。”
重入锁
顾名思义就是一个线程在获取到锁之后,再次获取锁。
sychronized 也是可重入锁。
重入锁其实是独占式状态共享
demo
- 打印当前等待的线程
- 主要接口就是lock() && unlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72public class ReenTrantLockTest {
static ReentrantLock reentrantLock = new ReentrantLock2(true);
static ReentrantLock reentrantLockUnfair = new ReentrantLock2(false);
@Test
public void testReenTrantLockUnFairAndFair() throws InterruptedException {
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
threads[i] = new Thread(new Job(reentrantLock));
threads[i].start();
}
for (int i = 0; i < 5; i++) {
threads[i].join();
}
}
private static class Job extends Thread {
private Lock lock;
public Job(Lock lock) {
this.lock = lock;
}
public void run() {
for (int i = 0; i < 2; i++) {
lock.lock();
try {
System.out.print("获取锁的当前线程[" + Thread.currentThread().getName() + "]");
System.out.print("等待队列中的线程[");
((ReentrantLock2) lock).getQueueThreads();
System.out.println("]");
reenDoSth();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
private void reenDoSth() {
lock.lock();
try {
System.out.println("reenDoSth[" + Thread.currentThread().getName() + "]");
} catch (Exception e) {
} finally {
lock.unlock();
}
}
}
private static class ReentrantLock2 extends ReentrantLock {
public ReentrantLock2(boolean fair) {
super(fair);
}
public void getQueueThreads() {
ArrayList<Thread> threads = new ArrayList<>(super.getQueuedThreads());
threads.forEach(thread -> System.out.print(thread.getName()));
}
}
}
源码分析
- 重入锁是一个典型的独占式锁
非公平锁
####### lock
######## NonfairSync.lock
1 | /** |
######## Sync.nonfairTryAcquire
1 | /** |
####### lock
######## NonfairSync.unlock
- 实际调用的是Sync.release
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public final boolean release(int arg) {
// 释放锁。
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 这里c == 0 才表示该线程所有获取到锁的都已经释放掉了
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
公平锁
####### lock
######## FairSync.lock
- 公平锁的获取稍微有点不一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 比非公平锁多了一个一个 !hasQueuedPredecessors() 其他都一样
// 就是判断是否有前置节点,就是公平锁的本质,一定是按照顺序来的
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
####### unlock
公平锁的释放与非公平锁的释放共用的一套代码,这里就不赘述了
重入锁总结
- 重入锁其实主要是依赖来AQS的内部实现。
- 公平锁的释放与非公平锁的释放是一样的
- 公平锁的获取与非公平锁的获取唯一的不同在与公平锁在判断是否可以获取到锁的时候,需要判断是否有前置节点
读写锁
demo
- 读写操作夹杂,然后看当前等待的读写线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113public class ReenTrantReadWriteLockTest {
static Map<String, Integer> cache = new HashMap<>();
static ReentrantReadWriteLock reentrantReadWriteLockUnFair = new ReentrantReadWriteLock2(false);
static ReentrantReadWriteLock reentrantReadWriteLockFair = new ReentrantReadWriteLock2(true);
@Test
public void testWRLock() throws InterruptedException {
int index = 0;
int length = 14;
Thread[] threads = new Thread[length];
for (int i = 0; i < 10; i++) {
threads[index++] = new ReadJob(reentrantReadWriteLockFair);
if (i % 3 == 0) {
threads[index++] = new WriteJob(reentrantReadWriteLockFair);
}
}
for (int i = 0; i < length; i++) {
threads[i].start();
}
// reentrantReadWriteLockFair.getQueueLength()
for (int i = 0; i < length; i++) {
threads[i].join();
}
}
static class WriteJob extends Thread {
ReadWriteLock readWriteLock;
public WriteJob(ReadWriteLock readWriteLock) {
this.readWriteLock = readWriteLock;
}
public void run() {
readWriteLock.writeLock().lock();
try {
Thread.sleep(1000);
String s = Thread.currentThread().getId() + "-" + cache.get(Thread.currentThread().getName());
cache.put(Thread.currentThread().getName(), ((ReentrantReadWriteLock2) readWriteLock).getQueueLength());
System.out.println("write[" + Thread.currentThread().getName() + "] " + s);
System.out.println("[" + Thread.currentThread().getName() + "] read " + s);
System.out.print("等待队列中的线程[");
((ReentrantReadWriteLock2) readWriteLock).getWaitQueue();
System.out.println("]");
read();
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
private void read() {
readWriteLock.readLock().lock();
try {
Integer s = cache.get(Thread.currentThread().getName());
System.out.println("[" + Thread.currentThread().getName() + "] read " + ((ReentrantReadWriteLock2) readWriteLock).getQueueLength());
// System.out.print("等待队列中的线程[");
// ((ReentrantReadWriteLock2) readWriteLock).getWaitQueue();
// System.out.println("]");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
static class ReadJob extends WriteJob {
public ReadJob(ReadWriteLock readWriteLock) {
super(readWriteLock);
}
public void run() {
super.read();
}
}
private static class ReentrantReadWriteLock2 extends ReentrantReadWriteLock {
public ReentrantReadWriteLock2(boolean fair) {
super(fair);
}
public void getWaitQueue() {
// 打印等待写的线程
super.getQueuedWriterThreads().forEach(thread -> System.out.print(thread.getName()));
System.out.println();
// 打印等待读的线程
super.getQueuedReaderThreads().forEach(thread -> System.out.print(thread.getName()));
}
}
读写锁
重写读写锁 是共享式状态共享(读) + 独占式状态共享(写)
- 举例A(R)->B(w)->C(R)->D(R)->E(W)->F(R)
- 下面的demo 是读写夹杂,打印等待读写的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113public class ReenTrantReadWriteLockTest {
static Map<String, Integer> cache = new HashMap<>();
static ReentrantReadWriteLock reentrantReadWriteLockUnFair = new ReentrantReadWriteLock2(false);
static ReentrantReadWriteLock reentrantReadWriteLockFair = new ReentrantReadWriteLock2(true);
@Test
public void testWRLock() throws InterruptedException {
int index = 0;
int length = 14;
Thread[] threads = new Thread[length];
for (int i = 0; i < 10; i++) {
threads[index++] = new ReadJob(reentrantReadWriteLockFair);
if (i % 3 == 0) {
threads[index++] = new WriteJob(reentrantReadWriteLockFair);
}
}
for (int i = 0; i < length; i++) {
threads[i].start();
}
// reentrantReadWriteLockFair.getQueueLength()
for (int i = 0; i < length; i++) {
threads[i].join();
}
}
static class WriteJob extends Thread {
ReadWriteLock readWriteLock;
public WriteJob(ReadWriteLock readWriteLock) {
this.readWriteLock = readWriteLock;
}
public void run() {
readWriteLock.writeLock().lock();
try {
Thread.sleep(1000);
String s = Thread.currentThread().getId() + "-" + cache.get(Thread.currentThread().getName());
cache.put(Thread.currentThread().getName(), ((ReentrantReadWriteLock2) readWriteLock).getQueueLength());
System.out.println("write[" + Thread.currentThread().getName() + "] " + s);
System.out.println("[" + Thread.currentThread().getName() + "] read " + s);
System.out.print("等待队列中的线程[");
((ReentrantReadWriteLock2) readWriteLock).getWaitQueue();
System.out.println("]");
read();
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.writeLock().unlock();
}
}
private void read() {
readWriteLock.readLock().lock();
try {
Integer s = cache.get(Thread.currentThread().getName());
System.out.println("[" + Thread.currentThread().getName() + "] read " + ((ReentrantReadWriteLock2) readWriteLock).getQueueLength());
// System.out.print("等待队列中的线程[");
// ((ReentrantReadWriteLock2) readWriteLock).getWaitQueue();
// System.out.println("]");
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
static class ReadJob extends WriteJob {
public ReadJob(ReadWriteLock readWriteLock) {
super(readWriteLock);
}
public void run() {
super.read();
}
}
private static class ReentrantReadWriteLock2 extends ReentrantReadWriteLock {
public ReentrantReadWriteLock2(boolean fair) {
super(fair);
}
public void getWaitQueue() {
super.getQueuedWriterThreads().forEach(thread -> System.out.print(thread.getName()));
System.out.println();
super.getQueuedReaderThreads().forEach(thread -> System.out.print(thread.getName()));
}
}
}
原理剖析
详细的剖析可以看这篇文章
我这边主要聊几个关键点
读写锁是可重入锁
状态的设计
- 一共32位。 高16位为读锁的状态 ,底16位为写锁的状态。
- 所以判断读写锁,根据位运算计算即可。
写锁
- 排它锁
- 获取的时候需要判断是否有读锁在获取,如果有,则获取失败.(如果可以获取成功,就是锁升级,ReentrantWriteReadLock 不支持)
读锁
- 共享锁
- 获取的时候,如果存在写锁,并且是当前线程,则是可以获取到锁的(锁降级) 。
HoldCounter
- 共享锁从某一角度来讲,就是一个计数器。这个计数器是于当前线程绑定的。 而HoldCounter可以理解为是这个计数器(原理是ThreadLocalCache)
参考大佬
- ReentrantReadWriteLock读写锁详解
- 《java并发编程的艺术》方腾飞 魏鹏 程晓明 著