Zookeeper 分布式锁

最优方案
方案:子节点监听方式
实现分布式锁
实现思路:监听子节点状态
1. 在父节点 (持久化) 下创建临时节点,实际创建的节点路径会根据数量进行自增(ZK 自编号方式创建节点)。

2. 创建节点成功后,首先获取父节点下的子节点列表,判断本线程的路径后缀编号是否是所有子节点中最小的,若是则获取锁,
反之监听本节点前一个节点 (路径排序为本节点路径数字减一的节点) 变动状态 (通过 getData() 方法注册 watcher)
3. 当监听对象状态变动 (节点删除状态) 后 watcher 会接收到通知,这时再次判断父节点下的子节点的排序状态,
若满足本线程的路径后缀编号最小则获取锁,反之继续注册 watcher 监听前一个节点状态


import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;




public class ZkLock {
    private Logger logger = LoggerFactory.getLogger(ZkLock.class);
    private static final String CHILD_NODE_PATH = "temp";
    private String baseLockPath;
    private String finalLockId;

    public static final String TIMER_REGISTER_PATH = "/timer";
    
    // 是否需要中断阻塞标志位
    private boolean needInterrupt = false;
    // ZK 是否连接成功标志位
    private boolean connected = false;
    //是否获取到锁标志位
    private boolean acquireLock = false;

    private String host = "192.168.11.111";
    private String port = "2181";
    private String hostPort = Configuration.getString("system.zookeeper.host.port");
    private ZooKeeper zooKeeper;
    private PreviousNodeWatcher previousNodeWatcher;

    private ZkLock(String host, String port, String lock) {
        this.host = host;
        this.port = port;
        this.hostPort= host + ":" + port;
        this.baseLockPath = TIMER_REGISTER_PATH + "/" + lock;
        this.previousNodeWatcher = new PreviousNodeWatcher(this);
    }

    private ZkLock(String hostPort, String lock) {
        this.hostPort = hostPort;
        this.baseLockPath = TIMER_REGISTER_PATH + "/" + lock;
        this.previousNodeWatcher = new PreviousNodeWatcher(this);
    }

    private ZkLock(String lock) {
        this.hostPort = Configuration.getString("system.zookeeper.host.port");
        this.baseLockPath = TIMER_REGISTER_PATH + "/" + lock;
        this.previousNodeWatcher = new PreviousNodeWatcher(this);
    }


    /**
     * 新建锁(连接ZK阻塞)
     *
     * @param host zk 服务ip
     * @param port zk 服务端口
     * @param lock 锁名称
     * @return
     */
    public static ZkLock create(String host, String port, String lock) {
        ZkLock ZkLock = new ZkLock(host, port, lock);
        ZkLock.connectZooKeeper();
        return ZkLock;
    }

    public static ZkLock create(String hostPort,  String lock) {
        ZkLock ZkLock = new ZkLock(hostPort,  lock);
        ZkLock.connectZooKeeper();
        return ZkLock;
    }

