ZooKeeper – JavaAPI操作

Java
239
0
0
2023-06-07

这里操作 zookeeper 的 Java API使用的是一套zookeeper客户端框架Curator ,它解决了很多Zookeeper客户端非常底层的细节开发工作 。

Curator包含了几个包:

curator-framework :对zookeeper的底层api的一些封装

curator-recipes :封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器等

注意事项:

maven 依赖,这里使用的curator版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败。

1. 创建maven java 工程,导入jar包

pom .xml文件

 <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="#34;
         xmlns:xsi="#34;
         xsi:schemaLocation=" #34;>
    <parent>
        <artifactId>bigdata-sample</artifactId>
        <groupId>com.etbird</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>zookeeper</artifactId>

    <!-- <repositories>
      <repository>
        <id> cloudera </id>
        <url>
      </repository>
    </repositories> -->

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId> junit </groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>  

2. Zookeeper节点操作

2.1 创建永久节点

 package com.etbird.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;

public class ZookeeperApiTest {
    @Test
    public void createNode() throws  Exception  {
        //1.定制重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
        //2.获取客户端对象
        //这里配置ip或域名都可以
        String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);

        //3.调用start开启客户端操作
        client.start();

        //4.通过create来进行创建节点,并且需要指定节点类型
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/hello2/world","www".getBytes());

        //5.关闭客户端
        client.close();
    }
} 

运行测试:

查看创建结果:

2.2 创建临时节点

 /**
 *
 * 创建临时节点
 */
@Test
public void create Tmp Node() throws Exception {
    //1.定制重试策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    //2.获取客户端对象
    //这里配置ip或域名都可以
    String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);

    //3.调用start开启客户端操作
    client.start();

    //4.通过create来进行创建节点,并且需要指定节点类型
    client.create()
            .creatingParentsIfNeeded()
            .withMode(CreateMode.EPHEMERAL)
            .forPath("/tt","www".getBytes());

    //可以加入sleep,在客户端观察(5秒)
    Thread.sleep(5000);
    //5.关闭客户端
    client.close();
}  

2.3 修改节点数据

 /**
 *
 * 修改节点数据
 * 节点下面添加数据与修改是类似的,一个节点下面会有一个数据,新的数据会覆盖旧的数据
 */
@Test
public void nodeData() throws Exception {
    //1.定制重试策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    //2.获取客户端对象
    //这里配置ip或域名都可以
    String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);

    //3.调用start开启客户端操作
    client.start();

    //4.修改节点数据
    client.setData()
            .forPath("/hello2","xyz".getBytes());
    
    //5.关闭客户端
    client.close();
}  

2.4 节点数据查询

 /**
 *
 * 查询节点数据
 */
@Test
public void getNodeData() throws Exception {
    //1.定制重试策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    //2.获取客户端对象
    //这里配置ip或域名都可以
    String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);

    //3.调用start开启客户端操作
    client.start();

    //4.查询节点数据
    byte[] bytes = client.getData()
            .forPath("/hello2");
    System.out.println(new String(bytes));

    //5.关闭客户端
    client.close();
}  

2.5 节点watch机制

 /**
 * zookeeper的watch机制
 *
 * @throws Exception
 */
@Test
public void watchNode() throws Exception {
    //1.定制重试策略
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);

    //2.获取客户端对象
    String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
    CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);

    //3.调用start开启客户端操作
    client.start();

    //4.创建一个treeCache对象,指定要监控的节点路径
    // ExecutorService pool = Executors.newCachedThreadPool();
    TreeCache treeCache = new TreeCache(client, "/hello5");

    //5.自定义监听器和处理过程
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
            ChildData data = event.getData();
            if (data != null) {
                switch (event.getType()) {
                    case NODE_ADDED:
                        System.out.println("NODE_ADDED : " + data.getPath() + "  数据:" + new String(data.getData()));
                        break;
                    case NODE_REMOVED:
                        System.out.println("NODE_REMOVED : " + data.getPath() + "  数据:" + new String(data.getData()));
                        break;
                    case NODE_UPDATED:
                        System.out.println("NODE_UPDATED : " + data.getPath() + "  数据:" + new String(data.getData()));
                        break;
                    default:
                        break;
                }
            } else {
                System.out.println("data is null : " + event.getType());
            }
        }
    });

    //开始监听
    treeCache.start();

    //延迟,观察程序输出
    Thread.sleep(50000000);

}  

在客户端修改/hello5节点数据为xxx

执行效果如下: