Zookeeper Curator使用介绍

Java
309
0
0
2023-02-02
目录
  • 1、添加依赖
  • 2、创建会话
  • 3、创建节点
  • 4、删除节点
  • 5、获取数据
  • 6、更新数据

从编码风格上来讲,curator提供了基于Fluent的编程风格支持

1、添加依赖

在pom.xml文件中添加如下内容:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>

2、创建会话

Curator的创建会话方式与原生的API和ZkClient的创建方式区别很⼤。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。具体如下:

1. 使用CuratorFramework这个工厂类的两个静态方法来创建⼀个客户端

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

其中参数RetryPolicy提供重试策略的接口,可以让⽤户实现⾃定义的重试策略,默认提供了以下实现, 分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、RetryForever(永远重试策略)

2. 通过调用CuratorFramework中的start()方法来启动会话

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy); client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000,1000,retryPolicy);
client.start();

其实进⼀步查看源代码可以得知,其实这两种方法内部实现⼀样,只是对外包装成不同的方法。它们的底层都是通过第三个⽅法builder来实现的。

RetryPolicy retryPolicy	= new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
            .connectString("server1:2181,server2:2181,server3:2181")
            .sessionTimeoutMs(50000)
            .connectionTimeoutMs(30000)
            .retryPolicy(retryPolicy)
            .build();
client.start();

参数:

connectString:zk的server地址,多个server之间使⽤英⽂逗号分隔开

connectionTimeoutMs: 连 接 超 时 时 间 , 如 上 是 30s, 默 认 是 15

ssessionTimeoutMs: 会 话 超 时 时 间 , 如 上 是 50s, 默 认 是 60s

retryPolicy:失败重试策略

ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间

计 算 公 式 : 当 前 sleep 时 间 =baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))

maxRetries:最大重试次数

maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间,默认的最大时间是Integer.MAX_VALUE毫秒。

其他,查看org.apache.curator.RetryPolicy接⼝的实现类

start():完成会话的创建

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CreateSession {
    // 创建会话
    public static void main(String[] args) {
        // 不使用fluent编程风格
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("8.142.8.105:2181", retryPolicy);
        curatorFramework.start();
        System.out.println("会话被建立了");
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println("会话2被创建了");
    }
}

需要注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离

3、创建节点

curator提供了⼀系列Fluent风格的接口,通过使用Fluent编程风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。

下面简单介绍⼀下常用的几个节点创建场景。

(1)创建⼀个初始内容为空的节点

client.create().forPath(path);

Curator默认创建的是持久节点,内容为空。

(2)创建⼀个包含内容的节点

client.create().forPath(path,"我是内容".getBytes());

Curator和ZkClient不同的是依旧采用Zookeeper原生API的⻛格,内容使用byte[]作为方法参数。

(3)递归创建父节点,并选择节点类型

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);

creatingParentsIfNeeded这个接口非常有用,在使用ZooKeeper 的过程中,开发人员经常会碰到NoNodeException 异常,其中⼀个可能的原因就是试图对⼀个不存在的父节点创建子节点。因此,开发人员不得不在每次创建节点之前,都判断⼀下该父节点是否存在——这个处理通常比较麻烦。在使用Curator 之后,通过调用creatingParentsIfNeeded接口,Curator就能够自动地递归创建所有需要的⽗节点。

下面通过一个实际例子来演示如何在代码中使用这些API。

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
public class CreateNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println("会话2被创建了");
        // 创建节点
        String path = "/lg-curator/c1";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
        System.out.println("节点递归创建成功,该节点路径:" + s);
    }
}

4、删除节点

删除节点的方法也是基于Fluent方式来进行操作,不同类型的操作调用新增不同的方法调用即可。

(1)删除一个子节点

client.delete().forPath(path);

(2)删除节点并递归删除其子节点

client.delete().deletingChildrenIfNeeded().forPath(path);

(3)指定版本进行删除

client.delete().withVersion(1).forPath(path);

如果此版本已经不存在,则删除异常,异常信息如下。

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =BadVersion for

(4)强制保证删除一个节点

client.delete().guaranteed().forPath(path);

只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到节点删除成功。比如遇到⼀些网络异常的情况,此guaranteed的强制删除就会很有效果。

演示实例:

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class DeleteNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        // 不使用fluent编程风格
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println("会话2被创建了");
        // 删除节点
        String path = "/lg-curator";
        client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
        System.out.println("删除成功,删除的节点:" + path);
    }
}

5、获取数据

获取节点数据内容API相当简单,同时Curator提供了传入一个Stat变量的方式来存储服务器端返回的最新的节点状态信息

// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat(); 
client.getData().storingStatIn(stat).forPath(path);

演示:

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class GetNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println("会话2被创建了");
        // 创建节点
        String path = "/lg-curator/c1";
        String s = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes());
        System.out.println("节点递归创建成功,该节点路径:" + s);
        // 获取节点的数据内容以及状态信息
        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println("获取到的节点数据内容:" + new String(bytes));
        // 状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("获取节点的状态信息:" + stat);
    }
}

6、更新数据

更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如 果version已经变更,则抛出异常。

// 普通更新
client.setData().forPath(path,"新内容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);

版本不一致异常信息:

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for

案例演示:

package com.lagou.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
public class UpdateNote_curator {
    // 创建会话
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 使用fluent编程风格
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("8.142.8.105:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(retryPolicy)
                .namespace("base")  // 独立的命名空间 /base
                .build();
        client.start();
        System.out.println("会话2被创建了");
        String path = "/lg-curator/c1";
        // 获取节点的数据内容以及状态信息
        // 数据内容
        byte[] bytes = client.getData().forPath(path);
        System.out.println("获取到的节点数据内容:" + new String(bytes));
        // 状态信息
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("获取节点的状态信息:" + stat);
        // 更新节点内容
        int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion();
        System.out.println("当前的最新版本是"+version);
        byte[] byte2 = client.getData().forPath(path);
        System.out.println("修改后的节点数据内容:" + new String(byte2));
    }
}