专业的编程技术博客社区

网站首页 > 博客文章 正文

Dubbo3.0动态配置中心源码分析(dubbo配置文件详解)

baijin 2024-08-22 09:32:22 博客文章 4 ℃ 0 评论

一、配置中心在 Dubbo 中承担的职责

  1. 外部化配置:启动配置的集中式存储 (简单理解为 dubbo.properties 的外部化存储)。
  2. 服务治理:服务治理规则的存储与通知。
  3. 动态配置:控制动态开关或者动态变更属性值

二、Zookeeper配置中心

1、默认节点结构:

  • namespace,用于不同配置的环境隔离。
  • config,Dubbo 约定的固定节点,不可更改,所有配置和服务治理规则都存储在此节点下。
  • dubbo,所有服务治理规则都是全局性的,dubbo 为默认节点
  • configurators/tag-router/condition-router/migration,不同的服务治理规则类型,node value 存储具体的规则内容

2、类体系

AbstractDynamicConfiguration类workersThreadPool线程池在ZK配置中心里面没有使用到;

3、源码分析

public class ZookeeperDynamicConfiguration extends TreePathDynamicConfiguration {

    private Executor executor;
    private ZookeeperClient zkClient;

    private CacheListener cacheListener;
    private static final int DEFAULT_ZK_EXECUTOR_THREADS_NUM = 1;
    private static final int DEFAULT_QUEUE = 10000;
    private static final Long THREAD_KEEP_ALIVE_TIME = 0L;

    ZookeeperDynamicConfiguration(URL url, ZookeeperTransporter zookeeperTransporter) {
        /** url为zk的连接地址 */
        super(url);

        /** 监听器缓存组件 */
        this.cacheListener = new CacheListener(rootPath);

        /** 获取线程名称*/
        final String threadName = this.getClass().getSimpleName();
        /** 构建线程池,核心线程数和最大线程数:1,空闲被回收的时间:0 ,队列大小:10000 */
        this.executor = new ThreadPoolExecutor(DEFAULT_ZK_EXECUTOR_THREADS_NUM, DEFAULT_ZK_EXECUTOR_THREADS_NUM,
            THREAD_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(DEFAULT_QUEUE),
            new NamedThreadFactory(threadName, true),
            new AbortPolicyWithReport(threadName, url));

        /** 通过zookeeperTransporter的connect根据url与zk建立连接,拿到了一个ZookeeperClient */
        zkClient = zookeeperTransporter.connect(url);
        boolean isConnected = zkClient.isConnected();
        if (!isConnected) {
            throw new IllegalStateException("Failed to connect with zookeeper, pls check if url " + url + " is correct.");
        }
    }

    /**
     * @param key e.g., {service}.configurators, {service}.tagrouters, {group}.dubbo.properties
     * @return
     */
    @Override
    public String getInternalProperty(String key) {
        return zkClient.getContent(buildPathKey("", key));
    }

    @Override
    protected void doClose() throws Exception {
        // zkClient is shared in framework, should not close it here
        // zkClient.close();
        // See: org.apache.dubbo.remoting.zookeeper.AbstractZookeeperTransporter#destroy()
        // All zk clients is created and destroyed in ZookeeperTransporter.
        zkClient = null;
    }

    /**
     * 发布配置
     * @param pathKey
     * @param content
     * @return
     * @throws Exception
     */
    @Override
    protected boolean doPublishConfig(String pathKey, String content) throws Exception {
        /** 将配置项写入zk中,是否临时节点:false */
        zkClient.create(pathKey, content, false);
        return true;
    }

    /**
     * 通过cas机制发布配置
     * @param key
     * @param group
     * @param content
     * @param ticket
     * @return
     */
    @Override
    public boolean publishConfigCas(String key, String group, String content, Object ticket) {
        try {
            if (ticket != null && !(ticket instanceof Stat)) {
                throw new IllegalArgumentException("zookeeper publishConfigCas requires stat type ticket");
            }
            /** 构建key */
            String pathKey = buildPathKey(group, key);
            /** 创建或者更新,是否临时节点:false */
            zkClient.createOrUpdate(pathKey, content, false, ticket == null ? 0 : ((Stat) ticket).getVersion());
            return true;
        } catch (Exception e) {
            logger.warn("zookeeper publishConfigCas failed.", e);
            return false;
        }
    }

    /**
     * 获取配置信息
     * @param pathKey
     * @return
     * @throws Exception
     */
    @Override
    protected String doGetConfig(String pathKey) throws Exception {
        return zkClient.getContent(pathKey);
    }

    /**
     * 获取配置项信息
     * @param key
     * @param group
     * @return
     */
    @Override
    public ConfigItem getConfigItem(String key, String group) {
        String pathKey = buildPathKey(group, key);
        return zkClient.getConfigItem(pathKey);
    }

    /**
     * 删除配置信息
     * @param pathKey
     * @return
     * @throws Exception
     */
    @Override
    protected boolean doRemoveConfig(String pathKey) throws Exception {
        zkClient.delete(pathKey);
        return true;
    }

    /**
     * 获取一批配置key集合
     * @param groupPath
     * @return
     */
    @Override
    protected Collection<String> doGetConfigKeys(String groupPath) {
        return zkClient.getChildren(groupPath);
    }

    @Override
    protected void doAddListener(String pathKey, ConfigurationListener listener) {
        /**
         * 针对指定的配置项去施加一个监听器
         * cacheListener的addListener方法只是维护了缓存结构
         * zkClient.addDataListener才是实际与zk集群交互施加监听器
         */
        cacheListener.addListener(pathKey, listener);
        zkClient.addDataListener(pathKey, cacheListener, executor);
    }

    @Override
    protected void doRemoveListener(String pathKey, ConfigurationListener listener) {
        /** 先从cacheListener里的keyListeners缓存结构中将监听器移除 */
        cacheListener.removeListener(pathKey, listener);
        Set<ConfigurationListener> configurationListeners = cacheListener.getConfigurationListeners(pathKey);
        if (CollectionUtils.isNotEmpty(configurationListeners)) {
            /** 与zk集群进行交互,进行实际的删除监听器逻辑 */
            zkClient.removeDataListener(pathKey, cacheListener);
        }
    }
}

ZookeeperDynamicConfiguration初始化时调用父类TreePathDynamicConfiguration初始化方法

public TreePathDynamicConfiguration(URL url) {
    /** 调用父类AbstractDynamicConfiguration的构造方法,其实就是构建了一个线程池 */
    super(url);
    /** 根据一定规则去构造zk的path,如:/dubbo/config/dubbo */
    this.rootPath = getRootPath(url);
}

TreePathDynamicConfiguration类初始化时调用父类AbstractDynamicConfiguration的初始化方法

public AbstractDynamicConfiguration(URL url) {
    this(getThreadPoolPrefixName(url), getThreadPoolSize(url), getThreadPoolKeepAliveTime(url), getGroup(url),
            getTimeout(url));
}

/**
 *
 * @param threadPoolPrefixName 线程池前缀名称
 * @param threadPoolSize 线程池的大小
 * @param keepAliveTime 线程空闲多少时间被回收
 * @param group 组名
 * @param timeout 超时时间
 */
public AbstractDynamicConfiguration(String threadPoolPrefixName,
                                    int threadPoolSize,
                                    long keepAliveTime,
                                    String group,
                                    long timeout) {
    this.workersThreadPool = initWorkersThreadPool(threadPoolPrefixName, threadPoolSize, keepAliveTime);
    this.group = group;
    this.timeout = timeout;
}

三、Apollo配置中心

所有的服务治理规则都是全局性的,默认从公共命名空间 dubbo 读取和订阅:

不同的规则以不同的 key 后缀区分:

  • configurators,覆盖规则
  • tag-router,标签路由
  • condition-router,条件路由
  • migration, 迁移规则

