curator对Zookeeper节点监听总结 zookee

这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战

zookeeper简介

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

ZooKeeper包含一个简单的原语集,提供Java和C的接口。

ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

以上内容摘自【百度百科】,详细了解可以去官网https:/zookeeper.apache.org/

zookeeper集群搭建

关于单节点,单服务器多节点的伪集群,集群,网上相关资料很多,不再赘述。

curator简介

zookeeper提供的原生API操作过于烦琐,curator框架是对zookeeper提供的原生API进行了封装,提供了更高级的API接口,使客户端程序员使用zookeeper更加容易及高效。

关于curaotr的详细介绍可参看官网curator.apache.org/

zookeeper监听的原生API

zookeeper原生的JAVA API提供监听接口是Watcher接口,使用的时候实现该接口重写process(WatchedEvent watchedEvent)方法即可。

缺点:该监听只能监听一次,触发一次之后即失效,如果需要重复使用,需要每使用一次,即注册一次。

测试代码及说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
java复制代码public class ZkWatcherTest {

  private static final String ADDR = "192.168.100.1:2181";
  private static final String PATH = "/test1";
  private static ZooKeeper zk = null;
  static {
      try {
          zk = new ZooKeeper(ADDR, 3000, new WatcherTest());
      } catch (IOException e) {
          e.printStackTrace();
      }
  }

  public static void main(String[] args) throws InterruptedException {
      test1();
      Thread.sleep(Integer.MAX_VALUE);
  }

  private static void test1(){
      try {
          /**
            * set 事件触发(修改节点值)
            * get 不触发
            * 对一级子节点操作不触发
            * 删除当前节点触发,如果是同步会抛异常(KeeperErrorCode = NoNode),再创建再删除不再触发
            * 创建节点不触发
            * 同步连接时,exists事件未触发,异步未测试
            */
          zk.getData(PATH, true, null);
          /**
            * set 事件触发(修改节点值)
            * get 不触发
            * 对一级子节点操作(CRUD)不触发
            * 删除当前节点触发,未抛异常
            * 同步连接时,exists事件未触发,异步未测试
            * 创建节点触发
            */
          zk.exists(PATH, true);
          /**
            * 删除节点触发,抛出异常
            * set 不触发(修改节点值)
            * get 不触发
            * 删除节点再创建之后 ,操作一级节点不触发
            * 创建一级节点触发
            * 删除一级节点触发
            * 修改一级节点的值不触发
            * 同步连接时,exists事件未触发,异步未测试
            */
          zk.getChildren(PATH, true);
      } catch (KeeperException e) {
          e.printStackTrace();
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }

  private static class WatcherTest implements Watcher{
      public void process(WatchedEvent watchedEvent) {
          System.out.println("wathceEvent: " + watchedEvent);
          try {
            //watcher当回调方法后已失效,如下面第二个参数为true,注册默认Watcher
              zk.getChildren(PATH, true);
          } catch (KeeperException e) {
              e.printStackTrace();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }

      }
  }
}
public class ZkWatcherTest {

  private static final String ADDR = "192.168.100.1:2181";
  private static final String PATH = "/test1";
  private static ZooKeeper zk = null;
  static {
      try {
          zk = new ZooKeeper(ADDR, 3000, new WatcherTest());
      } catch (IOException e) {
          e.printStackTrace();
      }
  }

  public static void main(String[] args) throws InterruptedException {
      test1();
      Thread.sleep(Integer.MAX_VALUE);
  }

  private static void test1(){
      try {
          /**
            * set 事件触发(修改节点值)
            * get 不触发
            * 对一级子节点操作不触发
            * 删除当前节点触发,如果是同步会抛异常(KeeperErrorCode = NoNode),再创建再删除不再触发
            * 创建节点不触发
            * 同步连接时,exists事件未触发,异步未测试
            */
          zk.getData(PATH, true, null);
          /**
            * set 事件触发(修改节点值)
            * get 不触发
            * 对一级子节点操作(CRUD)不触发
            * 删除当前节点触发,未抛异常
            * 同步连接时,exists事件未触发,异步未测试
            * 创建节点触发
            */
          zk.exists(PATH, true);
          /**
            * 删除节点触发,抛出异常
            * set 不触发(修改节点值)
            * get 不触发
            * 删除节点再创建之后 ,操作一级节点不触发
            * 创建一级节点触发
            * 删除一级节点触发
            * 修改一级节点的值不触发
            * 同步连接时,exists事件未触发,异步未测试
            */
          zk.getChildren(PATH, true);
      } catch (KeeperException e) {
          e.printStackTrace();
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }

  private static class WatcherTest implements Watcher{
      public void process(WatchedEvent watchedEvent) {
          System.out.println("wathceEvent: " + watchedEvent);
          try {
            //watcher当回调方法后已失效,如下面第二个参数为true,注册默认Watcher
              zk.getChildren(PATH, true);
          } catch (KeeperException e) {
              e.printStackTrace();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }

      }
  }
}

对于Zookeeper提供的原生JAVA API来说,初始化客户端实例的时候需要传入一个Watcher参数,该值可以为空,这是注册一个默认的Watcher,该Watcher在第一次调用之后便会失效,getData, exists, getChildren三个接口的第二个参数设置为true即可再次注册watcher(默认Watcher,即初始化Zookeeper客户端传入的Watcher),对于每个接口注册watcher能够监听的事件状态和触发Watcher的事件类型,参看注释说明。

curator官方推荐的高级监听API

curator官方推荐的API是对zookeeper原生的JAVA API进行了封装,将重复注册,事件信息等很好的处理了。而且监听事件返回了详细的信息,包括变动的节点路径,节点值等等,这是原生API所没有的。

这个对事件的监听类似于一个本地缓存视图和远程Zookeeper视图的对比过程。

注:curator的方法调用采用的是流式API,此种风格的优点及使用注意事项可自行查阅资料了解。

官方推荐的API提供了三个接口,分别如下:

  • NodeCache

对一个节点进行监听,监听事件包括指定路径的增删改操作

  • PathChildrenCache

对指定路径节点的一级子目录监听,不对该节点的操作监听,对其子目录的增删改操作监听

  • TreeCache

综合NodeCache和PathChildrenCahce的特性,是对整个目录进行监听,可以设置监听深度。

这三个API的用法及注意事项和部分参数说明如下测试代码所示:内容比较琐碎不再抽出进行详细总结。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
java复制代码public class ZKCurator {
 
  private static final String ADDR = "192.168.100.1:2181";
  private static final String PATH = "/zktest1";

  public static void main(String[] args) throws InterruptedException {
      final CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ADDR, new RetryNTimes(10, 5000));
      zkClient.start();
      System.out.println("start zkclient...");
      Thread thread = null;

      try {
          registerWatcher(zkClient);
          //registerNodeCache(zkClient);
      } catch (Exception e) {
          e.printStackTrace();

      }

      System.out.println("register wathcer end...");
      Thread.sleep(Integer.MAX_VALUE);
      zkClient.close();
  }

  private static void registerWatcher(CuratorFramework zkClient) throws Exception {
      /**
        * 注册监听器,当前节点不存在,创建该节点:未抛出异常及错误日志
        * 注册子节点触发type=[CHILD_ADDED]
        * 更新触发type=[CHILD_UPDATED]
        *
        * zk挂掉type=CONNECTION_SUSPENDED,,一段时间后type=CONNECTION_LOST
        * 重启zk:type=CONNECTION_RECONNECTED, data=null
        * 更新子节点:type=CHILD_UPDATED, data=ChildData{path='/zktest111/tt1', stat=4294979983,4294979993,1501037475236,1501037733805,2,0,0,0,6,0,4294979983
        , data=[55, 55, 55, 55, 55, 55]}

        * 删除子节点type=CHILD_REMOVED
        * 更新根节点:不触发
        * 删除根节点:不触发 无异常
        * 创建根节点:不触发
        * 再创建及更新子节点不触发
        *
        * 重启时,与zk连接失败
        */
      ExecutorService service = Executors.newFixedThreadPool(3);
      PathChildrenCache watcher = new PathChildrenCache(zkClient, PATH, true/*,false, service*/);
      watcher.getListenable().addListener(new PathChildrenCacheListener() {
          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
              System.out.println(pathChildrenCacheEvent);
          }
      });
      /*PathChildrenCache.StartMode说明如下
      *POST_INITIALIZED_EVENT
      *1、在监听器启动的时候即,会枚举当前路径所有子节点,触发CHILD_ADDED类型的事件
      * 2、同时会监听一个INITIALIZED类型事件
      * NORMAL异步初始化cache
      * POST_INITIALIZED_EVENT异步初始化,初始化完成触发事件PathChildrenCacheEvent.Type.INITIALIZED
      /*NORMAL只和POST_INITIALIZED_EVENT的1情况一样,不会ALIZED类型事件触发
   
      /*BUILD_INITIAL_CACHE 不会触发上面两者事件,同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据       */
      watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
      System.out.println("注册watcher成功...");
  }

  public static void registerNodeCache(CuratorFramework client) throws Exception{
      /*
      * 节点路径不存在时,set不触发监听
      * 节点路径不存在,,,创建事件触发监听
      * 节点路径存在,set触发监听
      * 节点路径存在,delete触发监听
      *
      *
      * 节点挂掉,未触发任何监听
      * 节点重连,未触发任何监听
      * 节点重连 ,恢复监听
      * */
      final NodeCache nodeCache = new NodeCache(client, PATH, false);
      nodeCache.getListenable().addListener(new NodeCacheListener() {
          public void nodeChanged() throws Exception {
              System.out.println("当前节点:"+nodeCache.getCurrentData());
          }
      });
      //如果为true则首次不会缓存节点内容到cache中,默认为false,设置为true首次不会触发监听事件
      nodeCache.start(true);
  }

public static void registTreeCache(CuratorFramework client) throws Exception {
      /**
        * TreeCache.nodeState == LIVE的时候,才能执行getCurrentChildren非空,默认为PENDING
        * 初始化完成之后,监听节点操作时 TreeCache.nodeState == LIVE
        *
        * maxDepth值设置说明,比如当前监听节点/t1,目录最深为/t1/t2/t3/t4,则maxDepth=3,说明下面3级子目录全
        * 监听,即监听到t4,如果为2,则监听到t3,对t3的子节点操作不再触发
        * maxDepth最大值2147483647
        *
        * 初次开启监听器会把当前节点及所有子目录节点,触发[type=NODE_ADDED]事件添加所有节点(小等于maxDepth目录)
        * 默认监听深度至最低层
        * 初始化以[type=INITIALIZED]结束
        *
        * [type=NODE_UPDATED],set更新节点值操作,范围[当前节点,maxDepth目录节点](闭区间)
        *
        *
        * [type=NODE_ADDED] 增加节点 范围[当前节点,maxDepth目录节点](左闭右闭区间)
        *
        * [type=NODE_REMOVED] 删除节点, 范围[当前节点, maxDepth目录节点](闭区间),删除当前节点无异常
        *
        * 事件信息
        * TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/zktest1', stat=4294979373,4294979373,1499850881635,1499850881635,0,0,0,0,2,0,4294979373
        , data=[116, 49]}}
        *
        */
      final TreeCache treeCache = TreeCache.newBuilder(client, PATH).setCacheData(true).setMaxDepth(2).build();
      treeCache.getListenable().addListener(new TreeCacheListener() {
          public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
              System.out.println("treeCacheEvent: "+treeCacheEvent);
          }
      });
      //没有开启模式作为入参的方法
      treeCache.start();
  }
} class ZKCurator {
 
  private static final String ADDR = "192.168.100.1:2181";
  private static final String PATH = "/zktest1";

  public static void main(String[] args) throws InterruptedException {
      final CuratorFramework zkClient = CuratorFrameworkFactory.newClient(ADDR, new RetryNTimes(10, 5000));
      zkClient.start();
      System.out.println("start zkclient...");
      Thread thread = null;

      try {
          registerWatcher(zkClient);
          //registerNodeCache(zkClient);
      } catch (Exception e) {
          e.printStackTrace();

      }

      System.out.println("register wathcer end...");
      Thread.sleep(Integer.MAX_VALUE);
      zkClient.close();
  }

  private static void registerWatcher(CuratorFramework zkClient) throws Exception {
      /**
        * 注册监听器,当前节点不存在,创建该节点:未抛出异常及错误日志
        * 注册子节点触发type=[CHILD_ADDED]
        * 更新触发type=[CHILD_UPDATED]
        *
        * zk挂掉type=CONNECTION_SUSPENDED,,一段时间后type=CONNECTION_LOST
        * 重启zk:type=CONNECTION_RECONNECTED, data=null
        * 更新子节点:type=CHILD_UPDATED, data=ChildData{path='/zktest111/tt1', stat=4294979983,4294979993,1501037475236,1501037733805,2,0,0,0,6,0,4294979983
        , data=[55, 55, 55, 55, 55, 55]}

        * 删除子节点type=CHILD_REMOVED
        * 更新根节点:不触发
        * 删除根节点:不触发 无异常
        * 创建根节点:不触发
        * 再创建及更新子节点不触发
        *
        * 重启时,与zk连接失败
        */
      ExecutorService service = Executors.newFixedThreadPool(3);
      PathChildrenCache watcher = new PathChildrenCache(zkClient, PATH, true/*,false, service*/);
      watcher.getListenable().addListener(new PathChildrenCacheListener() {
          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
              System.out.println(pathChildrenCacheEvent);
          }
      });
      /*PathChildrenCache.StartMode说明如下
      *POST_INITIALIZED_EVENT
      *1、在监听器启动的时候即,会枚举当前路径所有子节点,触发CHILD_ADDED类型的事件
      * 2、同时会监听一个INITIALIZED类型事件
      * NORMAL异步初始化cache
      * POST_INITIALIZED_EVENT异步初始化,初始化完成触发事件PathChildrenCacheEvent.Type.INITIALIZED
      /*NORMAL只和POST_INITIALIZED_EVENT的1情况一样,不会ALIZED类型事件触发
   
      /*BUILD_INITIAL_CACHE 不会触发上面两者事件,同步初始化客户端的cache,及创建cache后,就从服务器端拉入对应的数据       */
      watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
      System.out.println("注册watcher成功...");
  }

  public static void registerNodeCache(CuratorFramework client) throws Exception{
      /*
      * 节点路径不存在时,set不触发监听
      * 节点路径不存在,,,创建事件触发监听
      * 节点路径存在,set触发监听
      * 节点路径存在,delete触发监听
      *
      *
      * 节点挂掉,未触发任何监听
      * 节点重连,未触发任何监听
      * 节点重连 ,恢复监听
      * */
      final NodeCache nodeCache = new NodeCache(client, PATH, false);
      nodeCache.getListenable().addListener(new NodeCacheListener() {
          public void nodeChanged() throws Exception {
              System.out.println("当前节点:"+nodeCache.getCurrentData());
          }
      });
      //如果为true则首次不会缓存节点内容到cache中,默认为false,设置为true首次不会触发监听事件
      nodeCache.start(true);
  }

