Zookeeper的Java API操作
创始人
2024-06-01 16:02:50
0

Zookeeper的Java API操作

    • 一、先启动Zookeeper集群
    • 二、IDEA 环境搭建
    • 三、创建子节点
    • 四、获取子节点并监听节点变化
    • 五、判断 Znode 是否存在
    • 六、Watcher工作流程

一、先启动Zookeeper集群

二、IDEA 环境搭建

1.创建一个Maven工程:ZookeeperProject
2.在pom.xml文件添加如下内容:

junitjunitRELEASEorg.apache.logging.log4jlog4j-core2.8.2org.apache.zookeeperzookeeper3.5.7

3.拷贝log4j.properties文件到项目根目录
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入:

log4j.rootLogger=INFO, stdout 
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
log4j.appender.logfile=org.apache.log4j.FileAppender 
log4j.appender.logfile.File=target/spring.log 
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

三、创建子节点

package com.hyj.zk;import org.apache.zookeeper.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.io.IOException;
import java.util.List;public class CreateZnode {//注意:逗号前后不能有空格  指定Zookeeper服务器列表private static String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";/* sessionTimeout指会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。*/private static int sessionTimeout=100000;private ZooKeeper zkClient=null;@Beforepublic void init() throws IOException {//创建一个Zookeeper实例来连接Zookeeper服务器   Watcher会话监听器,服务端将会触发监听zkClient=new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Override  //收到事件通知后的回调函数(用户的业务逻辑)public void process(WatchedEvent watchedEvent) {}});}@Test    //创建子节点public void create() throws InterruptedException, KeeperException {/* 参数 1:要创建的节点的路径; 参数 2:节点数据(一个字节数组) ;参数 3:节点权限 ;ZooDefs.Ids.OPEN_ACL_UNSAFE表示以后对这个节点的任何操作都不受权限控制参数 4:节点的类型   持久无序号节点PERSISTENT    持久带序号节点 PERSISTENT_SEQUENTIAL (persistent_sequential)短暂无序号节点EPHEMERAL     短暂带序号节点 EPHEMERAL_SEQUENTIAL (ephemeral_sequential)*/String s = zkClient.create("/sanguo/xiyouji", "sunwu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}@Afterpublic void close() throws InterruptedException {zkClient.close();}
}

从ZookKeeper系列:watch机制截的一张图在这里插入图片描述

监听的事件类型有:

  • None 客户端连接状态发生改变的时候,会收到None事件通知(如连接成功,连接失败,session会话过期等)
  • NodeCreated 节点被创建
  • NodeDeleted 节点被删除
  • NodeDataChanged 节点数据被修改
  • NodeChildrenChanged 子节点被创建或删除

四、获取子节点并监听节点变化

package com.hyj.zk;import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.io.IOException;
import java.util.List;public class GetChildren {//注意:逗号前后不能有空格  指定Zookeeper服务器列表private static String connectString="hadoop102:2181,hadoop103:2181,hadoop104:2181";/* sessionTimeout指会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。*/private static int sessionTimeout=100000;private ZooKeeper zkClient=null;@Beforepublic void init() throws IOException {//创建一个Zookeeper实例来连接Zookeeper服务器   Watcher会话监听器,服务端将会触发监听zkClient=new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Override  //收到事件通知后的回调函数(用户的业务逻辑)public void process(WatchedEvent watchedEvent) {if(watchedEvent.getType() == Event.EventType.None){if(watchedEvent.getState() == Event.KeeperState.SyncConnected){System.out.println("Zookeeper连接成功!!!");}else if(watchedEvent.getState() == Event.KeeperState.Disconnected){System.out.println("客户端和服务器的连接断开!!!");}else if (watchedEvent.getState() == Event.KeeperState.Expired){System.out.println("session会话过期!!!");}}else{System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());try {//再次监听(注册一次,监听一次)List children = zkClient.getChildren("/", true); //false表示不监听,true表示使用默认的watcherfor (String child : children) {System.out.println(child);}} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}}});}@Test   //获取子节点并监听节点路径变化public void getChildren() throws InterruptedException, KeeperException {// 参数1: 表示监听的节点     参数2: true表示监听 ,false表示不监听List children = zkClient.getChildren("/", true); //使用默认的watcherfor (String child : children) {System.out.println(child);}//延时阻塞Thread.sleep(Long.MAX_VALUE);}@Test   //获取子节点不监听节点路径变化public void getChildren2() throws InterruptedException, KeeperException {// 参数1: 表示监听的节点     参数2: true表示监听 ,false表示不监听List children = zkClient.getChildren("/", false);  //不注册watcherfor (String child : children) {System.out.println(child);}}@Test   //获取子节点并监听节点路径变化public void getChildren3() throws InterruptedException, KeeperException {// 参数1: 表示监听的节点     参数2: true表示监听 ,false表示不监听List children = zkClient.getChildren("/", new Watcher() {  //注册新的watcher@Override //收到事件通知后的回调函数(用户的业务逻辑)public void process(WatchedEvent watchedEvent) {System.out.println(watchedEvent.getType() + "------" + watchedEvent.getPath());try {List children = zkClient.getChildren("/", false); //这里若是true它还是会使用默认的watcherfor (String child : children) {System.out.println(child);}} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}});for (String child : children) {System.out.println(child);}//延时阻塞Thread.sleep(Long.MAX_VALUE);}@Afterpublic void close() throws InterruptedException {zkClient.close();}
}