1、类体系

2、源码分析

public class ApolloDynamicConfiguration implements DynamicConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(ApolloDynamicConfiguration.class);
    private static final String APOLLO_ENV_KEY = "env";
    private static final String APOLLO_ADDR_KEY = "apollo.meta";
    private static final String APOLLO_CLUSTER_KEY = "apollo.cluster";
    private static final String APOLLO_PROTOCOL_PREFIX = "http://";
    private static final String APOLLO_APPLICATION_KEY = "application";
    private static final String APOLLO_APPID_KEY = "app.id";

    private final URL url;
    /** 与Apollo交互的客户端 */
    private final Config dubboConfig;
    /** dubbo配置文件 */
    private final ConfigFile dubboConfigFile;
    /** 缓存监听器,key默认为公共命名空间 dubbo */
    private final ConcurrentMap<String, ApolloListener> listeners = new ConcurrentHashMap<>();

    ApolloDynamicConfiguration(URL url) {
        this.url = url;
        // Instead of using Dubbo's configuration, I would suggest use the original configuration method Apollo provides.
        String configEnv = url.getParameter(APOLLO_ENV_KEY);
        String configAddr = getAddressWithProtocolPrefix(url);
        String configCluster = url.getParameter(CLUSTER_KEY);
        String configAppId = url.getParameter(APOLLO_APPID_KEY);
        if (StringUtils.isEmpty(System.getProperty(APOLLO_ENV_KEY)) && configEnv != null) {
            System.setProperty(APOLLO_ENV_KEY, configEnv);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_ADDR_KEY)) && !ANYHOST_VALUE.equals(url.getHost())) {
            System.setProperty(APOLLO_ADDR_KEY, configAddr);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_CLUSTER_KEY)) && configCluster != null) {
            System.setProperty(APOLLO_CLUSTER_KEY, configCluster);
        }
        if (StringUtils.isEmpty(System.getProperty(APOLLO_APPID_KEY)) && configAppId != null) {
            System.setProperty(APOLLO_APPID_KEY, configAppId);
        }

        String namespace = url.getParameter(CONFIG_NAMESPACE_KEY, DEFAULT_GROUP);
        String apolloNamespace = StringUtils.isEmpty(namespace) ? url.getGroup(DEFAULT_GROUP) : namespace;
        /** 调用ConfigService.getConfig方法封装目标命名空间 apollo的客户端 */
        dubboConfig = ConfigService.getConfig(apolloNamespace);
        /** 调用apollo的API获取dubbo配置文件 */
        dubboConfigFile = ConfigService.getConfigFile(apolloNamespace, ConfigFileFormat.Properties);

        // Decide to fail or to continue when failed to connect to remote server.
        boolean check = url.getParameter(CHECK_KEY, true);
        if (dubboConfig.getSourceType() != ConfigSourceType.REMOTE) {
            if (check) {
                throw new IllegalStateException("Failed to connect to config center, the config center is Apollo, " +
                    "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv));
            } else {
                logger.warn("Failed to connect to config center, the config center is Apollo, " +
                    "the address is: " + (StringUtils.isNotEmpty(configAddr) ? configAddr : configEnv) +
                    ", will use the local cache value instead before eventually the connection is established.");
            }
        }
    }

    @Override
    public void close() {
        try {
            listeners.clear();
        } catch (UnsupportedOperationException e) {
            logger.warn("Failed to close connect from config center, the config center is Apollo");
        }
    }

    private String getAddressWithProtocolPrefix(URL url) {
        String address = url.getBackupAddress();
        if (StringUtils.isNotEmpty(address)) {
            address = Arrays.stream(COMMA_SPLIT_PATTERN.split(address))
                .map(addr -> {
                    if (addr.startsWith(APOLLO_PROTOCOL_PREFIX)) {
                        return addr;
                    }
                    return APOLLO_PROTOCOL_PREFIX + addr;
                })
                .collect(Collectors.joining(","));
        }
        return address;
    }

    /**
     * Since all governance rules will lay under dubbo group, this method now always uses the default dubboConfig and
     * ignores the group parameter.
     */
    /**
     * 添加监听
     * @param key      the key to represent a configuration
     * @param group    the group where the key belongs to
     * @param listener configuration listener
     */
    @Override
    public void addListener(String key, String group, ConfigurationListener listener) {
        ApolloListener apolloListener = listeners.computeIfAbsent(group + key, k -> createTargetListener(key, group));
        apolloListener.addListener(listener);
        dubboConfig.addChangeListener(apolloListener, Collections.singleton(key));
    }

    /**
     * 删除监听
     * @param key      the key to represent a configuration
     * @param group    the group where the key belongs to
     * @param listener configuration listener
     */
    @Override
    public void removeListener(String key, String group, ConfigurationListener listener) {
        ApolloListener apolloListener = listeners.get(group + key);
        if (apolloListener != null) {
            apolloListener.removeListener(listener);
            if (!apolloListener.hasInternalListener()) {
                dubboConfig.removeChangeListener(apolloListener);
            }
        }
    }

    /**
     * 读取配置
     * @param key     the key to represent a configuration
     * @param group   the group where the key belongs to
     * @param timeout timeout value for fetching the target config
     * @return
     * @throws IllegalStateException
     */
    @Override
    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
        if (StringUtils.isNotEmpty(group)) {
            if (group.equals(url.getApplication())) {
                return ConfigService.getAppConfig().getProperty(key, null);
            } else {
                return ConfigService.getConfig(group).getProperty(key, null);
            }
        }
        return dubboConfig.getProperty(key, null);
    }

    /**
     * Recommend specify namespace and group when using Apollo.
     * <p>
     * <dubbo:config-center namespace="governance" group="dubbo" />, 'dubbo=governance' is for governance rules while
     * 'group=dubbo' is for properties files.
     *
     * @param key     default value is 'dubbo.properties', currently useless for Apollo.
     * @param group
     * @param timeout
     * @return
     * @throws IllegalStateException
     */
    @Override
    public String getProperties(String key, String group, long timeout) throws IllegalStateException {
        if (StringUtils.isEmpty(group)) {
            return dubboConfigFile.getContent();
        }
        if (group.equals(url.getApplication())) {
            return ConfigService.getConfigFile(APOLLO_APPLICATION_KEY, ConfigFileFormat.Properties).getContent();
        }

        ConfigFile configFile = ConfigService.getConfigFile(group, ConfigFileFormat.Properties);
        if (configFile == null) {
            throw new IllegalStateException("There is no namespace named " + group + " in Apollo.");
        }
        return configFile.getContent();
    }

    /**
     * This method will be used by Configuration to get valid value at runtime.
     * The group is expected to be 'app level', which can be fetched from the 'config.appnamespace' in url if necessary.
     * But I think Apollo's inheritance feature of namespace can solve the problem .
     */
    @Override
    public String getInternalProperty(String key) {
        return dubboConfig.getProperty(key, null);
    }

    /**
     * Ignores the group parameter.
     *
     * @param key   property key the native listener will listen on
     * @param group to distinguish different set of properties
     * @return
     */
    private ApolloListener createTargetListener(String key, String group) {
        return new ApolloListener();
    }

    public class ApolloListener implements ConfigChangeListener {

        private Set<ConfigurationListener> listeners = new CopyOnWriteArraySet<>();

        ApolloListener() {
        }

        @Override
        public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
            for (String key : changeEvent.changedKeys()) {
                ConfigChange change = changeEvent.getChange(key);
                if ("".equals(change.getNewValue())) {
                    logger.warn("an empty rule is received for " + key + ", the current working rule is " +
                        change.getOldValue() + ", the empty rule will not take effect.");
                    return;
                }

                ConfigChangedEvent event = new ConfigChangedEvent(key, change.getNamespace(), change.getNewValue(), getChangeType(change));
                listeners.forEach(listener -> listener.process(event));
            }
        }

        private ConfigChangeType getChangeType(ConfigChange change) {
            if (change.getChangeType() == PropertyChangeType.DELETED) {
                return ConfigChangeType.DELETED;
            }
            return ConfigChangeType.MODIFIED;
        }

        void addListener(ConfigurationListener configurationListener) {
            this.listeners.add(configurationListener);
        }

        void removeListener(ConfigurationListener configurationListener) {
            this.listeners.remove(configurationListener);
        }

        boolean hasInternalListener() {
            return listeners != null && listeners.size() > 0;
        }
    }

}

