zookeeper 学习笔记-zookeeper的应用

xiaoxiao2021-02-28  98

最近组里分享zookeeper,在这里记录下在网上找的资料以及看书的笔记 。

zookeeper 是一个高效可靠的分布式协调服务,分布式应用程序可以基于它实现数据发布订阅,zookeeper 可以实现:配置管理,分布式锁,分布式队列,master 选举,屏障等等。使用的核心技术主要有watcher,以及zookeeper丰富的数据节点类型:持久节点,持久顺序节点,临时节点,临时顺序节点。

系统模型

数据模型: zookeeper的数据结构与普通文件系统非常相似,zoopeeer树中每个节点称为znode,不同于文件系统中的目录与文件,zookeeper每个节点可以有自己的数据,同时又像目录一样可以作为路径标示的一部分。

事务ID:zookeeper中,事务是指能够改变zookeeper服务器状态的操作,一般包括数据节点的创建与删除,节点更新,客户端会话创建与失效等操作。zookeeper会为每一个事务分配一个全局唯一的事务ID--ZXID,从其中可以识别出zookeeper处理这些操作的顺序。

Znode结构: 组成: 1. stat::状态信息,Znode的版本,权限等信息 2. data:与Znode关联的数据 3. children:Znode下的子节点 每个Znode数据大小最多1M,因为zookeeper 所有节点以及节点数据都存放在内存中,如果数据太大,会过多消耗zookeeper服务器内存 节点类型: 1. 持久节点:节点被创建后就会一直存在于zookeeper服务器上,除非显示的删除 2. 持久顺序节点:创建时,zookeeper会自动为给定的节点名后加上一个数字后缀 3. 临时节点:生命周期与客户端会话绑定在一起,会话失效,节点就删除 4. 临时顺序节点

关于watcher

zookeeper 允许客户端向服务端注册一个watcher监听,当服务端的一些指定事件触发了这个watcher,就会向指定客户端发送一个watchEvent事件通知来实现分布式通知的功能。 watchEvent 只包含三部分内容:通知状态,事件类型以及节点路径,即watcher只会通知客户端数据发生了变化,并不会告诉客户端变化后的数据。

配置管理

即发布订阅系统,发布者将数据发布到zookeeper节点中,供订阅者订阅,从而达到数据动态获取的目的,实现配置信息的集中式管理与数据动态更新。

配置管理中心流程:

发布者向 zookeeper服务器注册一个保存配置的节点,并去修改这个节点的数据:

