对zookeeper
的开源框架curator
中关于listener
的使用不算难,这个地方我整理了2篇比较好的文章,对齐用法作出说明。
参考文章如下所示:
此文不是原创,是参考后进行了2文的整理和总结,如果文章有说明的不清晰的地方,请参考原文出处。
三更灯火五更鸡,正是男儿读书时。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
<!--此处需要排除 log4j2 的引导,因为当前项目已使用logback-->
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
</dependency>
<!--对监听的操作封装到了此库中-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
在进正文前,写将我在写这个博客过程中发现的一些旁枝末节的问题告诉大家:
zookeeper
在创建节点之前,请一定要判定其节点是否存放,否则可能会报错:org.apache.zookeeper.KeeperException$NodeExistsException:
,核心错误是:NodeExistsException
。zookeeper
中如果使用usingWatcher
的时候,请注意你监听的节点必须是已经存在的,否则会抛出错误org.apache.zookeeper.KeeperException$NodeExistsException:
,核心错误是NodeExists
直接先上代码:
byte[] content = client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听器watchedEvent:" + watchedEvent);
}
}).forPath(path);
通过源码查看可以发现 WatchedEvent
中有如下事件:
public enum EventType {
None (-1),
NodeCreated (1), //节点创建
NodeDeleted (2), //节点删除
NodeDataChanged (3), //节点数据改变
NodeChildrenChanged (4); //子节点改变
//省略部分代码...
}
由此可见其支持的事件还是很多的,假如我使用如下代码来改变节点数据:
//设置获取修改节点
client.setData().forPath(path, "Hicode".getBytes());
ThreadUtil.sleep(2, TimeUnit.SECONDS);
client.setData().forPath(path, "Club".getBytes());
ThreadUtil.sleep(100, TimeUnit.SECONDS);
你可以发现,监听只会触发一次,只会输出一次。
这个也确实是该用法的一个问题,每个事件监听只会触发一次,如果需要反复触发,需要在每次进行操作前再添加一个监听,代码如下所示,通过这个案例你会发现使用 usingWatcher
会比较繁琐:
// 第一次注册监听
client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听器watchedEvent:" + watchedEvent);
}
}).forPath(path);
//设置获取修改节点
System.out.println("第一次改变");
client.setData().forPath(path, "Hicode".getBytes());
// 第二次注册监听
client.getData().usingWatcher(new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听器watchedEvent:" + watchedEvent);
}
}).forPath(path);
ThreadUtil.sleep(2, TimeUnit.SECONDS);
System.out.println("第二次改变");
client.setData().forPath(path, "Club".getBytes());
ThreadUtil.sleep(12, TimeUnit.SECONDS);
总结下:
usingWatcher
只会触发一次,每一次被触发后,需要再次注册监听才行。还是先上代码,如下代码表示注册监听
CuratorListener listener = new CuratorListener() {
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("监听事件触发,event事件为:" + event.getType());
}
client.getCuratorListenable().addListener(listener);
假设我们有如下代码来改变状态:
//创建临时子节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path + "/hi");
// 异步获取节点数据
client.getData().inBackground().forPath(path);
// 变更节点内容inBackground 。
client.setData().forPath(path, "124443".getBytes());
//创建临时子节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path + "/hi");
client.delete().deletingChildrenIfNeeded().forPath(path);
仔细观察结果可以发现:
监听事件触发,event事件为:WATCHED
监听事件触发,event事件为:GET_DATA
监听事件触发,event事件为:CREATE
通过输出,我们可以发现,监听一共触发了3次。
type
为watched
表示被监听。GET_DATA
获取数据,结合代码可以看出上client.getData()
触发的。CREATE
创建事件,通过比对代代码可以看出是因为create
触发的。你会不会有疑问,为什么setData()
和delete()
没有触发,是因为不支持嘛?
如果你仔细你可以发现,触发了事件监听的都是因为一句关键代码inBackground
。你可以试试在没有触发的代码中加入这个试试。
那么他支持哪些事件了,通过查看源码可以分析出:
package org.apache.curator.framework.api;
public enum CuratorEventType {
CREATE,
DELETE,
EXISTS,
GET_DATA,
SET_DATA,
CHILDREN,
SYNC,
GET_ACL,
SET_ACL,
TRANSACTION,
GET_CONFIG,
RECONFIG,
WATCHED,
REMOVE_WATCHES,
CLOSING;
}
几乎所有的事件都是支持的,前提是通过 inbackground
在后台运行。
在使用此方式的时候,是需要引入这个 jar 包的。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
他是通过缓存的节点数据的变更来完成监听的,在我的理解中是通过缓存数据的变更来触发事件的,并不是直接接听服务节点。
该缓存监听的是节点自身的变化,比如说创建、变更和删除。
@Test
public void nodeCacheTest() throws Exception {
String path = "/hicode/club";
CuratorFramework client = CuratorFrameworkFactory.builder().retryPolicy(new RetryNTimes(5, 2000)) .connectString(CONNECT_STR).build();
client.start();
//通过 client 创建 nodeCache,请注意一定要 start
NodeCache nodeCache = new NodeCache(client, path);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("nodeChanged...");
}
});
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path+"/"+"test");
client.setData().forPath(path+"/test","cc".getBytes());
client.setData().forPath(path+"/test","hh".getBytes());
client.delete().forPath(path+"/test");
ThreadUtil.sleep(5,TimeUnit.MINUTES);
nodeCache.close();
client.close();
}
上述代码如下代码是构建缓存并创建监听:
//通过 client 创建 nodeCache,请注意一定要 start
NodeCache nodeCache = new NodeCache(client, path);
nodeCache.start();
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("nodeChanged...");
}
});
最终输出的只有一句:
nodeChanged...
刚好和我前文提到的:NodeCache
只会监听节点自身的变更。
该缓存变相监听的是节点的路径的变更,比如说因为子节点创建导致路径变长,子节点删除导致路径变短,对节点自身的值改变不做监听。
@Test
public void pathCacheTest() throws Exception {
String path = "/hicode/club/childPath";
CuratorFramework client = CuratorFrameworkFactory.builder()
.retryPolicy(new RetryNTimes(5, 2000)) .connectString(CONNECT_STR).build();
client.start();
PathChildrenCache pathCache = new PathChildrenCache(client, path, false);
pathCache.start();
//默认会有事件:PathChildrenCacheEvent{type=CONNECTION_RECONNECTED, data=null}
pathCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("事件类型是:" + event);
}
});
//创建节点,不触发
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
.forPath(path);
//对节点赋值,不触发
client.setData().forPath(path, "childPath".getBytes());
String tempPath = path + "/temp";
//创建节点触发
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
.forPath(tempPath);
client.setData().forPath(tempPath, "666".getBytes());
ThreadUtil.sleep(5, TimeUnit.MINUTES);
pathCache.close();
client.close();
}
输出结果为:
事件类型是:PathChildrenCacheEvent{type=CONNECTION_RECONNECTED, data=null}
事件类型是:PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/hicode/club/childPath/temp', stat=468,469,1527855307449,1527855307451,1,0,0,72057686640754862,3,0,468
, data=null}}
第一个事件:CONNECTION_RECONNECTED
是属于必定触发的。
第二个事件:CHILD_ADDED
是增加子节点的时候触发的。
其他的比如在设置节点数据等的时候,事件未触发,符合我们了解到的资料。
注意代码中最开始监听的是: String path = "/hicode/club/childPath"; 那么后续的事件触发都是该节点下的。
TreeCache
属于上述2者的结合体,既监听了节点自身的变化,又监听了节点路径的变化。
@Test
public void treeCacheTest() throws Exception {
String path = "/hicode/club/tree";
CuratorFramework client = CuratorFrameworkFactory.builder().retryPolicy(new RetryNTimes(5, 2000))
.connectString(CONNECT_STR).build();
client.start();
TreeCache treeCache = new TreeCache(client, path);
treeCache.start();
//默认会有事件:PathChildrenCacheEvent{type=CONNECTION_RECONNECTED, data=null}
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("事件类型是:" + event);
}
});
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path + "/t1", "xxx".getBytes());
//client.setData().forPath(path, "childPath".getBytes());
ThreadUtil.sleep(1, TimeUnit.SECONDS);
System.out.println("==============");
String tempPath = path + "/t2";
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(tempPath, "xxx".getBytes());
client.setData().forPath(tempPath, "666".getBytes());
client.delete().forPath(tempPath);
ThreadUtil.sleep(5, TimeUnit.MINUTES);
treeCache.close();
client.close();
}
结果为:
事件类型是:TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/hicode/club/tree', stat=566,566,1527856851406,1527856851406,0,11,0,0,0,1,589
, data=[]}}
事件类型是:TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/hicode/club/tree/t1', stat=589,589,1527857005459,1527857005459,0,0,0,72057686640754891,3,0,589
, data=[120, 120, 120]}}
事件类型是:TreeCacheEvent{type=INITIALIZED, data=null}
==============
事件类型是:TreeCacheEvent{type=NODE_ADDED, data=ChildData{path='/hicode/club/tree/t2', stat=590,590,1527857006474,1527857006474,0,0,0,72057686640754891,3,0,590
, data=[120, 120, 120]}}
事件类型是:TreeCacheEvent{type=NODE_UPDATED, data=ChildData{path='/hicode/club/tree/t2', stat=590,591,1527857006474,1527857006483,1,0,0,72057686640754891,3,0,590
, data=[54, 54, 54]}}
事件类型是:TreeCacheEvent{type=NODE_REMOVED, data=ChildData{path='/hicode/club/tree/t2', stat=590,591,1527857006474,1527857006483,1,0,0,72057686640754891,3,0,590
, data=[54, 54, 54]}}
TreeCache
可以理解为NodeCache
+PathCache
的集合。
如上就是 Zookeeper
常用的用于节点监听的方式,over。