五、判断 Znode 是否存在

package com.hyj.zk;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.io.IOException;
import java.util.List;public class IsExistNode {//注意:逗号前后不能有空格  指定Zookeeper服务器列表private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";/* sessionTimeout指会话的超时时间,是一个以“毫秒”为单位的整型值。在ZooKeeper中有会话的概念,在一个会话周期内,ZooKeeper客户端和服务端之间会通过心跳检测机制来维持会话的有效性,一旦在sessionTimeout时间内没有进行有效的心跳检测,会话就会失效。*/private static int sessionTimeout = 100000;private ZooKeeper zkClient = null;@Beforepublic void init() throws IOException {//创建一个Zookeeper实例来连接Zookeeper服务器   Watcher会话监听器,服务端将会触发监听zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {@Override //收到事件通知后的回调函数(用户的业务逻辑)public void process(WatchedEvent watchedEvent) {if (watchedEvent.getType() != Event.EventType.None) {System.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());try {List children = zkClient.getChildren("/", false); //false表示不监听,true表示使用默认的watcherfor (String child : children) {System.out.println(child);}} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}}});}@Testpublic void exist() throws InterruptedException, KeeperException {// 参数1: 表示要判断的节点     参数2: true表示监听 ,false表示不监听Stat stat = zkClient.exists("/sanguo", false); //不注册watcherSystem.out.println(stat == null ? "not exist" : "exist");}@Testpublic void exist2() throws InterruptedException, KeeperException {// 参数1: 表示要判断的节点     参数2: true表示监听此节点变化 ,false表示不监听Stat stat = zkClient.exists("/sanguo", true);  //使用默认的watcherSystem.out.println(stat == null ? "not exist" : "exist");//延时阻塞Thread.sleep(Long.MAX_VALUE);}@Testpublic void exist3() throws InterruptedException, KeeperException {// 参数1: 表示要判断的节点     参数2: true表示监听此节点变化 ,false表示不监听Stat stat = zkClient.exists("/sanguo", new Watcher() {@Override  //收到事件通知后的回调函数(用户的业务逻辑)public void process(WatchedEvent watchedEvent) {  //注册新的watcherSystem.out.println(watchedEvent.getType() + "--" + watchedEvent.getPath());try {List children = zkClient.getChildren("/", false); //false表示不监听,true表示使用默认的watcherfor (String child : children) {System.out.println(child);}} catch (KeeperException | InterruptedException e) {e.printStackTrace();}}});System.out.println(stat == null ? "not exist" : "exist");//延时阻塞Thread.sleep(Long.MAX_VALUE);}@Afterpublic void close() throws InterruptedException {zkClient.close();}
}

六、Watcher工作流程

Client 向 Zookeeper 服务端注册一个 Watcher ,同时将Watcher对象存储在客户端的 WatcherManager 中。当Zookeeper 服务端的一些指定事件触发了 Watcher 事件时,就会向客户端发送事件通知,客户端就会从WatcherManager 中取出对应的 Watcher 进行回调。