public static void registTreeCache(CuratorFramework client) throws Exception {
      /**
        * TreeCache.nodeState == LIVE的时候,才能执行getCurrentChildren非空,默认为PENDING
        * 初始化完成之后,监听节点操作时 TreeCache.nodeState == LIVE
        *
        * maxDepth值设置说明,比如当前监听节点/t1,目录最深为/t1/t2/t3/t4,则maxDepth=3,说明下面3级子目录全
        * 监听,即监听到t4,如果为2,则监听到t3,对t3的子节点操作不再触发
        * maxDepth最大值2147483647
        *
        * 初次开启监听器会把当前节点及所有子目录节点,触发[type=NODE_ADDED]事件添加所有节点(小等于maxDepth目录)
        * 默认监听深度至最低层
        * 初始化以[type=INITIALIZED]结束
        *
        * [type=NODE_UPDATED],set更新节点值操作,范围[当前节点,maxDepth目录节点](闭区间)
        *
        *
        * [type=NODE_ADDED] 增加节点 范围[当前节点,maxDepth目录节点](左闭右闭区间)
        *
        * [type=NODE_REMOVED] 删除节点, 范围[当前节点, maxDepth目录节点](闭区间),删除当前节点无异常
        *
        * 事件信息
        * TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/zktest1', stat=4294979373,4294979373,1499850881635,1499850881635,0,0,0,0,2,0,4294979373
        , data=[116, 49]}}
        *
        */
      final TreeCache treeCache = TreeCache.newBuilder(client, PATH).setCacheData(true).setMaxDepth(2).build();
      treeCache.getListenable().addListener(new TreeCacheListener() {
          public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
              System.out.println("treeCacheEvent: "+treeCacheEvent);
          }
      });
      //没有开启模式作为入参的方法
      treeCache.start();
  }
}