四、Nacos配置中心

所有的服务治理规则都是全局的,默认从 namespace: public 下进行读取, 通过 dataId: interface name 以及 group: dubbo 去读取和订阅:

不同的规则以 dataId 的后缀区分:

  • configurators,覆盖规则
  • tag-router,标签路由
  • condition-router,条件路由
  • migration, 迁移规则

2、类体系

3、源码分析

public class NacosDynamicConfiguration implements DynamicConfiguration {

    private static final String GET_CONFIG_KEYS_PATH = "/v1/cs/configs";

    private final Logger logger = LoggerFactory.getLogger(getClass());
    /**
     * the default timeout in millis to get config from nacos
     */
    private static final long DEFAULT_TIMEOUT = 5000L;

    private Properties nacosProperties;

    /**
     * The nacos configService
     */
    private final NacosConfigServiceWrapper configService;

    private HttpAgent httpAgent;

    /**
     * The map store the key to {@link NacosConfigListener} mapping
     */
    private final Map<String, NacosConfigListener> watchListenerMap;

    private MD5Utils md5Utils = new MD5Utils();

    NacosDynamicConfiguration(URL url) {
        this.nacosProperties = buildNacosProperties(url);
        /** 相对于nacos配置服务客户端 */
        this.configService = buildConfigService(url);
        this.httpAgent = getHttpAgent(configService.getConfigService());
        watchListenerMap = new ConcurrentHashMap<>();
    }

    private NacosConfigServiceWrapper buildConfigService(URL url) {
        ConfigService configService = null;
        try {
            /** 通过配置文件构建ConfigService */
            configService = NacosFactory.createConfigService(nacosProperties);
        } catch (NacosException e) {
            if (logger.isErrorEnabled()) {
                logger.error(e.getErrMsg(), e);
            }
            throw new IllegalStateException(e);
        }
        /**
         * 将ConfigService封装到NacosConfigServiceWrapper中
         * NacosConfigServiceWrapper为ConfigService的包装类
         */
        return new NacosConfigServiceWrapper(configService);
    }

    private HttpAgent getHttpAgent(ConfigService configService) {
        HttpAgent agent = null;
        try {
            Field field = configService.getClass().getDeclaredField("agent");
            field.setAccessible(true);
            agent = (HttpAgent) field.get(configService);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return agent;
    }

    private Properties buildNacosProperties(URL url) {
        Properties properties = new Properties();
        setServerAddr(url, properties);
        setProperties(url, properties);
        return properties;
    }

    private void setServerAddr(URL url, Properties properties) {
        StringBuilder serverAddrBuilder =
                new StringBuilder(url.getHost()) // Host
                        .append(':')
                        .append(url.getPort()); // Port

        // Append backup parameter as other servers
        String backup = url.getParameter(BACKUP_KEY);
        if (backup != null) {
            serverAddrBuilder.append(',').append(backup);
        }
        String serverAddr = serverAddrBuilder.toString();
        properties.put(SERVER_ADDR, serverAddr);
    }

    private static void setProperties(URL url, Properties properties) {
        // Get the parameters from constants
        Map<String, String> parameters = url.getParameters(of(PropertyKeyConst.class));
        // Put all parameters
        properties.putAll(parameters);
    }

    private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName) {
        String propertyValue = url.getParameter(propertyName);
        if (StringUtils.isNotEmpty(propertyValue)) {
            properties.setProperty(propertyName, propertyValue);
        }
    }

