Smart's Blog

java通过zookeeper实现分布式锁

序言:
我们知道在多线程环境中,线程对于资源的竞争和共享十分激烈,开发者需要设计合理的锁机制来控制线程对于资源的访问。而在分布式集群系统中,资源的访问对象则很有可能是多个进程(多台机器),我们需要一种锁机制来实现对分布式的进程的资源访问控制,这种锁机制叫做分布式锁,而实现分布式锁,最常用的方法是借助zookeeper实现。假设我们的分布式应用部署在集群内的10台机器上,而这10台机器上的应用需要对同一个redis实例内的一个key进行读写操作,我们看看如何借助zookeeper的分布式锁实现资源读写的原子性。业务场景的流程图如下:



1、利用zookeeper创建分布式共享锁

       我们在zookeeper中先创建一个节点目录为:/zk_lock/etl_lock目录,应用程序在获取锁之前,都需要在这个目录下创建一个临时节点(节点类型为EPHEMERAL_SEQUENTIAL,临时自增序列节点,保证节点名称的唯一性),进行读操作时创建的节点的名称为:lock-r-自增序号,写操作时名称为lock-w-自增序号,如下图:

       这里面的每一个叶子节点代表的都是一个客户端进程的读或写请求操作,节点排序的先后代表的是请求操作的先后顺序,客户端程序获取到这个List列表,判断当前List与自身的节点信息是否满足一定的条件,如果满足就获得锁,如果不满足,则进入等待。

2、java客户端的分布式锁机制实现

       java客户端通过readLock()和writeLock()实现了读锁和写锁两种锁,调用这两个方法时,都先在zookeeper的/zk_lock/etl_lock目录下创建一个属于当前进程独有的子节点,之后进入while循环,判断是否符合获得锁的条件,但while循环内并不是做轮询操作,而是通过zookeeper的Watcher向zookeeper注册一个监听事件,当zookeeper的/zk_lock/etl_lock目录下的节点发生删除节点操作时,会回调Watcher类内的process方法,通知java客户端zookeeper锁目录下的节点列表发生变动,java客户端再去获取新的节点列表,并判断自己是否符合获取锁的条件。同时,客户端轮询机制的等待与唤醒操作通过CountDownLatch类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 读锁
* @return
*/
public String readLock(){
String path;
List<String> nodeList;
String firstNodeName;
String nodePath = "/zk_lock/etl_lock";
String firstNodePath;
int nodeId=-1;
path = createNode(zkReadLock,"",ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
while(true) {
/**
* 获取读锁的两个条件:
* 1、自己处于nodeList的第一个位置
* 或
* 2、nodeList中排在自己之后的节点都是读操作
*/
nodeList = getChildren(nodePath);
firstNodeName = nodeList.get(0);
firstNodePath = nodePath+"/"+firstNodeName;
for(int i=0;i<nodeList.size();i++){
nodePath = nodePath+nodeList.get(i);
if(nodePath.equals(path)){
nodeId = i;//获取自己在列表中的位置
}
}
if (firstNodePath.equals(path)) {
lockSemaphore = new CountDownLatch(1);
return path;
} else {
for(int i=0;i<nodeId;i++){
//如果比自己序号小的子节点有写请求,则等待锁
if(nodeList.get(i).contains("w")) {
try {
lockSemaphore.await();
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
lockSemaphore = new CountDownLatch(1);
return path;
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 写锁
* @return
*/
public String writeLock(){
String path;
List<String> nodeList;
String nodePath = "/zk_lock/etl_lock";
String firstNodePath;
String firstNodeName;
int nodeId = -1;
path = createNode(zkWriteLock,"",ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
while(true) {
/**
* 获取写锁的条件:
* 1、自己处于nodeList的第一个位置
* 2、nodeList中排在自己之前的节点没有写操作
*/
nodeList = getChildren(nodePath);
firstNodeName = nodeList.get(0);
firstNodePath = nodePath+"/"+firstNodeName;
if (firstNodePath.equals(path)) {
lockSemaphore = new CountDownLatch(1);
return path;
}
else {
for (int i = 0; i < nodeList.size(); i++) {
nodePath = nodePath + nodeList.get(i);
if (nodePath.equals(path)) {
nodeId = i;//获取自己在列表中的位置
}
}
for (int i = 0; i < nodeId; i++) {
//如果比自己序号小的子节点有写请求,则等待锁
if (nodeList.get(i).contains("w")) {
try {
lockSemaphore.await();
continue;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
lockSemaphore = new CountDownLatch(1);
return path;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* zookeeper事件监听器
*/
public class ZkWatcher implements Watcher{
@Override
public void process(WatchedEvent watchedEvent) {
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
logs = new LogInfoGroup();
logs.info("建立zookeeper连接成功!");
logger.logInfos(logs);
connectedSemaphore.countDown();
}
if(watchedEvent.getType() == Event.EventType.NodeDeleted){
lockSemaphore.countDown();
}
}
}

我们通过一个简单的多线程测试任务来分布式多进程环境下的读写操作情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ZkTest {
private static ZkUtil zkUtil = new ZkUtil();
static int num =0 ;
public static void main(String args[]) throws IOException, KeeperException, InterruptedException {
zkUtil.connector(Config.getInstance().getProperty("ZK_URL"),30000);
for(int i=0;i<5;i++){
Thread thread1 = new Thread(new WriteThread());
Thread thread2 = new Thread(new ReadThread());
thread1.start();
thread2.start();
}
}
public static class ReadThread implements Runnable{
@Override
public void run() {
for(int i=0;i<5;i++) {
String path = zkUtil.readLock();
System.out.println(Thread.currentThread().getName() + ":读操作,num:" + num);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
zkUtil.deleteNode(path);
}
}
}
public static class WriteThread implements Runnable{
@Override
public void run() {
for(int i=0;i<5;i++) {
String path = zkUtil.writeLock();
num++;
System.out.println(Thread.currentThread().getName() + ":写操作---->num:" + num);
zkUtil.deleteNode(path);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}




可以看见,通过对读写操作的加锁操作,多个线程对同一变量(共享资源)的读写操作是原子性的。