Zookeeper 的常用客户端有3种,分别是:zookeeper 官方客户端、Apache Curator 和 zkclient。
- Zookeeper:Zookeeper官方提供的原生Java客户端
- Curator:Netflix公司在原生zookeeper基础上开源的Java客户端
- Zkclient:在原生Zookeeper基础上进行扩展的开源第三方Java客户端(不推荐使用)
Zookeeper 官方客户端
maven中引入 zookeeper 类库, 并保持版本与服务端一致,避免出现兼容性的问题。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
连接 zookeeper 并操作节点
示例代码:
public class ZkDemo1 {
private static final String CONNECT_STRING = "192.168.3.19:2181";
private static final int SESSION_TIMEOUT = 30;
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
if (event.getType()== Watcher.Event.EventType.None){
System.out.println("ok");
COUNT_DOWN_LATCH.countDown();
}
});
System.out.println(new String(zooKeeper.getData("/node-test-01", null, null)));
COUNT_DOWN_LATCH.await();
}
}
由于创建 zookeeper 客户端实例是异步执行,使用 CountDownLatch 保证客户端实例创建完毕再被使用。
如下代码为最终调用的构造方法,最后执行 cnxn.start();
。
public ZooKeeper(
String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly,
HostProvider aHostProvider,
ZKClientConfig clientConfig) throws IOException {
LOG.info(
"Initiating client connection, connectString={} sessionTimeout={} watcher={}",
connectString,
sessionTimeout,
watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
cnxn = createConnection(
connectStringParser.getChrootPath(),
hostProvider,
sessionTimeout,
this,
watchManager,
getClientCnxnSocket(),
canBeReadOnly);
cnxn.start();
}
继续查看 ClientCnxn 类的 start() 方法源码,SendThread 和 EventThread 有两个线程启动:
SendThread:创建socket连接、获取命令数据发送给服务端、读取服务端响应数据。
EventThread:从 waitingEvents 队列中获取数据。执行监听器 watch 事件、执行 Packet 数据包的异步调用。
public void start() {
sendThread.start();
eventThread.start();
}
实现永久监听
在 Watcher 中数据变化后,重新调用获取数据方法,同时增加当前 Watcher ,实现循环监听。
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDataChanged) {
try {
String changedValue = new String(zooKeeper.getData("/node-test-01", this, null));
System.out.println("changedValue:" + changedValue);
} catch (KeeperException | InterruptedException e) {
//
}
}
}
};
System.out.println(new String(zooKeeper.getData("/node-test-01", watcher, null)));
Curator 客户端
Curator 是一套由 netflix 公司开源的 zooKeeper 客户端框架,解决了 zookeeper 客户端很多非常底层的细节开发工作,包括会话重连、反复注册Watcher和多种异常处理等,并对 leader 选举、 分布式计数器、分布式锁等做了封装。Curator项目是目前 zooKeeper 客户端中使用最多,对 zooKeeper 支持最好的第三方客户端。
引入 curator 类库, curator-framework 包是对 zooKeeper 底层 API 的一些封装,curator-recipes 包封装了一些 zooKeeper 服务的高级特性,如: Cache 事件监听、选举、分布式锁、分布式 Barrier等。使用全部功能,则只引入 curator-recipes 即可;如果只使用对 zookeeper 封装,只引入 curator-framework 即可。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
创建会话、并创建节点
public class CuratorDemo1 {
public static void main(String[] args) throws Exception {
String zookeeperConnectionString = "192.168.3.19:2181";
//重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curatorFramework = CuratorFrameworkFactory
.newClient(zookeeperConnectionString, retryPolicy);
curatorFramework.start();
curatorFramework.create().forPath("/curator-node-01", "hello".getBytes());
}
}
connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个或多个地址;如果是多个地址,使用逗号分隔,如 host1:port1,host2:port2,host3;port3。
retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 zooKeeper 服务端。而 curator 提供了多种重试方式。在 curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回状态是 OK 表示操作没有问题,而 SYSTEMERROR 表示系统或服务端错误。
策略名称 | 说明 |
---|---|
ExponentialBackoffRetry | 重试一组次数,重试之间的睡眠时间增加 |
RetryNTimes | 指定重连次数 |
RetryOneTime | 重连一次 |
RetryUntilElapsed | 在给定的时间结束之前重试 |
数据节点操作示例
创建临时节点(数据为空):
curatorFramework.create().withMode(CreateMode.EPHEMERAL)
.forPath("/curator-ephemeral-node-01");
创建节点,并自动递归创建父节点:
curatorFramework.create()
.creatingParentContainersIfNeeded()
.forPath("/a/b/c", "init".getBytes());
删除节点(如果存在叶子节点,则删除失败):
curatorFramework.delete().forPath("/abc");
删除节点,并递归删除其所有的子节点:
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/a");
更新节点数据:
curatorFramework.setData().forPath("/node-test-01", "a-data".getBytes());
更新节点数据并强制指定版本:
curatorFramework.setData().withVersion(4).forPath("/node-test-01", "b-data".getBytes());
异步操作:
以上的创建、删除、更新等方法都是同步的,curator 提供异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback 接口中一个重要的回调值为 CuratorEvent ,里面包含事件类型、响应吗和节点的详细信息。
//线程池
Executor executor = Executors.newFixedThreadPool(2);
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((cf, ce) -> System.out.printf("eventType:%s,resultCode:%s%n",
ce.getType(), ce.getResultCode()), executor)
.forPath("/async-node-01");
System.in.read(); //防止main线程退出
如果 inBackground 方法不指定 executor ,那么会默认使用 curator 的 EventThread 去进行异步处理。
Curator Caches
原生的 zooKeeper wacher 是一次性的:一个 wacher 一旦触发就会被移出,如果想要反复使用 wacher,就要在 wacher 被移除后重新注册,curator 为此做了优化,curator 引入了 cache 的概念用来实现对 zooKeeper 服务器端进行事件监听;cache 是 curator 对事件监听的包装,其对事件的监听可以理解为一个本地缓存视图和远程 zooKeeper 视图的对比过程;而且 curator 会反复注册监听。
1.NodeCache 对某一个节点进行监听
2.PathChildrenCache 对子节点进行监听(不会对二级子节点进行监听)
3.TreeCache 对当前节点下所有节点进行监听
Curator 5.x中以上3个类已标记为Deprecated。
Curator 5.x使用CuratorCache
和CuratorCacheListener
监听当前节点和子节点(子节点的子节点)的创建、更新、删除。
NODE_CREATED // 节点创建
NODE_CHANGED // 节点更新
NODE_DELETED // 节点删除
监听示例代码:
CuratorCache cache = CuratorCache.build(curatorFramework, "/curator-node-01");
CuratorCacheListener listener = CuratorCacheListener.builder()
.forCreates(node -> System.out.printf("Node created: [%s]%n", node))
.forChanges((oldNode, node) -> System.out.printf("Node changed. Old: [%s] New: [%s]%n", oldNode, node))
.forDeletes(oldNode -> System.out.printf("Node deleted. Old value: [%s]%n", oldNode))
.forInitialized(() -> System.out.println("Cache initialized"))
.build();
// 注册listener
cache.listenable().addListener(listener);
// 启动cache
cache.start();
//防止main线程退出
System.in.read();