这里操作 zookeeper 的 Java API使用的是一套zookeeper客户端框架Curator ,它解决了很多Zookeeper客户端非常底层的细节开发工作 。
Curator包含了几个包:
curator-framework :对zookeeper的底层api的一些封装
curator-recipes :封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器等
注意事项:
maven 依赖,这里使用的curator版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败。
1. 创建maven java 工程,导入jar包
pom .xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="#34;
xmlns:xsi="#34;
xsi:schemaLocation=" #34;>
<parent>
<artifactId>bigdata-sample</artifactId>
<groupId>com.etbird</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>zookeeper</artifactId>
<!-- <repositories>
<repository>
<id> cloudera </id>
<url>
</repository>
</repositories> -->
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId> junit </groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
2. Zookeeper节点操作
2.1 创建永久节点
package com.etbird.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.junit.Test;
public class ZookeeperApiTest {
@Test
public void createNode() throws Exception {
//1.定制重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
//2.获取客户端对象
//这里配置ip或域名都可以
String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);
//3.调用start开启客户端操作
client.start();
//4.通过create来进行创建节点,并且需要指定节点类型
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/hello2/world","www".getBytes());
//5.关闭客户端
client.close();
}
}
运行测试:
查看创建结果:
2.2 创建临时节点
/**
*
* 创建临时节点
*/
@Test
public void create Tmp Node() throws Exception {
//1.定制重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
//2.获取客户端对象
//这里配置ip或域名都可以
String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);
//3.调用start开启客户端操作
client.start();
//4.通过create来进行创建节点,并且需要指定节点类型
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/tt","www".getBytes());
//可以加入sleep,在客户端观察(5秒)
Thread.sleep(5000);
//5.关闭客户端
client.close();
}
2.3 修改节点数据
/**
*
* 修改节点数据
* 节点下面添加数据与修改是类似的,一个节点下面会有一个数据,新的数据会覆盖旧的数据
*/
@Test
public void nodeData() throws Exception {
//1.定制重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
//2.获取客户端对象
//这里配置ip或域名都可以
String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);
//3.调用start开启客户端操作
client.start();
//4.修改节点数据
client.setData()
.forPath("/hello2","xyz".getBytes());
//5.关闭客户端
client.close();
}
2.4 节点数据查询
/**
*
* 查询节点数据
*/
@Test
public void getNodeData() throws Exception {
//1.定制重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
//2.获取客户端对象
//这里配置ip或域名都可以
String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);
//3.调用start开启客户端操作
client.start();
//4.查询节点数据
byte[] bytes = client.getData()
.forPath("/hello2");
System.out.println(new String(bytes));
//5.关闭客户端
client.close();
}
2.5 节点watch机制
/**
* zookeeper的watch机制
*
* @throws Exception
*/
@Test
public void watchNode() throws Exception {
//1.定制重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 3);
//2.获取客户端对象
String zkServerStr = "192.168.204.101:2181,192.168.204.102:2181,192.168.204.103:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(zkServerStr, 1000, 1000, retryPolicy);
//3.调用start开启客户端操作
client.start();
//4.创建一个treeCache对象,指定要监控的节点路径
// ExecutorService pool = Executors.newCachedThreadPool();
TreeCache treeCache = new TreeCache(client, "/hello5");
//5.自定义监听器和处理过程
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if (data != null) {
switch (event.getType()) {
case NODE_ADDED:
System.out.println("NODE_ADDED : " + data.getPath() + " 数据:" + new String(data.getData()));
break;
case NODE_REMOVED:
System.out.println("NODE_REMOVED : " + data.getPath() + " 数据:" + new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println("NODE_UPDATED : " + data.getPath() + " 数据:" + new String(data.getData()));
break;
default:
break;
}
} else {
System.out.println("data is null : " + event.getType());
}
}
});
//开始监听
treeCache.start();
//延迟,观察程序输出
Thread.sleep(50000000);
}
在客户端修改/hello5节点数据为xxx
执行效果如下: