七周七并发模型笔记
1. 交替锁
书中内容:
class ConcurrentSortedList {
private class Node {
int value;
Node prev;
Node next;
ReentrantLock lock = new ReentrantLock();
Node() {}
Node(int value, Node prev, Node next) {
this.value = value; this.prev = prev; this.next = next;
}
}
private final Node head;
private final Node tail;
public ConcurrentSortedList() {
head = new Node(); tail = new Node();
head.next = tail; tail.prev = head;
}
public void insert(int value) {
Node current = head;
current.lock.lock();
Node next = current.next;
try {
while (true) {
next.lock.lock();
try {
if (next == tail || next.value < value) {
Node node = new Node(value, current, next);
next.prev = node;
current.next = node;
return;
}
} finally { current.lock.unlock(); }
current = next;
next = current.next;
}
} finally { next.lock.unlock(); }
}
}
insert()方法保证链表是有序的:遍历链表直到找到第一个值小于新插入值的节点位置,在这个位置前插入新节点。
第26行锁住了链表的头节点,第30行锁住了下一个节点。接下来检测两个节点之间是否是待插入位置。如果不是,则在第38行解锁当前节点并继续遍历。如果找到待插入位置,第33~36行构造新节点并将其插入链表后返回。两把锁的解锁操作在两个finally块中进行(第38行和第42行)。
这种方案不仅可以让多个线程并发地进行链表插入操作,还能让其他的链表操作安全地并发。比如计算链表节点个数,只需倒序遍历链表即可:
public int size() {
Node current = tail;
int count = 0;
while (current.prev != head) {
ReentrantLock lock = current.lock;
lock.lock();
try {
++count;
current = current.prev;
} finally { lock.unlock(); }
}
return count;
}
最开始读到这里的时候就想, 这种不把整个列表锁住的size() 方法, 其实计算出来的列表个数
这个状态是很奇怪的; 因为, 你说这个个数是这个链表在什么时间点
下的个数?
计算结束时刻? 100个元素的链表, 照上面的方法, 计算到第1个的时候, 是可以在第98和99个之间插入元素的. 那么计算结束得到的个数是100个, 但是结束的那一刹那, 列表实际的元素个数是101个; 计算开始时刻? 跟结束时刻同理, 计算开始时, 实际个数是100个, 中途在”遍历指针”之前插入元素, 计算得到的结果就会比 开始时刻多.
我们姑且不去计较这个”个数”是什么时间点的, 只要得到就行; 那么一下子的反应就是, 既然不用计较时间点, 那么我还需要加锁干嘛呢? 直接一个个遍历就是了;
仔细想了下, 这里有个容易出错的地方, 比如一个A-B的链表中插入C, 这个时候计算size, 假如插入的赋值顺序写成了:
B.prev = C //(1)
C.prev = A
那这样在并发size() 的时候, 插入操作刚执行完1, 就会导致 C.prev 是空的情况, size()要么异常退出, 要么计算不正确. 所以正确的方式应该是:
C.prev = A // (1)
B.prev = C // (2)
但是问题就恰恰在这里, 这也是很容易忽略的地方; 这个仅仅是”正确的执行顺序”, 而不是正确的写法; 这么写并不会得到这么执行这个结果; 这两个语句的执行顺序不是”一定”的; 也就是指令乱序或者编译器优化导致乱序的情况
所以, 这个插入操作必须作为一个”互斥块”, 就算是相对于”只读” 的size() 操作