ListenerRegistryWrapperListenerRegistryWrapper是对RegistryFactory的扩展,创建Registry时候会包装一个ListenerRegistryWrapper对象,内部维护一个监听器RegistryServiceListener,当注册、取消注册、订阅以及取消订阅的时候,会发送通知 。
AbstractRegistryAbstractRegistry该抽象类是对Registry接口的实现,实现了Registry接口中的注册、订阅、查询、通知等方法,但是注册、订阅、查询、通知等方法只是简单地把URL加入对应的集合,没有具体的注册或订阅逻辑 。此外该类还实现了缓存机制,只不过,它的缓存有两份,一份在内存,一份在磁盘 。
//本地的Properties文件缓存,在内存中 与file是同步的 private final Properties properties = new Properties(); //该单线程池负责讲Provider的全量数据同步到properties字段和缓存文件中, //如果syncSaveFile配置为false,就由该线程池异步完成文件写入 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); //是否异步写入 private boolean syncSaveFile; //注册数据的版本号 防止旧数据覆盖新数据 private final AtomicLong lastCacheChanged = new AtomicLong(); //保存Properties失败以后异常重试次数 private final AtomicInteger savePropertiesRetryTimes = new AtomicInteger(); //已经注册服务的URL集合,注册的URL不仅仅可以是服务提供者的,也可以是服务消费者的 private final Set<URL> registered = new ConcurrentHashSet<>(); //消费者Url订阅的监听器集合 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>(); //费者被通知的服务URL集合,最外部URL的key是消费者的URL,value是一个map集合,里面的map中的key为分类名,value是该类下的服务url集合 private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>(); //注册中心URL private URL registryUrl; //本地磁盘Properties文件 private File file;变更通知当 Provider 端暴露的 URL 发生变化时,ZooKeeper 等服务发现组件会通知 Consumer 端的 Registry 组件,Registry 组件会调用 notify() 方法,被通知的 Consumer 能匹配到所有 Provider 的 URL 列表并写入 properties 集合以及本地文件中 。
protected void notify(List<URL> urls) { if (CollectionUtils.isEmpty(urls)) { return; } //遍历订阅消费者URL的监听器集合,通知他们 for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); //匹配 if (!UrlUtils.isMatch(url, urls.get(0))) { continue; } //遍历所有监听器 Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { //通知监听器,URL变化结果 notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t); } } } } } /** * Notify changes from the Provider side. * * @param url consumer side url * @param listener listener * @param urls provider latest urls */ protected void notify(URL url, NotifyListener listener, List<URL> urls) { //参数校验 if (url == null) { throw new IllegalArgumentException("notify url == null"); } if (listener == null) { throw new IllegalArgumentException("notify listener == null"); } if ((CollectionUtils.isEmpty(urls)) && !ANY_VALUE.equals(url.getServiceInterface())) { logger.warn("Ignore empty notify urls for subscribe url " + url); return; } if (logger.isInfoEnabled()) { logger.info("Notify urls for subscribe url " + url + ", urls: " + urls); } Map<String, List<URL>> result = new HashMap<>(); for (URL u : urls) { //按照url中key为category对应的值进行分类,如果没有该值,就找key为providers的值进行分类 if (UrlUtils.isMatch(url, u)) { String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY); List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>()); //分类结果放入result categoryList.add(u); } } if (result.size() == 0) { return; } //处理通知监听器URL变化结果 Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>()); for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); //把分类标实和分类后的列表放入notified的value中 categoryNotified.put(category, categoryList); //调用NotifyListener监听器 listener.notify(categoryList); //单个Url变更,并将更改信息同步至内存缓存和磁盘缓存中 saveProperties(url); } } private void saveProperties(URL url) { if (file == null) { return; } try { StringBuilder buf = new StringBuilder(); //从通知列表中取出信息 Map<String, List<URL>> categoryNotified = notified.get(url); //以空格为间隔拼接 if (categoryNotified != null) { for (List<URL> us : categoryNotified.values()) { for (URL u : us) { if (buf.length() > 0) { buf.append(URL_SEPARATOR); } buf.append(u.toFullString()); } } } //推送url至内存缓存 properties.setProperty(url.getServiceKey(), buf.toString()); //增加版本号 long version = lastCacheChanged.incrementAndGet(); if (syncSaveFile) { //如果磁盘文件未被加锁,将内存缓存同步至磁盘缓存 doSaveProperties(version); } else { //如果被加锁了,使用新的线程去执行,当前线程返回 registryCacheExecutor.execute(new SaveProperties(version)); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } }
经验总结扩展阅读
- 2023年10月2日公司注册好不好 2023年农历八月十八宜公司注册吗
- 2023年10月3日是公司注册的黄道吉日吗 2023年农历八月十九公司注册吉日
- 2023年农历正月十七公司注册吉日 2023年2月7日是公司注册吉日吗
- 2023年10月4日公司注册行吗 2023年10月4日是公司注册吉日吗
- 驱动开发:内核枚举Registry注册表回调
- 2023年10月5日公司注册黄道吉日 2023年10月5日是公司注册的黄道吉日吗
- 2023年2月8日是公司注册的黄道吉日吗 2023年2月8日公司注册黄道吉日
- 2023年10月6日公司注册黄道吉日 2023年农历八月廿二宜公司注册吗
- 2023年农历八月廿三宜公司注册吗 2023年10月7日公司注册吉日一览表
- 2023年农历八月廿四宜公司注册吗 2023年10月8日公司注册黄道吉日