zookeeper分布式锁

xiaoxiao2021-02-28  41

       以前都是用 Redis 实现分布式锁。ZooKeeper 也可以实现分布式锁,虽然不太推荐使用,不过可以拿来学习一下。

在网上找了一个版本,感觉写得很不错,但是测试却没通过,于是特地做了修改并测试通过,拿来给大家参考一下。

package com.tgh.zk; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import java.util.List; import java.util.Random; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @author : tanggh * @description :〈描述〉 * @date : 2018/6/28 */ public class ZKDistributedLock { private String lockId; private static final String LOCK_ROOT = "/MYLOCKS"; //-------------------------------------------------------------- // data为存储的节点数据内容 // 由于锁机制用的是序列功能的特性,data的值不重要,只要利于网络传输即可 //-------------------------------------------------------------- private final static byte[] data = {0x12, 0x34}; private final CountDownLatch latch = new CountDownLatch(1); private ZooKeeper zk; private int sessionTimeout; public ZKDistributedLock(ZooKeeper zk,int sessionTimeout) { this.zk = zk; this.sessionTimeout = sessionTimeout; } class LockWatcher implements Watcher { @Override public void process(WatchedEvent event) { //-------------------------------------------------------------- // 监控节点变化(本程序为序列的上一节点) // 若为节点删除,证明序列的上一节点已删除,此时释放阀门让当前的lock获得锁 //-------------------------------------------------------------- if (event.getType() == Event.EventType.NodeDeleted){ latch.countDown(); // System.out.println("event lock id has been deleted:"+lockId); } } } /** * @return * @throws KeeperException * @throws InterruptedException */ public boolean lock() throws KeeperException, InterruptedException{ //-------------------------------------------------------------- // 保证锁根节点存在,若不存在则创建它 //-------------------------------------------------------------- createLockRootIfNotExists(); try { lockId = zk.create(LOCK_ROOT + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 
System.out.println("thread " + Thread.currentThread().getName() + " create the lock node: " + lockId + ", trying to get lock now"); while (true){ //-------------------------------------------------------------- // 获得锁根节点下的各锁子节点,并排序 //-------------------------------------------------------------- List<String> nodes = zk.getChildren(LOCK_ROOT, true); SortedSet<String> sortedNode = new TreeSet<String>(); for (String node : nodes) { sortedNode.add(LOCK_ROOT + "/" + node); } String first = sortedNode.first(); SortedSet<String> lessThanMe = sortedNode.headSet(lockId); //-------------------------------------------------------------- // 检查是否有比当前锁节点lockId更小的节点,若有则监控当前节点的前一节点 //-------------------------------------------------------------- if (lockId.equals(first)) { System.out.println("thread " + Thread.currentThread().getName() + " has get the lock, lockId is " + lockId); return true; } else if (!lessThanMe.isEmpty()) { String prevLockId = lessThanMe.last(); System.out.println("lockId: " + lockId + " wait prevLockId:" + prevLockId); Stat stat = zk.exists(prevLockId, new LockWatcher()); if(stat != null){ //-------------------------------------------------------------- // 阀门等待sessionTimeout的时间 // 当等待sessionTimeout的时间过后,上一个lock的Zookeeper连接会过期,删除所有临时节点,触发监听器 //-------------------------------------------------------------- latch.await(sessionTimeout, TimeUnit.MILLISECONDS); }else{ System.out.println("monitor exist failed:"+"lockId: " + lockId + " wait prevLockId:" + prevLockId); } } } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return false; } public synchronized boolean unlock() throws InterruptedException { //-------------------------------------------------------------- // 删除lockId节点以释放锁 //-------------------------------------------------------------- try { System.out.println("thread " + Thread.currentThread().getName() + " unlock the lock: " + lockId + ", the node: " + lockId + " had been deleted"); 
zk.delete(lockId, -1); return true; } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } finally { // zk.close(); } return false; } /** * 保证锁根节点存在,若不存在则创建它 */ public void createLockRootIfNotExists() { try { Stat stat = zk.exists(LOCK_ROOT, false); if (stat == null) { zk.create(LOCK_ROOT, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { final ZKOriApiTest test = new ZKOriApiTest(); final CountDownLatch latch = new CountDownLatch(10); final Random random = new Random(); random.setSeed(200); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { ZKDistributedLock lock = null; try { lock = new ZKDistributedLock(test.getZooKeeper(), test.getTimeout()); latch.countDown(); // latch.await(); lock.lock(); // Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); }finally { try { lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } Thread.sleep(100000); test.close(); } } 测试时大家可以把latch.await()打开,看看不同情况下测试结果。 Stat stat = zk.exists(prevLockId, new LockWatcher()); if(stat != null){ //-------------------------------------------------------------- // 阀门等待sessionTimeout的时间 // 当等待sessionTimeout的时间过后,上一个lock的Zookeeper连接会过期,删除所有临时节点,触发监听器 //-------------------------------------------------------------- latch.await(sessionTimeout, TimeUnit.MILLISECONDS); }else{ System.out.println("monitor exist failed:"+"lockId: " + lockId + " wait prevLockId:" + prevLockId); } 这个地方一定要判断state是否为空,多线程竞争可能造成一个结点被删除了,它的下个结点才试着去监听它是否存在。另外else里边的内容仅供判断,可以不需要。

测试结果:

thread Thread-0 create the lock node: /MYLOCKS/0000000272, trying to get lock now
thread Thread-5 create the lock node: /MYLOCKS/0000000273, trying to get lock now
thread Thread-9 create the lock node: /MYLOCKS/0000000274, trying to get lock now
thread Thread-6 create the lock node: /MYLOCKS/0000000275, trying to get lock now
thread Thread-3 create the lock node: /MYLOCKS/0000000276, trying to get lock now
thread Thread-1 create the lock node: /MYLOCKS/0000000277, trying to get lock now
thread Thread-7 create the lock node: /MYLOCKS/0000000278, trying to get lock now
thread Thread-4 create the lock node: /MYLOCKS/0000000279, trying to get lock now
thread Thread-2 create the lock node: /MYLOCKS/0000000280, trying to get lock now
thread Thread-8 create the lock node: /MYLOCKS/0000000281, trying to get lock now
thread Thread-0 has get the lock, lockId is /MYLOCKS/0000000272
thread Thread-0 unlock the lock: /MYLOCKS/0000000272, the node: /MYLOCKS/0000000272 had been deleted
lockId: /MYLOCKS/0000000273 wait  prevLockId:/MYLOCKS/0000000272
lockId: /MYLOCKS/0000000274 wait  prevLockId:/MYLOCKS/0000000273
lockId: /MYLOCKS/0000000275 wait  prevLockId:/MYLOCKS/0000000274
lockId: /MYLOCKS/0000000276 wait  prevLockId:/MYLOCKS/0000000275
lockId: /MYLOCKS/0000000277 wait  prevLockId:/MYLOCKS/0000000276
lockId: /MYLOCKS/0000000278 wait  prevLockId:/MYLOCKS/0000000277
lockId: /MYLOCKS/0000000279 wait  prevLockId:/MYLOCKS/0000000278
lockId: /MYLOCKS/0000000280 wait  prevLockId:/MYLOCKS/0000000279
lockId: /MYLOCKS/0000000281 wait  prevLockId:/MYLOCKS/0000000280
monitor exist failed:lockId: /MYLOCKS/0000000273 wait  prevLockId:/MYLOCKS/0000000272
thread Thread-5 has get the lock, lockId is /MYLOCKS/0000000273
thread Thread-5 unlock the lock: /MYLOCKS/0000000273, the node: /MYLOCKS/0000000273 had been deleted
thread Thread-9 has get the lock, lockId is /MYLOCKS/0000000274
thread Thread-9 unlock the lock: /MYLOCKS/0000000274, the node: /MYLOCKS/0000000274 had been deleted
thread Thread-6 has get the lock, lockId is /MYLOCKS/0000000275
thread Thread-6 unlock the lock: /MYLOCKS/0000000275, the node: /MYLOCKS/0000000275 had been deleted
thread Thread-3 has get the lock, lockId is /MYLOCKS/0000000276
thread Thread-3 unlock the lock: /MYLOCKS/0000000276, the node: /MYLOCKS/0000000276 had been deleted
thread Thread-1 has get the lock, lockId is /MYLOCKS/0000000277
thread Thread-1 unlock the lock: /MYLOCKS/0000000277, the node: /MYLOCKS/0000000277 had been deleted
thread Thread-7 has get the lock, lockId is /MYLOCKS/0000000278
thread Thread-7 unlock the lock: /MYLOCKS/0000000278, the node: /MYLOCKS/0000000278 had been deleted
thread Thread-4 has get the lock, lockId is /MYLOCKS/0000000279
thread Thread-4 unlock the lock: /MYLOCKS/0000000279, the node: /MYLOCKS/0000000279 had been deleted
thread Thread-2 has get the lock, lockId is /MYLOCKS/0000000280
thread Thread-2 unlock the lock: /MYLOCKS/0000000280, the node: /MYLOCKS/0000000280 had been deleted
thread Thread-8 has get the lock, lockId is /MYLOCKS/0000000281
thread Thread-8 unlock the lock: /MYLOCKS/0000000281, the node: /MYLOCKS/0000000281 had been deleted

参考原型:

https://blog.csdn.net/peace1213/article/details/52571445

转载请注明原文地址: https://www.6miu.com/read-2633005.html

最新回复(0)