zookeeper是一个分布式协调服务,为用户的分布式应用提供协调服务。使用范围有:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务…… 文章利用zookeeper实现简单的分布式锁
zookeeper虽然提供的服务很多,但底层只有两个功能 1.管理程序提交的数据; 2.为程序提供数据节点监听服务分布式系统多个应用需要操作同一个资源,为资源操作线程安全问题,创建分布式锁,应用取得锁才能操作资源,否则阻塞直到分布锁到该应用节点时才能操作资源。 思路:多个应用在zookeeper上注册,zookeeper采用排序最小的分配锁,如果是最小就获取锁并操作数据,然后删除该节点。
通过连接集群的Zookeeper对象操作节点,主要方法有create/delete/exists,创建zookeeper时可创建监听对象及监听回调方法,当节点状态改变时则触发事件(监听仅一次有效),因此可重新通过get(… true)使监听方法激活为true.
public class DistributedClient { //zookeeper主机 private String hosts = "192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181"; //超时时间 private int SESSION_TIME_OUT = 2000; private String subNode = "/subNode"; private String groupNode = "/group"; private ZooKeeper zk; private volatile String thisPath; //当前节点 public void connectZookeeper() throws Exception { zk = new ZooKeeper(hosts, SESSION_TIME_OUT, new Watcher() { //监听回调 public void process(WatchedEvent event) { if (event.getType() == EventType.NodeChildrenChanged && groupNode.equals(event.getPath())) { //获取子节点,并对父节点进行监听 try { List<String> childrenList = zk.getChildren(groupNode, true); String thisNode = thisPath.substring((groupNode + "/").length()); //比较自己是否是最小的节点 Collections.sort(childrenList); if (0 == childrenList.indexOf(thisNode)) { doSomething(); //特定的业务逻辑,用完锁之后需要删除 //重新注册一次锁, 带序号的锁 thisPath = zk.create(groupNode + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } catch (Exception e) { System.err.println("zookeeper is error"); } } } }); if(null == zk.exists(groupNode, true)) { zk.create(groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //父节点必须为persistent才可创建子节点 } //初始化创建锁 thisPath = zk.create(groupNode + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); Thread.sleep(1000); List<String> childrenNodes = zk.getChildren(groupNode, true); if (1 == childrenNodes.size()) { doSomething(); thisPath = zk.create(groupNode + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } } /** * 特写业务实现 * @throws Exception * @throws InterruptedException */ private void doSomething() throws Exception { System.out.println("get this lock" + thisPath); try { Thread.sleep(1000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { System.out.println("finish use this lock, will to release: " + thisPath); zk.delete(this.thisPath, -1); //-1表示不指定版本 } } @Test public void testNode() { try { for (int i = 0; i < 5; i++) { new Thread() { public void run() { try { DistributedClient dl = new DistributedClient(); dl.connectZookeeper(); } catch (Exception e) { e.printStackTrace(); } } }.start(); } Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { System.err.println("zookeeper connect error"); e.printStackTrace(); } } }