    /**
     * 获取锁(阻塞)
     *
     * @return true代表获取到分布式任务锁
     */
    public boolean getLock() {
        if (!connected) {
            return false;
        }
        while (!needInterrupt) {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                logger.warn(e.getMessage(), e);
            }
            if (acquireLock) {
                return true;
            }
        }
        return false;
    }

    /**
     * 释放锁
     *
     * @return true代表释放锁成功, 并切断ZK连接
     */
    public boolean releaseLock() {
        try {
            if (zooKeeper != null && connected) {
                zooKeeper.delete(finalLockId, -1);
                logger.info(String.format("ZK [%s] delete success.",finalLockId));
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        return disconnectZooKeeper();
    }

    private boolean disconnectZooKeeper() {
        if (zooKeeper == null && !connected) {
            return false;
        }
        try {
            connected = false;
            acquireLock = false;
            if (zooKeeper != null) {
                zooKeeper.close();
            }
            logger.info(String.format("ZK disconnect success."));
        } catch (Exception e) {
            logger.warn(String.format("ZK disconnect failed. [%s]", e.getMessage()), e);
        }
        return true;
    }

    private boolean connectZooKeeper() {
        try {
            //连接ZK
            zooKeeper = new ZooKeeper(this.hostPort, 60000, event -> {
                if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
                    needInterrupt = true;
                } else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
                    needInterrupt = true;
                } else if (event.getState() == Watcher.Event.KeeperState.Expired) {
                    needInterrupt = true;
                } else {
                    if (event.getType() == Watcher.Event.EventType.None) {
                        //连接成功
                        connected = true;
                    }
                }
            });

            //等待异步连接成功,超过时间3s则退出等待,防止线程锁死
            int i = 1;
            while (!connected) {
                if (i == 30) {
                    break;
                }
                Thread.sleep(100);
                i++;
            }

            if (connected) {
                //创建父节点
                if (zooKeeper.exists(baseLockPath, false) == null) {
                    zooKeeper.create(baseLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }

                //创建子节点
                finalLockId = zooKeeper.create(baseLockPath + "/" + CHILD_NODE_PATH, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL);
                logger.info(" Connect zookeeper success. create child node " + baseLockPath + "/" + CHILD_NODE_PATH);
                //检查一次是否获取到锁
                checkAcquire();
            } else {
                needInterrupt = true;
                logger.warn(" Connect zookeeper failed. Time consumes 3 s");
                return false;
            }
        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
            return false;
        }
        return true;
    }

    private void checkAcquire() {
        if (!connected) {
            return;
        }
        try {
            //获取子节点列表,若没有获取到锁,注册监听,监听对象应当是比本节点路径编号小一(或者排在前面一位)的节点
            List<String> childrenList = zooKeeper.getChildren(baseLockPath, false);

            if (judgePathNumMin(childrenList)) {
                //获取到锁
                acquireLock = true;
            } else {
                watchPreviousNode(childrenList);
            }

        } catch (Exception e) {
            logger.warn(e.getMessage(), e);
            disconnectZooKeeper();
        }
    }

    private boolean judgePathNumMin(List<String> paths) {
        if (paths.isEmpty()) {
            return true;
        }
        if (paths.size() >= 2) {
            //对无序状态的子节点路径列表按照编号升序排序
            paths.sort((str1, str2) -> {
                int num1;
                int num2;
                String string1 = str1.substring(CHILD_NODE_PATH.length(), str1.length());
                String string2 = str2.substring(CHILD_NODE_PATH.length(), str2.length());
                num1 = Integer.parseInt(string1);
                num2 = Integer.parseInt(string2);
                if (num1 > num2) {
                    return 1;
                } else if (num1 < num2) {
                    return -1;
                } else {
                    return 0;
                }
            });
        }

        //判断本线程的节点路径是否是最小编号
        String minId = paths.get(0);
        return finalLockId.equals(baseLockPath + "/" + minId);
    }

    private void watchPreviousNode(List<String> paths) {
        if (paths.isEmpty() || paths.size() == 1) {
            needInterrupt = true;
            return;
        }
        int currentNodeIndex = paths.indexOf(finalLockId.substring((baseLockPath + "/").length(), finalLockId.length()));
        String previousNodePath = baseLockPath + "/" + paths.get(currentNodeIndex - 1);
        //通过getData方法注册watcher
        try {
            zooKeeper.getData(previousNodePath, previousNodeWatcher, new Stat());
        } catch (Exception e) {
            //watcher注册失败,退出锁竞争
            logger.warn(String.format("Previous node watcher register failed! message: [%s]", e.getMessage()), e);
            needInterrupt = true;
        }
    }

    private class PreviousNodeWatcher implements Watcher {
        private ZkLock context;

        PreviousNodeWatcher(ZkLock context) {
            this.context = context;
        }

        @Override
        public void process(WatchedEvent event) {
            if (event.getState() == Watcher.Event.KeeperState.AuthFailed) {
                context.needInterrupt = true;
            } else if (event.getState() == Watcher.Event.KeeperState.Disconnected) {
                context.needInterrupt = true;
            } else if (event.getState() == Watcher.Event.KeeperState.Expired) {
                context.needInterrupt = true;
            } else {
                //节点被删除了,说明这个节点释放了锁
                if (event.getType() == Event.EventType.NodeDeleted) {
                    context.checkAcquire();
                }
            }
        }
    }
}
  
    展开阅读全文