public class Publish implements Watcher{ private static CountDownLatch latch = new CountDownLatch(1); private static Stat stat = new Stat(); private static ZooKeeper zk =null; private final static Integer SESSION_TIMEOUT = 3000; public static String path ="/publish"; public static void main(String[] args) { try { // 创建一个会话实例 zk = new ZooKeeper("127.0.0.1:2181",SESSION_TIMEOUT,new Publish()); // 创建节点 if(createPath(path, String.valueOf(System.currentTimeMillis()))){ latch.await(); // 会话创建完毕, zookeeper会向会话对应的客户端返回一个事件通知,客户端只有获取这个通知后才算创建了会话(process) //修改数据 changeData(); } } catch (Exception e) { e.printStackTrace(); } } public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ System.out.println("receive watched event:"+event); System.out.println(event.getState()); latch.countDown(); System.out.println("zk connection"); } } public static boolean createPath( String path, String data ) { try { zk.exists( path, true ); System.out.println("节点创建成功, Path: " + zk.create( path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT ) + ", content: " + data ); } catch ( Exception e ) {} return true; } public static void changeData(){ try{ System.out.println("init data :pulish node data"+new String( zk.getData(path,true,stat))); // 数据通知 for(int i = 0 ; i< 4 ; i++){ System.out.println( "publish new Data:"+i); zk.setData(path ,String.valueOf(i).getBytes(),-1); Thread.sleep(4000L); } }catch (Exception e){ e.printStackTrace(); } } }

订阅者,订阅的数据发生变化时,会被通知: 注:watcher是一次性的触发器,并且只会通知客户端数据发生了变化,不会通知变化后的数据,客户端想要知道需要再次读取数据,并且重新注册watcher

// 订阅者,数据变化 public class Subscribe1 implements Watcher { private static CountDownLatch latch = new CountDownLatch(1); private static Stat stat = new Stat(); private static ZooKeeper zk =null; private final static Integer SESSION_TIMEOUT = 3000; public static void main(String[] args) { try { String path ="/publish"; zk = new ZooKeeper("127.0.0.1:2181",SESSION_TIMEOUT,new Subscribe1());//在创建ZooKeeper时第三个参数负责设置该类的默认构造函数,连接成功会触发 latch.await(); System.out.println("Subscribe1 connection"); byte[] temp = zk.getData(path,true,stat); System.out.println("Subscribe1 init data :publish node data : "+new String(temp)); List<String> list = zk.getChildren(path, true);//在path 上注册一个节点 System.out.println("Subscribe1 init data : publish children node : " + list); while(true){ Thread.sleep(Integer.MAX_VALUE); } } catch (Exception e) { e.printStackTrace(); } } // 回调方法,负责处理来自Zookeeper服务端的Watcher通知 public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ if(Event.EventType.None == event.getType() && event.getPath() == null){ latch.countDown(); }else if(event.getType() == Event.EventType.NodeDataChanged){ modifyDataChange(event); } } } public void modifyDataChange(WatchedEvent event){ //Client需要去拉取最新的数据信息 try { byte[] newByte = zk.getData(event.getPath(),true,stat); System.out.println("Subscribe1 ---- path:"+event.getPath()+"\tdata has changed.\t new Data :"+ new String(newByte)); } catch (Exception e) { e.printStackTrace(); } } }

与配置中心应用相类似的另一种是服务管理,即对集群服务上下线做统一管理,当工作服务器的基本信息发生改变时,监控服务器就会得到通知,并响应这些变化。这里只要把客户端监听的类型改为对子节点的监听,一台服务器上线,就在父节点下注册一个临时子节点。这样,服务器上线或者下线,子节点数量会发生相应的变化,就会通知客户端,客户端就可以根据这些变化做出响应。

屏障

屏障: 客户端需要等待多个进程完成各自的任务,然后才能继续往前进行下一步。即阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。 单机系统中可以使用java提供的countLatchDown,但是分布式系统中这种方式不再起效。此时可以利用zookeeper的临时节点,以及watcher来实现。

/** * 继承watcher,实现分布式环境中不同任务之间的同步处理 * 针对事件的触发使线程做出相应的处理,从而避免无谓的while(true),导致cpu空转。 */ public class Barrier implements Watcher { private static final String addr = "127.0.0.1:2181"; private ZooKeeper zk = null; private Integer mutex; private int size = 0; private String root; public Barrier(String root, int size) { this.root = root; this.size = size; try { zk = new ZooKeeper(addr, 10 * 1000, this); mutex = new Integer(-1); // 1,每启动一个线程,都会给该线程加一个对象锁 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); } } /** * 3, 每个线程都在监听znode的子节点个数,当有新线程进入时,监听Znode子节点的线程都会再次被唤醒 * 唤醒后,线程继续while(true) 循环,继续判断是否符合屏障要求 */ public synchronized void process(WatchedEvent event) { synchronized (mutex) { mutex.notify(); } } /** * 2,进入屏障 * 通过while循环判断创建的节点个数 * 不符合则线程阻塞, 当有其他新线程进入时, 回调process,唤醒线程,继续判断是否符合要求 * 若符合要求,enter 方法执行完毕,即撤出屏障 */ public boolean enter(String name) throws Exception { zk.create(root + "/" + name, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() < size) { System.out.println(root + "/" + name + "await"); mutex.wait(); } else { return true; } } } } public boolean leave(String name) throws KeeperException, InterruptedException { zk.delete(root + "/" + name, 0); while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } }

测试代码

public class BarrierTest { public static void main(String args[]) throws Exception { for (int i = 0; i < 40; i++) { Process p = new Process("Thread-" + i, new Barrier("/test_node", 40)); p.start(); Thread.sleep(1000); } } } class Process extends Thread { private String name; private Barrier barrier; public Process(String name, Barrier barrier) { this.name = name; this.barrier = barrier; } @Override public void run() { try { // System.out.println(name + "began run"); barrier.enter(name); System.out.println(name + " enter"); Thread.sleep(3000); barrier.leave(name); System.out.println(name + " leave"); } catch (Exception e) { e.printStackTrace(); } } }

参考: http://www.cnblogs.com/wuxl360/p/5817471.html https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper-code/ http://www.cnblogs.com/yuyijq/p/3424473.html http://yuzhouwan.com/posts/31915/

转载请注明原文地址: https://www.6miu.com/read-24332.html

最新回复(0)