熟悉的配方——dubbo注册原理
了解Zookeeper注册节点的之前,我们先看看我们熟悉的dubbo是如何利用Zookeeper做服务的注册和订阅的。

在dubbo的整体架构设计中,注册中心的节点结构示意图如图1。图中各个圆代表一个节点path。
- /dubbo:这是Dubbo在Zookeeper上创建的根节点。
- /dubbo/com.foo.barService:这是服务节点,代表了Dubbo的一个服务。
- /dubbo/com.foo.barService/providers:这是服务提供者的根节点,其子节点代表了每一个服务的真正提供者。
- /dubbo/com.foo.barService/consumers:这是服务消费者的根节点,其子节点代表了每一个服务的真正消费者。
- /dubbo/com.foo.barService/providers/192.168.6.148:这是服务真正的提供者。
服务提供者
服务提供者在服务初始化的时候,首先会在Zookeeper的/dubbo/com.foo.barService/providers
节点下创建一个子节点,并写入自己的URL地址,这就代表了com.foo.barService
的一个服务提供者。
服务消费者
服务消费者会在服务启动的时候,读取并订阅Zookeeper上/dubbo/com.foo.barService/providers
节点下所有的子节点,并解析出所有服务提供者的地址来作为服务地址列表,然后发起正常调用。
同时,服务消费者还会在Zookeeper的/dubbo/com.foo.barService/consumers
节点下创建一个临时节点,并写入自己的URL地址,这就代表com.foo.barService
的一个服务消费者。
监控中心
监控中心是Dubbo服务治理中的重要一部分,其需要知道一个服务的所有提供者和订阅者及其状态,因此,监控中心在启动的时候,会通过Zookeeper的/dubbo/com.foo.barService
读取所有的服务提供者服务消费者的URL地址,并注册 Watcher 来监听其子节点的变化。
所有提供者在Zookeeper上注册的节点都是临时节点,利用的是临时节点的生命周期和客户端会话的特性,因此,一旦提供者服务发生故障不能提供服务时,该临时节点自动会从Zookeeper中删除,这样服务消费者和监控中心都能感知到服务提供者的变化。
关键词检索服务关键词同步的设计
关键词的CRD会引起的关键词DAT(Doubble Array Trie)的重新构建,集群服务各节点都需要知晓此次操作,并载数据到内存,对词表做构建。而每次CRD的操作是反馈到其中某一个服务节点的,因此,必须采取广播的形式通知其他节点接收此次数据的变动,以保证各节点内存数据保持一致。
Zookeeper便搬到多节点关键词同步的架构中,充当消息广播。

通过Zookeeper实现多节点数据同步
Curator库是一个绝好的选择。它是Netflix开发的一套开源软件。Curator比Zookeeper官方原生的API更好更强大。
使用Curator创建Zookeeper客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| String zkHostList = "192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181"; String rootPath = "/merak/syn/dev";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework zkClient = CuratorFrameworkFactory.builder() .connectString(zkHostList) .retryPolicy(retryPolicy) .connectionTimeoutMs(2000) .sessionTimeoutMs(10000) .build();
zkClient.start();
try { if(null == zkClient.checkExists().forPath(rootPath)){ zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(rootPath); } } catch (Exception e) { logger.warn("创建根节点异常", e); }
|
zkHostList: 是zookeerper的连接地址和端口,集群模式中,多个地址逗号分隔即可。
rootPath:是创建的根节点。
retryPolicy: 重试策略,选择 ExponentialBackoffRetry 策略,1000 是重试间隔时间基数,3 是重试次数。
connectionTimeoutMs:连接超时,2000ms。
sessionTimeoutMs:会话超时,10000ms=10s,也就是10s中检测不到心跳,该节点则会自动删除。
CreateMode.PERSISTENT 表示注册的节点是永久的。
创建临时节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public void createLocalNode(long safepoint){ String path = null; try { path = rootPath + "/" + InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } try { Stat stat = zkClient.checkExists().forPath(path); if(null == stat) { zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(path, String.valueOf(safepoint).getBytes(StandardCharsets.UTF_8)); } else { zkClient.setData().withVersion(stat.getVersion()).forPath(path, String.valueOf(safepoint).getBytes(StandardCharsets.UTF_8)); }
} catch (Exception e) { e.printStackTrace(); } }
|
基于前面创建的ZK客户端,假如InetAddress.getLocalHost().getHostAddress()
获取的ip地址为192.168.0.100,我们在永久节点/merak/syn/dev
下增加临时节点/merak/syn/dev/192.168.0.100
。
safepoint设置为当前时间戳。我们通过zkCli.sh命令连接客户端查询:
1 2 3 4 5 6 7 8 9 10 11 12 13
| [zk: localhost:2181(CONNECTED) 79] get /merak/syn/dev/192.168.0.100 1652197253598 cZxid = 0x28 ctime = Tue May 10 23:40:57 CST 2022 mZxid = 0x28 mtime = Tue May 10 23:40:57 CST 2022 pZxid = 0x28 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x180ae8a1b8b000c dataLength = 13 numChildren = 0
|
从结果中我们看到,1652197253598 则为safepoint的值。当我们关闭java应用,再次执行上面的命令查看,发现/merak/syn/dev/192.168.0.100
节点已经不存在,但父节点/merak/syn/dev
存在,因为它是永久节点。
添加连接监听器
上面的代码,session expired的情况下,是无法自动重新在Zookeeper中注册的。要实现这种功能,需要添加一个实现了ConnectionStateListener接口的类,并应用到CuratorFramework对象上。我们需要在创建Zookeeper客户端代码中的“TODO”下面取消registerListeners(zkClient);
的注释:
1 2
| registerListeners(zkClient);
|
我们来细看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
|
private void registerListeners(CuratorFramework client){ client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { logger.info("CuratorFramework state changed: {}", newState); if (newState == ConnectionState.CONNECTED || newState == ConnectionState.RECONNECTED){ final NodeCache cache = new NodeCache(client, rootPath, false); NodeCacheListener listener = new NodeCacheListener() { @Override public void nodeChanged() throws Exception { String data = new String(cache.getCurrentData().getData(), StandardCharsets.UTF_8); logger.info("节点变更, data={}", data); } }; try { cache.getListenable().addListener(listener); cache.start(); } catch (Exception e) { logger.error("Start NodeCache error for path: {}, error info: {}", rootPath, e.getMessage()); } } if(connectionState == ConnectionState.LOST){ logger.error("zk session超时"); while(true){ try { if(client.getZookeeperClient().blockUntilConnectedOrTimedOut()){ logger.error("zk 已经重连"); break; } } catch (InterruptedException e) { break; } catch (Exception e){ logger.error("zk 重连出错", e.getMessage()); } } } } });
client.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { @Override public void unhandledError(String message, Throwable e) { logger.info("CuratorFramework unhandledError: {}", message); } }); }
|
本示例只做了简单的代码演示,实际使用中,需要根据业务做接口封装。