    private static void putPropertyIfAbsent(URL url, Properties properties, String propertyName, String defaultValue) {
        String propertyValue = url.getParameter(propertyName);
        if (StringUtils.isNotEmpty(propertyValue)) {
            properties.setProperty(propertyName, propertyValue);
        } else {
            properties.setProperty(propertyName, defaultValue);
        }
    }

    /**
     * Ignores the group parameter.
     *
     * @param key   property key the native listener will listen on
     * @param group to distinguish different set of properties
     * @return
     */
    private NacosConfigListener createTargetListener(String key, String group) {
        NacosConfigListener configListener = new NacosConfigListener();
        configListener.fillContext(key, group);
        return configListener;
    }

    @Override
    public void close() throws Exception {
        configService.shutdown();
    }

    /**
     * 添加监听器
     * @param key      the key to represent a configuration
     * @param group    the group where the key belongs to
     * @param listener configuration listener
     */
    @Override
    public void addListener(String key, String group, ConfigurationListener listener) {
        String listenerKey = buildListenerKey(key, group);
        NacosConfigListener nacosConfigListener =
                watchListenerMap.computeIfAbsent(listenerKey, k -> createTargetListener(key, group));
        nacosConfigListener.addListener(listener);
        try {
            configService.addListener(key, group, nacosConfigListener);
        } catch (NacosException e) {
            logger.error(e.getMessage());
        }
    }

    /**
     * 删除监听器
     * @param key      the key to represent a configuration
     * @param group    the group where the key belongs to
     * @param listener configuration listener
     */
    @Override
    public void removeListener(String key, String group, ConfigurationListener listener) {
        String listenerKey = buildListenerKey(key, group);
        NacosConfigListener eventListener = watchListenerMap.get(listenerKey);
        if (eventListener != null) {
            eventListener.removeListener(listener);
        }
    }

    /**
     * 获取配置
     * @param key     the key to represent a configuration
     * @param group   the group where the key belongs to
     * @param timeout timeout value for fetching the target config
     * @return
     * @throws IllegalStateException
     */
    @Override
    public String getConfig(String key, String group, long timeout) throws IllegalStateException {
        try {
            long nacosTimeout = timeout < 0 ? getDefaultTimeout() : timeout;
            if (StringUtils.isEmpty(group)) {
                group = DEFAULT_GROUP;
            }
            return configService.getConfig(key, group, nacosTimeout);
        } catch (NacosException e) {
            logger.error(e.getMessage());
        }
        return null;
    }

    /**
     * 获取配置项
     * @param key
     * @param group
     * @return
     */
    @Override
    public ConfigItem getConfigItem(String key, String group) {
        String content = getConfig(key, group);
        String casMd5 = "";
        if (StringUtils.isNotEmpty(content)) {
            casMd5 = md5Utils.getMd5(content);
        }
        return new ConfigItem(content, casMd5);
    }

    @Override
    public Object getInternalProperty(String key) {
        try {
            return configService.getConfig(key, DEFAULT_GROUP, getDefaultTimeout());
        } catch (NacosException e) {
            logger.error(e.getMessage());
        }
        return null;
    }

    /**
     * 添加配置
     * @param key     the key to represent a configuration
     * @param group   the group where the key belongs to
     * @param content the content of configuration
     * @return
     */
    @Override
    public boolean publishConfig(String key, String group, String content) {
        boolean published = false;
        try {
            published = configService.publishConfig(key, group, content);
        } catch (NacosException e) {
            logger.error(e.getErrMsg(), e);
        }
        return published;
    }

    @Override
    public boolean publishConfigCas(String key, String group, String content, Object ticket) {
        try {
            if (!(ticket instanceof String)) {
                throw new IllegalArgumentException("nacos publishConfigCas requires string type ticket");
            }
            return configService.publishConfigCas(key, group, content, (String) ticket);
        } catch (NacosException e) {
            logger.warn("nacos publishConfigCas failed.", e);
            return false;
        }
    }

