Dubbo-聊聊注册中心的设计( 四 )

ListenerRegistryWrapperListenerRegistryWrapper是对RegistryFactory的扩展,创建Registry时候会包装一个ListenerRegistryWrapper对象,内部维护一个监听器RegistryServiceListener,当注册、取消注册、订阅以及取消订阅的时候,会发送通知 。
AbstractRegistryAbstractRegistry该抽象类是对Registry接口的实现,实现了Registry接口中的注册、订阅、查询、通知等方法,但是注册、订阅、查询、通知等方法只是简单地把URL加入对应的集合,没有具体的注册或订阅逻辑 。此外该类还实现了缓存机制,只不过,它的缓存有两份,一份在内存,一份在磁盘 。
    //本地的Properties文件缓存,在内存中 与file是同步的    private final Properties properties = new Properties();    //该单线程池负责讲Provider的全量数据同步到properties字段和缓存文件中,    //如果syncSaveFile配置为false,就由该线程池异步完成文件写入    private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));    //是否异步写入    private boolean syncSaveFile;    //注册数据的版本号 防止旧数据覆盖新数据    private final AtomicLong lastCacheChanged = new AtomicLong();    //保存Properties失败以后异常重试次数    private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger();    //已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的    private final Set<URL> registered = new ConcurrentHashSet<>();    //消费者Url订阅的监听器集合    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();    //费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名,value是该类下的服务url集合    private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();    //注册中心URL    private URL registryUrl;    //本地磁盘Properties文件    private File file;变更通知当 Provider 端暴露的 URL 发生变化时,ZooKeeper 等服务发现组件会通知 Consumer 端的 Registry 组件,Registry 组件会调用 notify() 方法,被通知的 Consumer 能匹配到所有 Provider 的 URL 列表并写入 properties 集合以及本地文件中 。
    protected void notify(List<URL> urls) {        if (CollectionUtils.isEmpty(urls)) {            return;        }        //遍历订阅消费者URL的监听器集合,通知他们        for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {            URL url = entry.getKey();            //匹配            if (!UrlUtils.isMatch(url, urls.get(0))) {                continue;            }            //遍历所有监听器            Set<NotifyListener> listeners = entry.getValue();            if (listeners != null) {                for (NotifyListener listener : listeners) {                    try {                        //通知监听器,URL变化结果                        notify(url, listener, filterEmpty(url, urls));                    } catch (Throwable t) {                        logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);                    }                }            }        }    }    /**     * Notify changes from the Provider side.     *     * @param url      consumer side url     * @param listener listener     * @param urls     provider latest urls     */    protected void notify(URL url, NotifyListener listener, List<URL> urls) {        //参数校验        if (url == null) {            throw new IllegalArgumentException("notify url == null");        }        if (listener == null) {            throw new IllegalArgumentException("notify listener == null");        }        if ((CollectionUtils.isEmpty(urls))                && !ANY_VALUE.equals(url.getServiceInterface())) {            logger.warn("Ignore empty notify urls for subscribe url " + url);            return;        }        if (logger.isInfoEnabled()) {            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);        }        Map<String, List<URL>> result = new HashMap<>();        for (URL u : urls) {            //按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类            if (UrlUtils.isMatch(url, u)) {                String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);                List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());                //分类结果放入result                categoryList.add(u);            }        }        if (result.size() == 0) {            return;        }        //处理通知监听器URL变化结果        Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {            String category = entry.getKey();            List<URL> categoryList = entry.getValue();            //把分类标实和分类后的列表放入notified的value中            categoryNotified.put(category, categoryList);            //调用NotifyListener监听器            listener.notify(categoryList);            //单个Url变更,并将更改信息同步至内存缓存和磁盘缓存中            saveProperties(url);        }    }    private void saveProperties(URL url) {        if (file == null) {            return;        }        try {            StringBuilder buf = new StringBuilder();            //从通知列表中取出信息            Map<String, List<URL>> categoryNotified = notified.get(url);            //以空格为间隔拼接            if (categoryNotified != null) {                for (List<URL> us : categoryNotified.values()) {                    for (URL u : us) {                        if (buf.length() > 0) {                            buf.append(URL_SEPARATOR);                        }                        buf.append(u.toFullString());                    }                }            }            //推送url至内存缓存            properties.setProperty(url.getServiceKey(), buf.toString());            //增加版本号            long version = lastCacheChanged.incrementAndGet();            if (syncSaveFile) {                //如果磁盘文件未被加锁,将内存缓存同步至磁盘缓存                doSaveProperties(version);            } else {                //如果被加锁了,使用新的线程去执行,当前线程返回                registryCacheExecutor.execute(new SaveProperties(version));            }        } catch (Throwable t) {            logger.warn(t.getMessage(), t);        }    }

经验总结扩展阅读