什么是zookeeper的会话过期?

一般来说,我们使用zookeeper是集群形式,如下图,client和zookeeper集群(3个实例)建立一个会话session。
client和zookeeper集群

在这个会话session当中,client其实是随机与其中一个zk provider建立的链接,并且互发心跳heartbeat。zk集群负责管理这个session,并且在所有的provider上维护这个session的信息,包括这个session中定义的临时数据和监视点watcher。

如果再网络不佳或者zk集群中某一台provider挂掉的情况下,有可能出现connection loss的情况,例如client和zk provider1连接断开,这时候client不需要任何的操作(zookeeper api已经给我们做好了),只需要等待client与其他provider重新连接即可。这个过程可能导致两个结果:

在session timeout之内连接成功

这个时候client成功切换到连接另一个provider例如是provider2,由于zk在所有的provider上同步了session相关的数据,此时可以认为无缝迁移了。

在session timeout之内没有重新连接

这就是session expire的情况,这时候zookeeper集群会任务会话已经结束,并清除和这个session有关的所有数据,包括临时节点和注册的监视点 Watcher。

在session超时之后,如果client重新连接上了zookeeper集群,很不幸,zookeeper会发出session expired异常,且不会重建session,也就是不会重建临时数据和watcher。

如何使用curator实现session expired异常的捕获和处理?

首先我们先创建一个链接

这里设置了重试策略retryPolicy和会话超时时间sessionTimeoutMs,并打开链接。

1
2
3
4
5
6
7
8
9
10
11
public void init() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
client = CuratorFrameworkFactory
.builder()
.connectString(zookeeperServer)
.retryPolicy(retryPolicy)
.sessionTimeoutMs(sessionTimeoutMs)
.connectionTimeoutMs(connectionTimeoutMs)
.build();
client.start();
}

客户端注册

1
2
3
4
5
6
7
8
9
10
11
12
13
public void register() {
try {
String rootPath = "/merak";
String hostAddress = InetAddress.getLocalHost().getHostAddress();
String serviceInstance = "/sync" + "-" + hostAddress + "-";
String path = rootPath + serviceInstance;
SessionConnectionListener sessionConnectionListener = new SessionConnectionListener(path, "");
client.getConnectionStateListenable().addListener(sessionConnectionListener);
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (Exception e) {
logger.error("注册出错", e);
}
}

这里我们创建了一个临时有序节点node,这个节点将会在session expired触发的时候被自动删除。当session又重新恢复的时候,client只会收到session expired异常和不会自动将临时节点添加到zookeeper中。

为了解决这个问题,我们增加了一个监听器,

client.getConnectionStateListenable().addListener(sessionConnectionListener)这个监听器监听session expired事件,并且在事件发生的时候进行处理,监听器处理的流程如下。

注意:这个监听器注册是可以复用的,即如果多次session expired,不用重复注册监听器。

监听器sessionConnectionListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.openmind.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SessionConnectionListener implements ConnectionStateListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());

private String path;
private String data;

public SessionConnectionListener(String path, String data) {
this.path = path;
this.data = data;
}

@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState){
if(connectionState == ConnectionState.LOST){
logger.error("zk session超时");
while(true){
try {
if(curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()){
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes("UTF-8"));
logger.info("重连zk成功");
break;
}
} catch (InterruptedException e) {
break;
} catch (Exception e){

}
}
}
}
}

这里的ConnectionState.LOST等同于session expired事件,对这个事件的处理是,在一个死循环中重试链接zk,知道链接成功才退出循环。

需要注意的是:一旦重新创建了会话,那么之前会话的所有观察点都会失效,需要重新初始化观察点。