    @Override
    public long getDefaultTimeout() {
        return DEFAULT_TIMEOUT;
    }

    /**
     * TODO Nacos does not support atomic update of the value mapped to a key.
     *
     * @param group the specified group
     * @return
     */
    @Override
    public SortedSet<String> getConfigKeys(String group) {
        // TODO use Nacos Client API to replace HTTP Open API
        SortedSet<String> keys = new TreeSet<>();
        try {

            Map<String, String> paramsValues = new HashMap<>();
            paramsValues.put("search", "accurate");
            paramsValues.put("dataId", "");
            paramsValues.put("group", group.replace(SLASH_CHAR, HYPHEN_CHAR));
            paramsValues.put("pageNo", "1");
            paramsValues.put("pageSize", String.valueOf(Integer.MAX_VALUE));

            String encoding = getProperty(ENCODE, "UTF-8");

            HttpRestResult<String> result = httpAgent.httpGet(GET_CONFIG_KEYS_PATH, emptyMap(), paramsValues, encoding, 5 * 1000);
            Stream<String> keysStream = toKeysStream(result.getData());
            if (keysStream != null) {
                keysStream.forEach(keys::add);
            }
        } catch (Exception e) {
            if (logger.isErrorEnabled()) {
                logger.error(e.getMessage(), e);
            }
        }
        return keys;
    }


    @Override
    public boolean removeConfig(String key, String group) {
        boolean removed = false;
        try {
            removed = configService.removeConfig(key, group);
        } catch (NacosException e) {
            if (logger.isErrorEnabled()) {
                logger.error(e.getMessage(), e);
            }
        }
        return removed;
    }

    private Stream<String> toKeysStream(String content) {
        JSONObject jsonObject = JSON.parseObject(content);
        if (jsonObject == null) {
            return null;
        }
        JSONArray pageItems = jsonObject.getJSONArray("pageItems");
        if (pageItems == null) {
            return null;
        }
        return pageItems.stream()
                .map(object -> (JSONObject) object)
                .map(json -> json.getString("dataId"));
    }

    private String getProperty(String name, String defaultValue) {
        return nacosProperties.getProperty(name, defaultValue);
    }

    public class NacosConfigListener extends AbstractSharedListener {

        private Set<ConfigurationListener> listeners = new CopyOnWriteArraySet<>();
        /**
         * cache data to store old value
         */
        private Map<String, String> cacheData = new ConcurrentHashMap<>();

        @Override
        public Executor getExecutor() {
            return null;
        }

        /**
         * receive
         *
         * @param dataId     data ID
         * @param group      group
         * @param configInfo content
         */
        @Override
        public void innerReceive(String dataId, String group, String configInfo) {
            String oldValue = cacheData.get(dataId);
            ConfigChangedEvent event = new ConfigChangedEvent(dataId, group, configInfo, getChangeType(configInfo, oldValue));
            if (configInfo == null) {
                cacheData.remove(dataId);
            } else {
                cacheData.put(dataId, configInfo);
            }
            listeners.forEach(listener -> listener.process(event));
        }

        void addListener(ConfigurationListener configurationListener) {

            this.listeners.add(configurationListener);
        }

        void removeListener(ConfigurationListener configurationListener) {
            this.listeners.remove(configurationListener);
        }

        private ConfigChangeType getChangeType(String configInfo, String oldValue) {
            if (StringUtils.isBlank(configInfo)) {
                return ConfigChangeType.DELETED;
            }
            if (StringUtils.isBlank(oldValue)) {
                return ConfigChangeType.ADDED;
            }
            return ConfigChangeType.MODIFIED;
        }
    }

    protected String buildListenerKey(String key, String group) {
        return key + HYPHEN_CHAR + group;
    }
}

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表