网站首页 > 博客文章 正文
本章内容
阅读说明:本文主要剖析Nacos服务注册的关键源码,为了讲清楚源码中的关键步骤与实现细节,会从三个维度进行解析:
- 时序图:快速了解服务注册的交互链路。
- 方法说明:快速了解主要方法的作用。
- 源码解析:解析服务注册实现细节(关键代码均有注释说明)。
由于本文篇幅较长,各位看官可以通过时序图+方法说明的方式快速了解Nacos服务注册的实现步骤以及设计思想,源码当作补充说明。
版本说明:服务注册源码分析基于Nacos-1.4.3版本。
服务注册客户端入口
Nacos客户端依赖:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
基于Springboot自动装备原理,服务启动时会自动加载Nacos客户端相关的配置类。
如图所示:
其中,与服务注册相关的配置类为:NacosServiceRegistryAutoConfiguration。
如图所示:
NacosServiceRegistryAutoConfiguration是Nacos服务注册的自动配置类,该类的主要功能是按条件注入NacosAutoServiceRegistration、NacosServiceRegistry、NacosRegistration等服务注册相关的Bean。
客户端发送服务注册请求
服务注册客户端交互逻辑,如图所示:
NacosAutoServiceRegistration
NacosAutoServiceRegistration继承了AbstractAutoServiceRegistration,该类实现了ApplicationListener接口的onApplicationEvent()方法。
服务启动,Spring初始化完成后,发布对应的初始化完成事件(WebServerInitializedEvent),触发AbstractAutoServiceRegistration.onApplicationEvent()方法调用逻辑,开启注册流程。
源码如下:
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
...
// 通过CAS方式设置服务端口
this.port.compareAndSet(0, event.getWebServer().getPort());
// 开启服务注册流程
this.start();
}
start()方法主要做以下事情:
- 发布服务注册相关事件。
- 执行服务注册。
- 发布服务注册完成事件。
- 更新服务运行状态。
源码如下:
public void start() {
...
// 服务启动时,服务运行状态为false
if (!this.running.get()) {
// 发布服务预注册事件
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration()));
// 执行服务注册
register();
if (shouldRegisterManagement()) {
registerManagement();
}
// 发布服务注册完成事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration()));
// 更新服务运行状态为true
this.running.compareAndSet(false, true);
}
}
register()方法主要做以下事情:
- 获取服务注册基本信息(如:服务名、服务IP、端口等)。
- 调用NacosServiceRegistry#register方法执行服务注册。
源码如下:
protected void register() {
this.serviceRegistry.register(getRegistration());
}
其中,getRegistration()主要作用是从NacosRegistration中获取Nacos服务注册基本信息。
NacosRegistration
NacosRegistration主要用于管理Nacos服务注册基本信息,该类主要封装了NacosDiscoveryProperties的配置信息(application.yml),同时,该类实现了Registration接口(继承自ServiceInstance),Registration是Springcloud提供的服务注册信息通用接口,用于管理服务注册的基本信息(如:服务ID、服务IP、端口等)。
NacosServiceRegistry
NacosServiceRegistry类实现了ServiceRegistry接口,该接口是Springcloud提供的服务注册通用接口,声明了服务注册、取消注册等方法。
NacosServiceRegistry#register方法主要做以下事情:
- 创建服务实例封装服务注册信息。
- 调用NamingService#registerInstance方法发起Nacos服务注册。
源码如下:
// com.alibaba.cloud.nacos.registry.NacosServiceRegistry.register
@Override
public void register(Registration registration) {
// ...
// 获取Nacos命名服务
NamingService namingService = namingService();
// 获取服务名,默认为应用名
String serviceId = registration.getServiceId();
// 获取服务分组名,默认为DEFAULT_GROUP
String group = nacosDiscoveryProperties.getGroup();
// 创建服务实例,封装服务注册信息(如:服务IP、端口、权重、服务节点类型等)
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// 调用NamingService.registerInstance()方法进行服务注册
namingService.registerInstance(serviceId, group, instance);
// ...
}catch (Exception e) {
// ...
}
}
NamingService
NamingService接口定义了服务注册、服务发现等服务注册相关的核心方法,它的默认实现为NacosNamingService。
NacosNamingService#registerInstance方法主要做以下事情:
- 构建心跳信息,并向Nacos服务端定时(5秒/次)发送心跳请求(临时实例走的是Nacos的AP模式,Nacos服务端需要检测服务的可用性)。
- 调用NamingProxy#registerService方法向Nacos服务端注册服务实例。
源码如下:
// com.alibaba.nacos.client.naming.NacosNamingService#registerInstance
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
NamingUtils.checkInstanceIsLegal(instance);
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
// 判断是否为临时实例
if (instance.isEphemeral()) {
// 构建服务心跳信息
BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
// 构建心跳信息,并向Nacos服务端定时(5秒/次)发送心跳请求
beatReactor.addBeatInfo(groupedServiceName, beatInfo);
}
// 向Nacos服务端注册服务实例
serverProxy.registerService(groupedServiceName, groupName, instance);
}
NamingProxy#registerService方法主要做以下事情:
- 组装请求参数。
- 调用服务注册接口(/v1/ns/instance)向Nacos服务端注册服务实例。
源码如下:
// com.alibaba.nacos.client.naming.net.NamingProxy.registerService
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
// ...
// 组装请求参数
final Map<String, String> params = new HashMap<String, String>(16);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
// 调用Nacos服务端服务注册接口(/v1/ns/instance)注册服务实例。
reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
}
public String reqApi(String api, Map<String, String> params, String method) throws NacosException {
return reqApi(api, params, Collections.EMPTY_MAP, method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)
throws NacosException {
return reqApi(api, params, body, getServerList(), method);
}
public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
String method) throws NacosException {
// ...
if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < maxRetry; i++) {
try {
// 调用服务注册接口(/v1/ns/instance)向Nacos服务端注册服务实例
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
// ...
}
}
} else {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index);
try {
// 调用服务注册接口(/v1/ns/instance)向Nacos服务端注册服务实例
return callServer(api, params, body, server, method);
} catch (NacosException e) {
// ...
}
index = (index + 1) % servers.size();
}
}
// ...
}
服务端服务注册入口
Nacos服务端启动类为nacos-console模块中的com.alibaba.nacos.Nacos,在nacos-console模块中引入了nacos-naming模块,该模块中的com.alibaba.nacos.naming.controllers包中提供了服务注册、服务发现、健康检查等核心接口。如图所示:
服务注册与同步
获取服务最新服务实例列表交互逻辑,如图所示:
InstanceController
Nacos服务端接收客户端发送的服务注册请求后,调用InstanceController#register方法处理服务注册请求。
InstanceController#register方法主要做以下事情:
- 根据请求参数获取namespaceId,默认为:public。
- 根据请求参数获取serviceName,格式:groupName@@serviceName。
- 根据请求参数构建服务实例。
- 调用ServiceManage#registerInstance方法注册服务实例。
源码如下:
// com.alibaba.nacos.naming.controllers.InstanceController.register
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
// 根据客户端请求参数获取namespaceId,默认为:public
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
// 根据客户端请求参数获取serviceName,格式:groupName@@serviceName
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
// 根据客户端请求参数构建服务实例
final Instance instance = parseInstance(request);
// 调用ServiceManage#registerInstance方法注册服务实例
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
ServiceManage
ServiceManage是Nacos服务的核心管理类,其中定义了服务注册表以及服务相关的各种核心API。如图所示:
ServiceManager#registerInstance方法主要做以下事情:
- 创建并初始化服务,将其添加到注册表中(第一次注册服务)。
- 根据namespaceId,、serviceName获取对应的服务。
- 向服务中添加要注册的服务实例。
源码如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
// 创建并初始化服务,将其添加到注册表中(第一次注册服务)
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// 根据namespaceId,、serviceName获取对应的服务
Service service = getService(namespaceId, serviceName);
// ...
// 向服务中添加要注册的服务实例
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
ServiceManager#createEmptyService方法主要做以下事情:
- 判断服务在注册表中是否存在:
- 存在,则不做处理。
- 不存在,则创建服务:
- 创建服务并将服务添加到注册表中。
- 开启服务心跳检测。
- 监听服务实例变更。
- 如果不是创建本地服务,则根据服务实例类型(临时、持久)选择不同协议进行服务注册与同步。
源码如下:
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
createServiceIfAbsent(namespaceId, serviceName, local, null);
}
// com.alibaba.nacos.naming.core.ServiceManager#createServiceIfAbsent
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
// 根据namespaceId、serviceName从注册表中获取对应的服务
Service service = getService(namespaceId, serviceName);
// 如果服务不存在,则创建并初始化服务
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
// 创建服务
service = new Service();
// 设置服务名
service.setName(serviceName);
// 设置namespaceId
service.setNamespaceId(namespaceId);
// 设置分组名
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
// 根据服务实例列表生成MD5校验值(如:3c84b68316cb8449ca2842a990e2dd7d)
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
// 校验服务名和服务集群名格式
service.validate();
// 该方法主要做一下事情:
// 1.将服务添加到注册表中
// 2.初始化服务并启动心跳检测(5秒/次)
// 3.监听服务实例变更
putServiceAndInit(service);
// 不是创建本地服务
if (!local) {
// 根据服务实例类型(临时、持久)选择不同协议进行服务注册与同步:
// 临时实例:基于Distro协议将服务信息同步给集群中的其他Nacos节点(即:AP模式)
// 持久实例:基于Raft协议将服务信息同步给集群中的其他Nacos节点(即:CP模式)
addOrReplaceService(service);
}
}
}
ServiceManager#addInstance方法主要做以下事情:
- 根据namespaceId、serviceName、实例类型(临时、持久)生成服务唯一标识。
- 根据namespaceId,、serviceName获取对应的服务。
- 对服务对象添加同步锁:
- 获取服务最新的服务实例列表(旧实例列表+待注册实例列表)。
- 根据服务实例类型(临时、持久)选择不同协议进行服务注册与同步:
- 临时实例:基于Distro协议将服务信息同步给集群中的其他Nacos节点(即:AP模式)。
- 持久实例:基于Raft协议将服务信息同步给集群中的其他Nacos节点(即:CP模式)。
源码如下:
// com.alibaba.nacos.naming.core.ServiceManager#addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {
// 根据namespaceId、serviceName、实例类型(临时、持久)构建服务唯一标识:
// 临时实例:com.alibaba.nacos.naming.iplist.ephemeral.namespaceId##groupName@@serviceName
// 持久实例:com.alibaba.nacos.naming.iplist.namespaceId##groupName@@serviceName
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
// 根据namespaceId,、serviceName获取对应的服务
Service service = getService(namespaceId, serviceName);
// 对服务对象添加同步锁
synchronized (service) {
// 通过对比待注册实例列表与旧实例列表,获取最新的实例列表(旧实例列表+待注册实例列表)。
// 注意:此处会将当前服务实例列表拷贝到一个新的服务实例列表中,基于CopyOnWrite思想
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
// 将新的服务实例列表封装为Instances
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// 根据服务实例类型(临时、持久)选择不同协议进行服务注册与同步:
// 临时实例:基于Distro协议将服务信息同步给集群中的其他Nacos节点(即:AP模式)
// 持久实例:基于Raft协议将服务信息同步给集群中的其他Nacos节点(即:CP模式)
consistencyService.put(key, instances);
}
}
ServiceManager#addIpAddresses->updateIpAddresses方法主要做以下事情:
- 根据服务信息尝试从数据集(DataStore)中获取旧的服务实例列表。
- 对比待处理的服务实例列表和旧的服务实例列表:
- 如果是删除操作,则将服务实例从最新的服务实例列表中移除。
- 如果是新增或更新操作,则将服务实例加入或更新到最新的服务实例列表中。
源码如下:
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
throws NacosException {
// 根据服务唯一标识获取当前数据集(dataStore)中旧的服务实例列表,返回值是Datum(key:服务唯一标识,value:服务实例列表)
Datum datum = consistencyService
.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
// 根据实例类型(临时、持久)获取现有服务实例列表
List<Instance> currentIPs = service.allIPs(ephemeral);
// 创建Map来保存现有服务实例列表,key:服务实例IP+Port,value:服务实例
Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
// 创建Set来保存现有服务实例的instanceId(实例唯一标识)
Set<String> currentInstanceIds = Sets.newHashSet();
// 遍历现有服务实例列表
for (Instance instance : currentIPs) {
// 将现有服务实例保存到新创建的Map中
currentInstances.put(instance.toIpAddr(), instance);
// 将现有服务实例的instanceId保存到新创建的Set中
currentInstanceIds.add(instance.getInstanceId());
}
// 定义新的Map来保存更新后的服务实例列表
Map<String, Instance> instanceMap;
if (datum != null && null != datum.value) {
// 如果服务存在旧的实例列表,则更新旧实例的健康状态和心跳时间并将其添加到新的Map中
instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
} else {
// 如果服务不存在旧的实例列表,则创建新Map
instanceMap = new HashMap<>(ips.length);
}
// 遍历待处理服务实例列表
for (Instance instance : ips) {
// 如果待处理服务对应的集群信息不存在,则创建并初始化服务对应的集群信息
if (!service.getClusterMap().containsKey(instance.getClusterName())) {
Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
// ...
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
// 如果是删除操作,则将待处理服务实例从Map中移除
instanceMap.remove(instance.getDatumKey());
} else {
// 根据待处理服务信息组成服务唯一标识从旧的服务实例列表中查找
Instance oldInstance = instanceMap.get(instance.getDatumKey());
// 如果是旧的服务实例,则更新服务实例信息
if (oldInstance != null) {
instance.setInstanceId(oldInstance.getInstanceId());
} else {
// 如果是新的服务实例,则新增服务实例信息(即:生成新的instanceId)
instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
}
// 将新的服务实例加入Map中
instanceMap.put(instance.getDatumKey(), instance);
}
}
// ...
// 将最新服务实例列表对应的Map转为对应的List,并返回
return new CopyOnWriteArrayList<>(instanceMap.values());
}
ConsistencyService#put方法为服务注册与同步核心逻辑,该方法主要做以下事情:
根据服务唯一标识前缀判断服务实例类型(临时、持久):
- com.alibaba.nacos.naming.iplist.ephemeral:临时实例,则基于Distro协议将服务信息同步给集群中的其他Nacos节点(即:AP模式)。
- com.alibaba.nacos.naming.iplist:持久实例,则基于Raft协议将服务信息同步给集群中的其他Nacos节点(即:CP模式)。
- 以临时实例为例,处理步骤:
- 将服务对应的实例列表信息封装成Datum加入到DataStore(作用是根据服务唯一标识获取对应的服务实例列表)中。
- 将服务唯一标识、操作类型封装成一个Task任务加入到DistroConsistencyServiceImpl.Notifier的任务队列中。
- 遍历集群中除自己以外的其他Nacos节点:
- 先将服务信息(资源)、资源类型(如:服务信息同步、元数据同步)以及目标节点封装成DistroKey。
- 再将DistroKey、操作类型(如:新增、删除实例)封装成延时任务DelayTask(延时时长:1秒)添加到Nacos延时任务执行器的任务集合中。
源码如下:
// com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
// 根据服务唯一标识前缀判断服务实例类型(临时、持久)
mapConsistencyService(key).put(key, value);
}
// com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
@Override
public void put(String key, Record value) throws NacosException {
// 1.将服务对应的实例列表信息封装成Datum加入到DataStore(作用是根据服务唯一标识获取对应的服务实例列表)中。
// 2.将服务唯一标识、操作类型封装成一个Task任务加入到DistroConsistencyServiceImpl.Notifier的任务队列中。
onPut(key, value);
// 遍历集群中除自己以外的其他Nacos节点:
// 先将服务信息(资源)、资源类型(如:服务信息同步、元数据同步)以及目标节点封装成DistroKey。
// 再将DistroKey、操作类型(如:新增、删除实例)封装成延时任务DelayTask(延时时长:1秒)添加到Nacos延时任务执行器的任务集合中。
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
public void onPut(String key, Record value) {
// 判断是否为临时实例
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
// 将服务信息封装成Datum(key:服务唯一标识,value:服务实例列表)
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
// 将Datum添加到DataStore中
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
// 向Notifier的任务队列中添加任务
notifier.addTask(key, DataOperation.CHANGE);
}
public void addTask(String datumKey, DataOperation action) {
// 如果服务集合中存在服务并且服务操作类型为更新,则直接返回
if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
return;
}
// 如果服务操作类型为更新,则将服务添加到服务集合中
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
// 向Notifier的任务队列中添加任务
tasks.offer(Pair.with(datumKey, action));
}
public void sync(DistroKey distroKey, DataOperation action, long delay) {
// 遍历集群中除自己以外的其他Nacos节点
for (Member each : memberManager.allMembersWithoutSelf()) {
// 构建DistroKey,其中:
// resourceKey为服务唯一标识
// resourceType为com.alibaba.nacos.naming.iplist.(表示同步服务信息)
// targetServer为Nacos节点
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
each.getAddress());
// 根据创建的DistroKey和操作类型构建Distro延时任务(延时时长:1秒)
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
// 向延时任务执行器(NacosDelayTaskExecuteEngine)中添加延时任务
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
}
}
}
服务注册
服务注册交互逻辑,如图所示:
DistroConsistencyServiceImpl初始化时会开启一个线程池DISTRO_NOTIFY_EXECUTOR,其中只有一个线程(即:DistroConsistencyServiceImpl.Notifier),主要作用是通过死循环的方式从任务队列(Notifier.tasks)中取出任务进行处理(即:将服务实例列表更新到注册表中)。
源码如下:
// com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#init
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
// com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
// ...
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
try {
// 从任务队列中取出任务
Pair<String, DataOperation> pair = tasks.take();
// 任务处理
handle(pair);
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
//...
}
DistroConsistencyServiceImpl.Notifier#handle方法主要作用是遍历任务中的服务信息:
- 如果是更新操作,则更新服务对应的实例列表。
- 如果是删除操作,则从服务对应的实例列表中删除实例。
源码如下:
private void handle(Pair<String, DataOperation> pair) {
try {
// ...
// 遍历任务中服务信息(Service继承来RecordListener)
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
// 更新操作
if (action == DataOperation.CHANGE) {
// 更新服务对应的实例列表
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
// 删除操作
if (action == DataOperation.DELETE) {
// 从服务对应的实例列表中删除实例
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
// ...
}
}
// ...
} catch (Throwable e) {
// ...
}
}
更新操作
更新操作由Service#onChange方法进行处理,该方法主要做以下事情:
- 更新服务对应的实例列表信息。
- 更新服务MD5校验码。
源码如下:
public void onChange(String key, Instances value) throws Exception {
// ...
// 更新服务对应的实例列表信息
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
// 更新服务MD5校验码
recalculateChecksum();
}
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
// 创建Map:用于保存服务对应的Cluster信息
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
// 遍历服务实例列表
for (Instance instance : instances) {
try {
if (instance == null) {
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
// 如果服务实例对应的集群信息不存在,则创建并初始化集群信息
if (!clusterMap.containsKey(instance.getClusterName())) {
// ... 日志信息
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
// 如果服务对应的集群实例列表不存在,则创建集群实例列表
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
// 将服务实例添加到服务集群实例列表中
clusterIPs.add(instance);
} catch (Exception e) {
// ... 日志信息
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
// 更新集群实例列表
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
}
// 设置服务最后更新时间
setLastModifiedMillis(System.currentTimeMillis());
// 发布服务信息变更事件(即:将服务变更通知订阅该服务的客户端)
getPushService().serviceChanged(this);
// ...
}
// com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
// 根据实例类型(临时、持久)获取旧的服务实例列表
Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
// 将旧的服务实例列表转储到新创建Map中(key:服务唯一标识,value:服务实例)
HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
for (Instance ip : toUpdateInstances) {
oldIpMap.put(ip.getDatumKey(), ip);
}
// 对比待处理服务实例列表和旧服务实例列表,获取需要更新的服务实例列表
List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
if (updatedIPs.size() > 0) {
for (Instance ip : updatedIPs) {
Instance oldIP = oldIpMap.get(ip.getDatumKey());
if (!ip.isMarked()) {
ip.setHealthy(oldIP.isHealthy());
}
// ...
}
}
// 获取待处理服务实例列表中不存在于旧服务实例列表中的实例(即:新增实例),更新该实例的健康状态为健康
List<Instance> newIPs = subtract(ips, oldIpMap.values());
if (newIPs.size() > 0) {
// ... 日志信息
for (Instance ip : newIPs) {
// 更新服务实例健康状态为健康
HealthCheckStatus.reset(ip);
}
}
// 获取旧服务实例列表中不存在于待处理实例列表中的实例(即:移除实例),更新该实例的健康状态为死亡
List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
if (deadIPs.size() > 0) {
// ... 日志信息
for (Instance ip : deadIPs) {
// 更新服务实例健康状态为死亡
HealthCheckStatus.remv(ip);
}
}
toUpdateInstances = new HashSet<>(ips);
// 用最新的服务实例列表覆盖旧的服务实例列表(CopyOnWrite思想)
if (ephemeral) {
ephemeralInstances = toUpdateInstances;
} else {
persistentInstances = toUpdateInstances;
}
}
/**
* 获取需要更新的服务实例列表
* @param newInstance 待处理服务实例列表
* @param oldInstance 旧服务实例列表
* @return
*/
private List<Instance> updatedIps(Collection<Instance> newInstance, Collection<Instance> oldInstance) {
// 获得待处理服务实例列表和旧服务实例列表的交集
List<Instance> intersects = (List<Instance>) CollectionUtils.intersection(newInstance, oldInstance);
// 将交集中的实例存储到stringIpAddressMap中,key:ip+port,value:instance
Map<String, Instance> stringIpAddressMap = new ConcurrentHashMap<>(intersects.size());
for (Instance instance : intersects) {
stringIpAddressMap.put(instance.getIp() + ":" + instance.getPort(), instance);
}
// 定义交集Map,key:服务实例,value:标识
// 1-表示实例只存在于待处理服务实例列表或者旧服务实例列表中
// 2-表示实例同时存在于待处理服务实例列表和旧服务实例列表中
Map<String, Integer> intersectMap = new ConcurrentHashMap<>(newInstance.size() + oldInstance.size());
// 定义需要更新的服务实例Map
Map<String, Instance> updatedInstancesMap = new ConcurrentHashMap<>(newInstance.size());
// 定义待处理的服务实例Map
Map<String, Instance> newInstancesMap = new ConcurrentHashMap<>(newInstance.size());
// 将旧服务实例列表中实例加入交集Map中,并设置实例的标识为1
for (Instance instance : oldInstance) {
if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) {
intersectMap.put(instance.toString(), 1);
}
}
// 将待处理服务实例列表中实例加入交集Map中,如果实例在交集Map中已经存在,则设置为2(即:替换),否则设置为1(即:新增)
for (Instance instance : newInstance) {
if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) {
if (intersectMap.containsKey(instance.toString())) {
intersectMap.put(instance.toString(), 2);
} else {
intersectMap.put(instance.toString(), 1);
}
}
// 将待处理实例加入newInstancesMap中(这一步很重要,后续用于新实例覆盖旧实例)
newInstancesMap.put(instance.toString(), instance);
}
// 遍历交集Map
for (Map.Entry<String, Integer> entry : intersectMap.entrySet()) {
String key = entry.getKey();
Integer value = entry.getValue();
// 将交集Map中所有标识为1(注意:标识为1的实例包含待处理实例和旧实例)且存在于待处理实例列表中的实例加入最新的服务实例列表中(即:需要更新的旧实例+新实例)
if (value == 1) {
if (newInstancesMap.containsKey(key)) {
updatedInstancesMap.put(key, newInstancesMap.get(key));
}
}
}
return new ArrayList<>(updatedInstancesMap.values());
}
删除操作
更新操作由Service#onDelete方法进行处理,该方法主要做以下事情:
- 更新服务对应的实例列表信息(即:从服务实例列表中移除实例)。
源码如下:
@Override
public void onDelete(String key) throws Exception {
boolean isEphemeral = KeyBuilder.matchEphemeralInstanceListKey(key);
for (Cluster each : clusterMap.values()) {
each.updateIps(Collections.emptyList(), isEphemeral);
}
}
// 其中each.updateIps方法就是更新操作源码中的updateIps方法。
服务信息同步
在前面分析ConsistencyService#put方法时,会将服务信息封装成延时任务DelayTask(延时时长:1秒)添加到Nacos延时任务执行器(NacosDelayTaskExecuteEngine)的任务集合中。
NacosDelayTaskExecuteEngine在创建时会创建一个定时任务去执行NacosDelayTaskExecuteEngine#processTasks方法。
源码如下:
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
// 创建一个单线程的线程池
processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
// 创建定时任务处理延时任务(延时时长:100ms,时间间隔:100ms)
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}
// com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine.ProcessRunnable
// 延时任务处理线程
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
// 处理任务,如:Nacos集群中节点之间的数据同步
processTasks();
} catch (Throwable e) {
getEngineLog().error(e.toString(), e);
}
}
}
NacosDelayTaskExecuteEngine#processTasks方法主要做以下事情:
- 根据DistroKey获取对应的延时任务。
- 根据taskKey获取对应的任务处理器。
- 执行任务处理器的processor()方法对任务进行处理。
- 任务处理失败,则将任务重新加入到任务集合中进行重试。
源码如下:
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
// 根据taskKey(DistroKey)获取对应的延时任务
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
// 根据taskKey获取对应的任务处理器,默认为:TaskExecuteWorker
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
// 任务处理失败,则将任务重新加入到任务集合中进行重试。
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
private void retryFailedTask(Object key, AbstractDelayTask task) {
task.setLastProcessTime(System.currentTimeMillis());
addTask(key, task);
}
【阅读推荐】
更多精彩内容(如:Redis、数据结构与算法、Kafka等)请移步【南秋同学】个人主页进行查阅。
【作者简介】
一枚热爱技术和生活的老贝比,专注于Java领域,关注【南秋同学】带你一起学习成长~
- 上一篇: 「Nacos技术专题」服务注册与发现相关的原理分析
- 下一篇: 一文深入理解AP架构Nacos注册原理
猜你喜欢
- 2024-10-01 微服务学习笔记(微服务怎么学)
- 2024-10-01 干货:SpringBoot集成Nacos,填坑篇
- 2024-10-01 记一次把Nacos做成服务并开机启动
- 2024-10-01 Nacos 配置中心与注册中心(nacos配置中心连接超时)
- 2024-10-01 小白入门必知必会-Nacos单机安装(nacos入门教程)
- 2024-10-01 windows系统 安装nacos服务注册与发现中心
- 2024-10-01 网络环境问题导致的nacos集群故障
- 2024-10-01 分布式服务限流降级熔断解决方案Nacos之Dashboard界面配置含义
- 2024-10-01 Nacos你真的理解了吗(nacos百科)
- 2024-10-01 java微服务环境配置——注册中心 配置中心Nacos
你 发表评论:
欢迎- 367℃用AI Agent治理微服务的复杂性问题|QCon
- 358℃初次使用IntelliJ IDEA新建Maven项目
- 357℃手把手教程「JavaWeb」优雅的SpringMvc+Mybatis整合之路
- 351℃Maven技术方案最全手册(mavena)
- 348℃安利Touch Bar 专属应用,让闲置的Touch Bar活跃起来!
- 346℃InfoQ 2024 年趋势报告:架构篇(infoq+2024+年趋势报告:架构篇分析)
- 345℃IntelliJ IDEA 2018版本和2022版本创建 Maven 项目对比
- 342℃从头搭建 IntelliJ IDEA 环境(intellij idea建包)
- 最近发表
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)