专业的编程技术博客社区

网站首页 > 博客文章 正文

Apollo 服务端与客户端原理概述(apollo服务器是干嘛的)

baijin 2024-08-22 09:31:36 博客文章 4 ℃ 0 评论

服务端

1、服务端实时推送设计

  • 用户在 Portal 中进行配置的编辑和发布。
  • Portal 会调用 Admin Service 提供的接口进行发布操作。
  • Admin Service 收到请求后,发送 ReleaseMessage 给各个 Config Service,通知 Config Service 配置发生变化。
  • Config Service 收到 ReleaseMessage 后,通知对应的客户端,基于 Http 长连接实现。

2. 发送 ReleaseMessage 的实现方式

ReleaseMessage 消息是通过 Mysql 实现了一个简单的消息队列。之所以没有采用消息中间件,是为了让 Apollo 在部署的时候尽量简单,尽可能减少外部依赖

  • Admin Service 在配置发布后会往 ReleaseMessage 表插入一条消息记录。
  • Config Service 会启动一个线程定时扫描 ReleaseMessage 表,来查看是否有新的消息记录。
  • Config Service 发现有新的消息记录,就会通知到所有的消息监听器。
  • 消息监听器得到配置发布的信息后,就会通知对应的客户端。

3. Config Service 通知客户端的实现方式

通知采用基于 Http 长连接实现,主要分为下面几个步骤:

  • 客户端会发起一个 Http 请求到 Config Service 的 notifications/v2 接口。
  • notifications/v2 接口通过 Spring DeferredResult 把请求挂起,不会立即返回。
  • 如果在 60s 内没有该客户端关心的配置发布,那么会返回 Http 状态码 304 给客户端。
  • 如果发现配置有修改,则会调用 DeferredResult 的 setResult 方法,传入有配置变化的 namespace 信息,同时该请求会立即返回。
  • 客户端从返回的结果中获取到配置变化的 namespace 后,会立即请求 Config Service 获取该 namespace 的最新配置。

客户端

1、设计原理

  • 客户端和服务端保持了一个长连接,编译配置的实时更新推送。
  • 定时拉取配置是客户端本地的一个定时任务,默认为每 5 分钟拉取一次,也可以通过在运行时指定 System Property:apollo.refreshInterval 来覆盖,单位是分钟,推送+定时拉取=双保险
  • 客户端从 Apollo 配置中心服务端获取到应用的最新配置后,会保存在内存中。
  • 客户端会把从服务端获取到的配置在本地文件系统缓存一份,当服务或者网络不可用时,可以使用本地的配置,也就是我们的本地开发模式 env=Local。

2、与 Spring 集成后通过 @Value 获取配置原理

Spring 从 3.1 版本开始增加了 ConfigurableEnvironment 和 PropertySource:

  • ConfigurableEnvironment 实现了 Environment 接口,并且包含了多个 PropertySource
  • PropertySource 可以理解为很多个 Key-Value 的属性配置,在运行时的结构形如下图所示。

需要注意的是,PropertySource 之间是有优先级顺序的,如果有一个 Key 在多个 property source 中都存在,那么位于前面的 property source 优先。
集成的原理就是在应用启动阶段,Apollo 从远端获取配置,然后组装成 PropertySource 并插入到第一个即可,如下图所示。

3、动态刷新配置的实现原理

Apollo Client 中定义了 SpringValueProcessor 类,其实现了 BeanPostProcessor 用于处理值修改。

public class SpringValueProcessor extends ApolloProcessor implements BeanFactoryPostProcessor, BeanFactoryAware {
  
  private PlaceholderHelper placeholderHelper = new PlaceholderHelper();
	private BeanFactory beanFactory;
  public SpringValueRegistry springValueRegistry = new SpringValueRegistry();

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class clazz = bean.getClass();
        for (Field field : findAllField(clazz)) {
            processField(bean, beanName, field);
        }
        return bean;
    }

	/**
   * 核心处理
   */
    private void processField(Object bean, String beanName, Field field) {
        // register @Value on field
        Value value = field.getAnnotation(Value.class);
        if (value == null) {
            return;
        }
      
        Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
      
        if (keys.isEmpty()) {
            return;
        }
      
        for (String key : keys) {
            SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
      			springValueRegistry.register(beanFactory, key, springValue);
      			logger.debug("Monitoring {}", springValue);
        }
    }
  
}

通过实现 BeanPostProcessor 来处理每个 bean 中的值,然后将这个配置信息封装成一个 SpringValue 存储到 springValueRegistry 中

SpringValue 代码如下所示。

public class SpringValue {
    private MethodParameter methodParameter;
    private Field field;
    private Object bean;
    private String beanName;
    private String key;
    private String placeholder;
    private Class<?> targetType;
    private Type genericType;
    private boolean isJson;
}

SpringValueRegistry 就是利用 Map 来存储,代码如下所示。

public class SpringValueRegistry {
    private final Map<BeanFactory, Multimap<String, SpringValue>> registry = Maps.newConcurrentMap();
    private final Object LOCK = new Object();

    // 注册
    public void register(BeanFactory beanFactory, String key, SpringValue springValue) {
        if (!registry.containsKey(beanFactory)) {
            synchronized (LOCK) {
                if (!registry.containsKey(beanFactory)) {
                    registry.put(beanFactory, LinkedListMultimap.<String, SpringValue>create());
                }
            }
        }
        registry.get(beanFactory).put(key, springValue);
    }

    // 获取
    public Collection<SpringValue> get(BeanFactory beanFactory, String key) {
        Multimap<String, SpringValue> beanFactorySpringValues = registry.get(beanFactory);
        if (beanFactorySpringValues == null) {
            return null;
        }
        return beanFactorySpringValues.get(key);
    }
}

当 AutoUpdateConfigChangeListener 监听到变化就会调用 onChange 更新值

public class AutoUpdateConfigChangeListener implements ConfigChangeListener{
  
    private final SpringValueRegistry springValueRegistry;
  
  	@Override
    public void onChange(ConfigChangeEvent changeEvent) {
      Set<String> keys = changeEvent.changedKeys();
      if (CollectionUtils.isEmpty(keys)) {
        return;
      }
      for (String key : keys) {
        // 1. check whether the changed key is relevant
        Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
        if (targetValues == null || targetValues.isEmpty()) {
          continue;
        }

        // 2. update the value 更新值
        for (SpringValue val : targetValues) {
          updateSpringValue(val);
        }
      }
    }

}

最后

本文参考

http://c.biancheng.net/view/5480.html

http://c.biancheng.net/view/5482.html

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表