注意事项:

1、Curator只是封装了原生Zookeeper的监听事件,使客户端程序员无序重复注册Watcher,但是Wathcer的一次性还是存在的,只是由curator完成。因此对于某些场景使用依然需要慎重。因为curator需要重复注册,因此,第一次触发Wathcer与再次注册Watcher即使是异常操作,但是中间还是存在时延,假使对于Zookeeper瞬时触发几个事件,则该监听器并不能保证监听到所有状态的改变,至于可以监听到多少取决于服务器的处理速度。

2、只要curator的cache启动成功,监听器注册成功,理论上只要没有1的情况下,监听器是可以很完美的处理需要监听到的事件。但是如果在cache.start()的时候,与Zookeeper的连接是中断的,则后续连接恢复,也无法让客户端感知到需要监听的变动。我当时想到的一个解决方案是在Zookeeper启动的时候设置一个连接状态的监听器(连接状态监听器看第7节),如果Zookeeper客户端连接状态是连接失败,则添加这个监听器,恢复连接的时候,调用cache.clearAndRefresh(),然后移除连接状态监听器即可。

但是,这个接口只针对PathChildrenCache,因为该监听器监听节点删除的时候,再次创建也不会再有重新监听的效果,调用该接口即可恢复。另外两种监听器可以不用考虑这种情况,原因取决于监听器的内部实现。