Watcher工作机制分为三个过程:

  1. 客户端注册Watcher

  2. 服务端处理Watcher

  3. 客户端回调Watcher

相关内容

热门资讯

楼船夜雪瓜洲渡,铁马秋风大散... 楼船夜雪瓜洲渡 铁马秋风大散关(妙句之妙)———宋·陆游《书愤》1161年冬秋,南宋军队分别在瓜洲和...
小学生古诗配画图片欣赏 小学生古诗配画图片欣赏  一幅幅精美绝伦的小学生古诗配画图片,相信总会勾起你童年的记忆。随着我们逐渐...
唐诗之《长沙过贾谊宅》 唐诗三百首之《长沙过贾谊宅》  《长沙过贾谊宅》  作者:刘长卿  三年谪宦此栖迟,万古惟留楚客悲。...
念兰堂红烛,心长焰短,向人垂... “念兰堂红烛,心长焰短,向人垂泪。”出处 出自 宋代 晏殊 的《撼庭秋·别来音信千里》“念兰堂红烛,...
春来遍是桃花水,不辨仙源何处... “春来遍是桃花水,不辨仙源何处寻。”出处 出自 唐代 王维 的《桃源行》“春来遍是桃花水,不辨仙源何...
田园风光的经典诗句 关于田园风光的经典诗句(精选130句)  在平平淡淡的学习、工作、生活中,大家都经常接触到诗句吧,诗...
盘飧市远无兼味,樽酒家贫只旧... “盘飧市远无兼味,樽酒家贫只旧醅。”出处 出自 唐代 杜甫 的《客至》“盘飧市远无兼味,樽酒家贫只旧...
唐诗登鹳雀楼 王之涣 唐诗登鹳雀楼 王之涣  王之涣的登鹳雀楼把景色写得浩瀚壮阔,气魄雄浑,让历代文人无不称赞。下面是小编...
洒脱的诗句   洒脱的诗句  1、十年磨一剑,霜刃未曾试。今日把示君,谁有不平事?  2、今夜送归灯火冷,河塘,...
描写桃花盛开的诗句 描写桃花盛开的诗句  1、 把夭桃斫断,煞他风景。——郑板桥《沁园春·恨》  2、 半盏屠苏犹未举,...
赞颂竹子的诗句及出处 赞颂竹子的诗句及出处  赞颂竹子的诗句及其出处:  1、竹生空野外,梢云耸百寻。无人赏高节,徒自抱贞...
辛弃疾《青玉案·元夕》赏析 辛弃疾《青玉案·元夕》赏析  《青玉案·元夕》为宋代大词人辛弃疾的作品。此词从极力渲染元宵节绚丽多彩...
描写登高的经典诗句 描写登高的经典诗句  1、强欲登高去,无人送酒来。遥怜故园菊,应傍战场开。——岑参《行军九日思长安故...
表达友情的诗句参考 表达友情的诗句参考  表达友情的诗句参考  1、折花逢驿使,寄与陇头人。江南无所有,聊赠一枝春。——...
老将,老将韩偓,老将的意思,... 老将,老将韩偓,老将的意思,老将赏析 -诗词大全  老将,老将韩偓,老将的意思,老将赏析 -诗词大全...
分水岭,分水岭伍彬,分水岭的... 分水岭,分水岭伍彬,分水岭的意思,分水岭赏析 -诗词大全 分水岭 作者:伍彬朝代:唐体裁:五律 ...
顾城现代诗作品《黑眼睛》鉴赏 顾城现代诗作品《黑眼睛》鉴赏  【1、作品原文】  黑夜给了我黑色的眼睛  我却用它寻找光明  【2...
《过分水岭》唐诗鉴赏 《过分水岭》唐诗鉴赏  过分水岭  温庭筠  溪水无情似有情,  入山三日得同行。  岭头便是分头处...
描写芍药花的诗句 关于描写芍药花的诗句  在平时的学习、工作或生活中,大家都收藏过令自己印象深刻的诗句吧,诗句一般饱含...
重岩叠嶂,隐天蔽日 “重岩叠嶂,隐天蔽日。”出处 出自 南北朝 郦道元 的《三峡》“重岩叠嶂,隐天蔽日。”全诗《三峡》 ...