Zookeeper 的常用客户端有3种,分别是:zookeeper 官方客户端、Apache Curator 和 zkclient。

  1. Zookeeper:Zookeeper官方提供的原生Java客户端
  2. Curator:Netflix公司在原生zookeeper基础上开源的Java客户端
  3. Zkclient:在原生Zookeeper基础上进行扩展的开源第三方Java客户端(不推荐使用)

Zookeeper 官方客户端

maven中引入 zookeeper 类库, 并保持版本与服务端一致,避免出现兼容性的问题。

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
</dependency>

连接 zookeeper 并操作节点

示例代码:

public class ZkDemo1 {

    private static final String CONNECT_STRING = "192.168.3.19:2181";
    private static final int SESSION_TIMEOUT = 30;
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZooKeeper zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, event -> {
           if (event.getType()== Watcher.Event.EventType.None){
               System.out.println("ok");
               COUNT_DOWN_LATCH.countDown();
           }
        });
        System.out.println(new String(zooKeeper.getData("/node-test-01", null, null)));
        COUNT_DOWN_LATCH.await();
    }
}

由于创建 zookeeper 客户端实例是异步执行,使用 CountDownLatch 保证客户端实例创建完毕再被使用。

如下代码为最终调用的构造方法,最后执行 cnxn.start();

public ZooKeeper(
    String connectString,
    int sessionTimeout,
    Watcher watcher,
    boolean canBeReadOnly,
    HostProvider aHostProvider,
    ZKClientConfig clientConfig) throws IOException {
    LOG.info(
        "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
        connectString,
        sessionTimeout,
        watcher);

    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    hostProvider = aHostProvider;

    cnxn = createConnection(
        connectStringParser.getChrootPath(),
        hostProvider,
        sessionTimeout,
        this,
        watchManager,
        getClientCnxnSocket(),
        canBeReadOnly);
    cnxn.start();
}

继续查看 ClientCnxn 类的 start() 方法源码,SendThread 和 EventThread 有两个线程启动:

SendThread:创建socket连接、获取命令数据发送给服务端、读取服务端响应数据。

EventThread:从 waitingEvents 队列中获取数据。执行监听器 watch 事件、执行 Packet 数据包的异步调用。

public void start() {
  sendThread.start();
  eventThread.start();
}

实现永久监听

在 Watcher 中数据变化后,重新调用获取数据方法,同时增加当前 Watcher ,实现循环监听。

Watcher watcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                String changedValue = new String(zooKeeper.getData("/node-test-01", this, null));
                System.out.println("changedValue:" + changedValue);
            } catch (KeeperException | InterruptedException e) {
                //
            }
        }
    }
};
System.out.println(new String(zooKeeper.getData("/node-test-01", watcher, null)));

Curator 客户端

Curator 是一套由 netflix 公司开源的 zooKeeper 客户端框架,解决了 zookeeper 客户端很多非常底层的细节开发工作,包括会话重连、反复注册Watcher和多种异常处理等,并对 leader 选举、 分布式计数器、分布式锁等做了封装。Curator项目是目前 zooKeeper 客户端中使用最多,对 zooKeeper 支持最好的第三方客户端。

引入 curator 类库, curator-framework 包是对 zooKeeper 底层 API 的一些封装,curator-recipes 包封装了一些 zooKeeper 服务的高级特性,如: Cache 事件监听、选举、分布式锁、分布式 Barrier等。使用全部功能,则只引入 curator-recipes 即可;如果只使用对 zookeeper 封装,只引入 curator-framework 即可。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

创建会话、并创建节点

public class CuratorDemo1 {
    public static void main(String[] args) throws Exception {
        String zookeeperConnectionString = "192.168.3.19:2181";
        //重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .newClient(zookeeperConnectionString, retryPolicy);
        curatorFramework.start();
        curatorFramework.create().forPath("/curator-node-01", "hello".getBytes());
    }
}

connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个或多个地址;如果是多个地址,使用逗号分隔,如 host1:port1,host2:port2,host3;port3。

retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 zooKeeper 服务端。而 curator 提供了多种重试方式。在 curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回状态是 OK 表示操作没有问题,而 SYSTEMERROR 表示系统或服务端错误。

策略名称说明
ExponentialBackoffRetry重试一组次数,重试之间的睡眠时间增加
RetryNTimes指定重连次数
RetryOneTime重连一次
RetryUntilElapsed在给定的时间结束之前重试

数据节点操作示例

创建临时节点(数据为空):

curatorFramework.create().withMode(CreateMode.EPHEMERAL)
        .forPath("/curator-ephemeral-node-01");

创建节点,并自动递归创建父节点:

curatorFramework.create()
        .creatingParentContainersIfNeeded()
        .forPath("/a/b/c", "init".getBytes());

删除节点(如果存在叶子节点,则删除失败):

curatorFramework.delete().forPath("/abc");

删除节点,并递归删除其所有的子节点:

curatorFramework.delete().deletingChildrenIfNeeded().forPath("/a");

更新节点数据:

curatorFramework.setData().forPath("/node-test-01", "a-data".getBytes());

更新节点数据并强制指定版本:

curatorFramework.setData().withVersion(4).forPath("/node-test-01", "b-data".getBytes());

异步操作:

以上的创建、删除、更新等方法都是同步的,curator 提供异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback 接口中一个重要的回调值为 CuratorEvent ,里面包含事件类型、响应吗和节点的详细信息。

//线程池
Executor executor = Executors.newFixedThreadPool(2);
curatorFramework.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.EPHEMERAL)
        .inBackground((cf, ce) -> System.out.printf("eventType:%s,resultCode:%s%n",
                ce.getType(), ce.getResultCode()), executor)
        .forPath("/async-node-01");

System.in.read(); //防止main线程退出

如果 inBackground 方法不指定 executor ,那么会默认使用 curator 的 EventThread 去进行异步处理。

Curator Caches

原生的 zooKeeper wacher 是一次性的:一个 wacher 一旦触发就会被移出,如果想要反复使用 wacher,就要在 wacher 被移除后重新注册,curator 为此做了优化,curator 引入了 cache 的概念用来实现对 zooKeeper 服务器端进行事件监听;cache 是 curator 对事件监听的包装,其对事件的监听可以理解为一个本地缓存视图和远程 zooKeeper 视图的对比过程;而且 curator 会反复注册监听。

1.NodeCache 对某一个节点进行监听

2.PathChildrenCache 对子节点进行监听(不会对二级子节点进行监听)

3.TreeCache 对当前节点下所有节点进行监听

Curator 5.x中以上3个类已标记为Deprecated。

Curator 5.x使用CuratorCacheCuratorCacheListener监听当前节点和子节点(子节点的子节点)的创建、更新、删除。

NODE_CREATED // 节点创建
NODE_CHANGED // 节点更新
NODE_DELETED // 节点删除

监听示例代码:

CuratorCache cache = CuratorCache.build(curatorFramework, "/curator-node-01");
CuratorCacheListener listener = CuratorCacheListener.builder()
        .forCreates(node -> System.out.printf("Node created: [%s]%n", node))
        .forChanges((oldNode, node) -> System.out.printf("Node changed. Old: [%s] New: [%s]%n", oldNode, node))
        .forDeletes(oldNode -> System.out.printf("Node deleted. Old value: [%s]%n", oldNode))
        .forInitialized(() -> System.out.println("Cache initialized"))
        .build();

// 注册listener
cache.listenable().addListener(listener);
// 启动cache
cache.start();
//防止main线程退出
System.in.read(); 
最后修改:2022 年 11 月 15 日
如果觉得我的文章对你有用,请随意赞赏