curator使用zookeeper原生监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码final CuratorFramework client = CuratorFrameworkFactory.builder()
              .retryPolicy(new RetryNTimes(0, 1000))
              .connectionTimeoutMs(4000)
              .sessionTimeoutMs(40000)
              .connectString(ADDR)
              .defaultData(null)
              .build();

client.checkExists().usingWatcher(new Watcher() {
  public void process(WatchedEvent event) {

  }
}); CuratorFramework client = CuratorFrameworkFactory.builder()
              .retryPolicy(new RetryNTimes(0, 1000))
              .connectionTimeoutMs(4000)
              .sessionTimeoutMs(40000)
              .connectString(ADDR)
              .defaultData(null)
              .build();

client.checkExists().usingWatcher(new Watcher() {
  public void process(WatchedEvent event) {

  }
});

如代码所示,和原生API的使用方法差不多,只不过curator的接口调用风格,监听器的用法、特性及触发事件和原生监听器一样,因为这里传入的参数便是Zookeeper原生监听器,当然也可以是CuratorWathcer参数。

这样用的话,监听器便需要自己实现重复注册了。

使用curator但是却不使用它提供的高级监听器的API是应对于某些特殊的业务场景。

其它监听器

比如连接状态监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码final CuratorFramework client = CuratorFrameworkFactory.builder()
              .retryPolicy(new RetryNTimes(0, 1000))
              .connectionTimeoutMs(4000)
              .sessionTimeoutMs(40000)
              .connectString(ADDR)
              .defaultData(null)
              .build();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
  public void stateChanged(CuratorFramework client, ConnectionState newState) {
     
  }
}); CuratorFramework client = CuratorFrameworkFactory.builder()
              .retryPolicy(new RetryNTimes(0, 1000))
              .connectionTimeoutMs(4000)
              .sessionTimeoutMs(40000)
              .connectString(ADDR)
              .defaultData(null)
              .build();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
  public void stateChanged(CuratorFramework client, ConnectionState newState) {
     
  }
});

ConnectionState参数说明:

ConnectionState 说明
CONNECTED 第一次成功连接时
SUSPENDED 连接丢失但是连接尚未超时的时候
RECONECTED SUSPENDED、LOST、READ_ONLY三种状态之后并重新建立连接的时候
LOST 连接确认丢失的时候
READ_ONLY 只读模式。该模式会在客户端初始化的时候设置

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%