熟悉的配方——dubbo注册原理

了解Zookeeper注册节点的之前,我们先看看我们熟悉的dubbo是如何利用Zookeeper做服务的注册和订阅的。
图1 基于Zookeeper实现的dubbo注册中心节点结构示意图

在dubbo的整体架构设计中,注册中心的节点结构示意图如图1。图中各个圆代表一个节点path。

  • /dubbo:这是Dubbo在Zookeeper上创建的根节点。
  • /dubbo/com.foo.barService:这是服务节点,代表了Dubbo的一个服务。
  • /dubbo/com.foo.barService/providers:这是服务提供者的根节点,其子节点代表了每一个服务的真正提供者。
  • /dubbo/com.foo.barService/consumers:这是服务消费者的根节点,其子节点代表了每一个服务的真正消费者。
  • /dubbo/com.foo.barService/providers/192.168.6.148:这是服务真正的提供者。

服务提供者

服务提供者在服务初始化的时候,首先会在Zookeeper的/dubbo/com.foo.barService/providers节点下创建一个子节点,并写入自己的URL地址,这就代表了com.foo.barService的一个服务提供者。

服务消费者

服务消费者会在服务启动的时候,读取并订阅Zookeeper上/dubbo/com.foo.barService/providers节点下所有的子节点,并解析出所有服务提供者的地址来作为服务地址列表,然后发起正常调用。

同时,服务消费者还会在Zookeeper的/dubbo/com.foo.barService/consumers节点下创建一个临时节点,并写入自己的URL地址,这就代表com.foo.barService的一个服务消费者。

监控中心

监控中心是Dubbo服务治理中的重要一部分,其需要知道一个服务的所有提供者和订阅者及其状态,因此,监控中心在启动的时候,会通过Zookeeper的/dubbo/com.foo.barService读取所有的服务提供者服务消费者的URL地址,并注册 Watcher 来监听其子节点的变化。

所有提供者在Zookeeper上注册的节点都是临时节点,利用的是临时节点的生命周期和客户端会话的特性,因此,一旦提供者服务发生故障不能提供服务时,该临时节点自动会从Zookeeper中删除,这样服务消费者和监控中心都能感知到服务提供者的变化。

关键词检索服务关键词同步的设计

关键词的CRD会引起的关键词DAT(Doubble Array Trie)的重新构建,集群服务各节点都需要知晓此次操作,并载数据到内存,对词表做构建。而每次CRD的操作是反馈到其中某一个服务节点的,因此,必须采取广播的形式通知其他节点接收此次数据的变动,以保证各节点内存数据保持一致。

Zookeeper便搬到多节点关键词同步的架构中,充当消息广播。

图2 关键词同步服务逻辑

通过Zookeeper实现多节点数据同步

Curator库是一个绝好的选择。它是Netflix开发的一套开源软件。Curator比Zookeeper官方原生的API更好更强大。

使用Curator创建Zookeeper客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
String zkHostList = "192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181";
String rootPath = "/merak/syn/dev";

//1000 是重试间隔时间基数,3 是重试次数
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkHostList)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(2000)
.sessionTimeoutMs(10000)
.build();

// TODO
// registerListeners(zkClient);
zkClient.start();

try {
if(null == zkClient.checkExists().forPath(rootPath)){
// root节点不存在时创建临时root节点
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(rootPath);
}
} catch (Exception e) {
logger.warn("创建根节点异常", e);
}

zkHostList: 是zookeerper的连接地址和端口,集群模式中,多个地址逗号分隔即可。

rootPath:是创建的根节点。

retryPolicy: 重试策略,选择 ExponentialBackoffRetry 策略,1000 是重试间隔时间基数,3 是重试次数。

connectionTimeoutMs:连接超时,2000ms。

sessionTimeoutMs:会话超时,10000ms=10s,也就是10s中检测不到心跳,该节点则会自动删除。

CreateMode.PERSISTENT 表示注册的节点是永久的。

创建临时节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 构建dat完成后调用此函数
* @param safepoint
*/
public void createLocalNode(long safepoint){
String path = null;
try {
path = rootPath + "/" + InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
try {
Stat stat = zkClient.checkExists().forPath(path);
if(null == stat) {
zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, String.valueOf(safepoint).getBytes(StandardCharsets.UTF_8));
} else {
zkClient.setData().withVersion(stat.getVersion()).forPath(path, String.valueOf(safepoint).getBytes(StandardCharsets.UTF_8));
}

} catch (Exception e) {
e.printStackTrace();
}
}

基于前面创建的ZK客户端,假如InetAddress.getLocalHost().getHostAddress()获取的ip地址为192.168.0.100,我们在永久节点/merak/syn/dev下增加临时节点/merak/syn/dev/192.168.0.100

safepoint设置为当前时间戳。我们通过zkCli.sh命令连接客户端查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: localhost:2181(CONNECTED) 79] get /merak/syn/dev/192.168.0.100
1652197253598
cZxid = 0x28
ctime = Tue May 10 23:40:57 CST 2022
mZxid = 0x28
mtime = Tue May 10 23:40:57 CST 2022
pZxid = 0x28
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x180ae8a1b8b000c
dataLength = 13
numChildren = 0

从结果中我们看到,1652197253598 则为safepoint的值。当我们关闭java应用,再次执行上面的命令查看,发现/merak/syn/dev/192.168.0.100节点已经不存在,但父节点/merak/syn/dev存在,因为它是永久节点。

添加连接监听器

上面的代码,session expired的情况下,是无法自动重新在Zookeeper中注册的。要实现这种功能,需要添加一个实现了ConnectionStateListener接口的类,并应用到CuratorFramework对象上。我们需要在创建Zookeeper客户端代码中的“TODO”下面取消registerListeners(zkClient);的注释:

1
2
// TODO
registerListeners(zkClient);

我们来细看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 注册需要监听的监听者对像.
*/
private void registerListeners(CuratorFramework client){
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
logger.info("CuratorFramework state changed: {}", newState);
if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED){
//使用Curator的NodeCache来做ZNode的监听,不用我们自己实现重复监听
final NodeCache cache = new NodeCache(client, rootPath, false);
NodeCacheListener listener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
String data = new String(cache.getCurrentData().getData(), StandardCharsets.UTF_8);
logger.info("节点变更, data={}", data);
//synManager.syn(data, rootPath); //处理业务逻辑
}
};
try {
cache.getListenable().addListener(listener);
cache.start();
} catch (Exception e) {
logger.error("Start NodeCache error for path: {}, error info: {}", rootPath, e.getMessage());
}
}
// session过期或者网络中断,自我修复
if(connectionState == ConnectionState.LOST){
logger.error("zk session超时");
while(true){
try {
if(client.getZookeeperClient().blockUntilConnectedOrTimedOut()){
logger.error("zk 已经重连");
break;
}
} catch (InterruptedException e) {
break;
} catch (Exception e){
logger.error("zk 重连出错", e.getMessage());
}
}
}
}
});

client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
@Override
public void unhandledError(String message, Throwable e) {
logger.info("CuratorFramework unhandledError: {}", message);
}
});
}

本示例只做了简单的代码演示,实际使用中,需要根据业务做接口封装。