什么是zookeeper的会话过期?
一般来说,我们使用zookeeper是集群形式,如下图,client和zookeeper集群(3个实例)建立一个会话session。

在这个会话session当中,client其实是随机与其中一个zk provider建立的链接,并且互发心跳heartbeat。zk集群负责管理这个session,并且在所有的provider上维护这个session的信息,包括这个session中定义的临时数据和监视点watcher。
如果再网络不佳或者zk集群中某一台provider挂掉的情况下,有可能出现connection loss的情况,例如client和zk provider1连接断开,这时候client不需要任何的操作(zookeeper api已经给我们做好了),只需要等待client与其他provider重新连接即可。这个过程可能导致两个结果:
在session timeout之内连接成功
这个时候client成功切换到连接另一个provider例如是provider2,由于zk在所有的provider上同步了session相关的数据,此时可以认为无缝迁移了。
在session timeout之内没有重新连接
这就是session expire的情况,这时候zookeeper集群会任务会话已经结束,并清除和这个session有关的所有数据,包括临时节点和注册的监视点 Watcher。
在session超时之后,如果client重新连接上了zookeeper集群,很不幸,zookeeper会发出session expired异常,且不会重建session,也就是不会重建临时数据和watcher。
如何使用curator实现session expired异常的捕获和处理?
首先我们先创建一个链接
这里设置了重试策略retryPolicy和会话超时时间sessionTimeoutMs,并打开链接。
1 2 3 4 5 6 7 8 9 10 11
| public void init() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries); client = CuratorFrameworkFactory .builder() .connectString(zookeeperServer) .retryPolicy(retryPolicy) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs) .build(); client.start(); }
|
客户端注册
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void register() { try { String rootPath = "/merak"; String hostAddress = InetAddress.getLocalHost().getHostAddress(); String serviceInstance = "/sync" + "-" + hostAddress + "-"; String path = rootPath + serviceInstance; SessionConnectionListener sessionConnectionListener = new SessionConnectionListener(path, ""); client.getConnectionStateListenable().addListener(sessionConnectionListener); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (Exception e) { logger.error("注册出错", e); } }
|
这里我们创建了一个临时有序节点node,这个节点将会在session expired触发的时候被自动删除。当session又重新恢复的时候,client只会收到session expired异常和不会自动将临时节点添加到zookeeper中。
为了解决这个问题,我们增加了一个监听器,
client.getConnectionStateListenable().addListener(sessionConnectionListener)
这个监听器监听session expired事件,并且在事件发生的时候进行处理,监听器处理的流程如下。
注意:这个监听器注册是可以复用的,即如果多次session expired,不用重复注册监听器。
监听器sessionConnectionListener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.openmind.zookeeper;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.zookeeper.CreateMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class SessionConnectionListener implements ConnectionStateListener { private final Logger logger = LoggerFactory.getLogger(this.getClass());
private String path; private String data;
public SessionConnectionListener(String path, String data) { this.path = path; this.data = data; }
@Override public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState){ if(connectionState == ConnectionState.LOST){ logger.error("zk session超时"); while(true){ try { if(curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()){ curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes("UTF-8")); logger.info("重连zk成功"); break; } } catch (InterruptedException e) { break; } catch (Exception e){
} } } } }
|
这里的ConnectionState.LOST等同于session expired事件,对这个事件的处理是,在一个死循环中重试链接zk,知道链接成功才退出循环。
需要注意的是:一旦重新创建了会话,那么之前会话的所有观察点都会失效,需要重新初始化观察点。