Background
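While studying Ribbon and Nacos, I noticed that after I manually took a service instance offline, the service list in Nacos was already updated, but the list that Ribbon had pulled was not. To solve this, let's first look at how each of them refreshes its service list.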
Problem analysis
First, consider two questions:
1. How does Nacos pull services?
2. How does Ribbon pull services?
One thing to make clear first: pulling service instances here is lazy. Instances are only pulled when the first request is made.
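Lazy, per-service initialization can be pictured as a cache that builds a load balancer only the first time a service is requested. This is a simplified model with hypothetical names (`LazyClientRegistry`, `FakeLoadBalancer`). As far as I know, Spring Cloud's real mechanism creates a separate child application context per client name, which is heavier than a map entry.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a per-service load balancer.
final class FakeLoadBalancer {
    final String serviceName;

    FakeLoadBalancer(String serviceName) {
        this.serviceName = serviceName;
    }
}

// Simplified model of lazy, per-service initialization (not Spring Cloud's real code).
final class LazyClientRegistry {
    private final Map<String, FakeLoadBalancer> balancers = new ConcurrentHashMap<>();
    // Counts how many balancers were actually built, to make the laziness visible.
    final AtomicInteger created = new AtomicInteger();

    FakeLoadBalancer get(String serviceName) {
        // Built on the first request for this service, reused on every later request.
        return balancers.computeIfAbsent(serviceName, name -> {
            created.incrementAndGet();
            return new FakeLoadBalancer(name);
        });
    }
}
```

Nothing is created until `get("user-service")` is first called. A second call for the same name returns the same instance.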
Let's start with how Ribbon pulls service instances.
```java
// In LoadBalancerFeignClient.execute: obtain the Ribbon configuration for this client
IClientConfig requestConfig = getClientConfig(options, clientName);
```
This call uses your service name to initialize Ribbon's configuration and to obtain the ZoneAwareLoadBalancer instance, which is created by this bean:
```java
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
        ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
        IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
    if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
        return this.propertiesFactory.get(ILoadBalancer.class, config, name);
    }
    return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
            serverListFilter, serverListUpdater);
}
```
When ZoneAwareLoadBalancer is initialized, it calls the restOfInit method of its parent class DynamicServerListLoadBalancer. This is the key method:
```java
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    // start a scheduled task that periodically refreshes the server list
    enableAndInitLearnNewServersFeature();
    // pull the server list once, right now
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}",
            clientConfig.getClientName(), this.toString());
}
```
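The ordering in restOfInit matters. The updater is started first, but its first scheduled run is delayed, so the immediate updateListOfServers() call is what makes the list usable right away. Below is a minimal model of that ordering. The types `RefreshAction`, `RecordingUpdater` and `MiniDynamicLoadBalancer` are hypothetical and only mimic the roles of Ribbon's classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical callback type, playing the role of ServerListUpdater.UpdateAction.
interface RefreshAction {
    void doUpdate();
}

// Hypothetical updater that only records the action; a real one would schedule it.
final class RecordingUpdater {
    RefreshAction registered;

    void start(RefreshAction action) {
        this.registered = action;
    }
}

// Simplified model of the restOfInit ordering (not Ribbon's real class).
final class MiniDynamicLoadBalancer {
    private final Supplier<List<String>> source;
    private volatile List<String> servers = Collections.emptyList();

    MiniDynamicLoadBalancer(Supplier<List<String>> source, RecordingUpdater updater) {
        this.source = source;
        // 1. hand the periodic refresh to the updater (its first run comes later)
        updater.start(this::updateListOfServers);
        // 2. pull once immediately so the list is usable right after construction
        updateListOfServers();
    }

    void updateListOfServers() {
        servers = new ArrayList<>(source.get());
    }

    List<String> getServers() {
        return servers;
    }
}
```

After construction the list is already populated. Later changes in the source only show up when the registered action runs again.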
Next, let's look at enableAndInitLearnNewServersFeature:
```java
public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}

// An UpdateAction that overrides doUpdate(); the updateListOfServers() it calls
// is the same method that restOfInit calls directly
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
    @Override
    public void doUpdate() {
        updateListOfServers();
    }
};
```
Now let's look at PollingServerListUpdater.start:
```java
@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,     // 1 s by default
                refreshIntervalMs,  // 30 s by default
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}
```
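As the code shows, start simply uses a thread pool to run updateAction.doUpdate() repeatedly. In other words, it keeps calling updateListOfServers to refresh the service instances. The refresh runs every 30 s by default, after an initial delay of 1 s.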
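A stripped-down version of the same pattern, runnable without Ribbon, is sketched below. `MiniPollingUpdater` is a hypothetical class. As far as I can tell, the real updater shares one executor across clients; here each instance owns a single daemon thread for simplicity. The try/catch around the action is not decoration: if a task scheduled with scheduleWithFixedDelay throws, the executor suppresses all of its later runs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of PollingServerListUpdater's start/stop logic.
final class MiniPollingUpdater {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    // Daemon thread so that a forgotten stop() does not keep the JVM alive.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "mini-polling-updater");
                t.setDaemon(true);
                return t;
            });
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile ScheduledFuture<?> scheduledFuture;
    volatile long lastUpdated;

    MiniPollingUpdater(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns false when already started (the "Already active, no-op" branch).
    synchronized boolean start(Runnable updateAction) {
        if (!isActive.compareAndSet(false, true)) {
            return false;
        }
        Runnable wrapper = () -> {
            if (!isActive.get()) {
                return;
            }
            try {
                updateAction.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // Catch per cycle: an escaping exception would cancel all later runs.
                System.err.println("Failed one update cycle: " + e);
            }
        };
        // Fixed delay: each run starts refreshIntervalMs after the previous one ends.
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow();
    }
}
```

With `new MiniPollingUpdater(1000, 30000)` this reproduces Ribbon's default timing. Shorter values make it easy to observe in a test.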
Next, the updateListOfServers method, which does the actual pulling:
```java
@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    if (serverListImpl != null) {
        // pull the service instances
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    // update the cached server list
    updateAllServerList(servers);
}
```
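Without the logging, updateListOfServers is a three-step pipeline: fetch, optionally filter, then replace the cached list. The sketch below models it with plain functional interfaces. `ServerListPipeline` is a hypothetical name, and the `Supplier` stands in for serverListImpl (for example a Nacos-backed list).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of updateListOfServers: fetch -> filter -> replace cache.
final class ServerListPipeline {
    private final Supplier<List<String>> serverListImpl; // may be null, as in the original
    private final UnaryOperator<List<String>> filter;     // may be null, as in the original
    private volatile List<String> allServers = Collections.emptyList();

    ServerListPipeline(Supplier<List<String>> serverListImpl, UnaryOperator<List<String>> filter) {
        this.serverListImpl = serverListImpl;
        this.filter = filter;
    }

    void updateListOfServers() {
        List<String> servers = new ArrayList<>();
        if (serverListImpl != null) {
            servers = serverListImpl.get();      // step 1: fetch from the source
            if (filter != null) {
                servers = filter.apply(servers); // step 2: optional filtering
            }
        }
        // step 3: publish an immutable snapshot (plays the role of updateAllServerList)
        allServers = Collections.unmodifiableList(new ArrayList<>(servers));
    }

    List<String> getAllServers() {
        return allServers;
    }
}
```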
Next comes **serverListImpl.getUpdatedListOfServers()**. Note that this is where Nacos gets called; here serverListImpl is Nacos's NacosServerList:
```java
@Override
public List<NacosServer> getUpdatedListOfServers() {
    return getServers();
}

private List<NacosServer> getServers() {
    try {
        // the Nacos group to query
        String group = discoveryProperties.getGroup();
        // fetch the healthy instances for this serviceId and group
        List<Instance> instances = discoveryProperties.namingServiceInstance()
                .selectInstances(serviceId, group, true);
        return instancesToServerList(instances);
    }
    catch (Exception e) {
        throw new IllegalStateException(
                "Can not get service instances from nacos, serviceId=" + serviceId, e);
    }
}
```
Here is the implementation of selectInstances:
```java
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters,
        boolean healthy, boolean subscribe) throws NacosException {
    ServiceInfo serviceInfo;
    if (subscribe) {
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(
                NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
```
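The final selectInstances(serviceInfo, healthy) call is not shown here. My reading of Nacos 1.x is that it drops any instance whose health flag differs from the requested one, that is disabled, or whose weight is not positive. That would explain why an instance taken offline disappears from the result. The sketch models that rule with hypothetical `SimpleInstance` and `InstanceSelector` classes; verify the exact rule against the Nacos version you use.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Nacos's Instance, reduced to the fields the filter reads.
final class SimpleInstance {
    final String ip;
    final boolean healthy;
    final boolean enabled;
    final double weight;

    SimpleInstance(String ip, boolean healthy, boolean enabled, double weight) {
        this.ip = ip;
        this.healthy = healthy;
        this.enabled = enabled;
        this.weight = weight;
    }
}

final class InstanceSelector {
    // Assumed rule (to be verified): keep instances that match the requested health
    // flag, are enabled, and have a positive weight.
    static List<SimpleInstance> select(List<SimpleInstance> hosts, boolean healthy) {
        List<SimpleInstance> result = new ArrayList<>();
        if (hosts == null) {
            return result;
        }
        for (SimpleInstance instance : hosts) {
            if (instance.healthy == healthy && instance.enabled && instance.weight > 0) {
                result.add(instance);
            }
        }
        return result;
    }
}
```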
subscribe defaults to true (subscription mode), so let's continue with getServiceInfo:
```java
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // cache key built from the service name and clusters
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // try the local cache first
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        // mark this service as being updated
        updatingMap.put(serviceName, new Object());
        // pull the service from the server right now
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName
                            + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // schedule the background update task (only once per service)
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}
```
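updateServiceNow pulls the service from the Nacos server and stores it in a local cache. I won't go into it here; feel free to read it yourself if you're interested.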
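The cache-first behaviour can be modelled as below. `MiniServiceCache` is hypothetical, a `Function` stands in for the remote call, and failover mode, the updatingMap wait and the real scheduling are left out. The point it shows: once a service is cached, reads no longer hit the server, so freshness depends entirely on the background task.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical, simplified model of the cache-first flow in getServiceInfo.
final class MiniServiceCache {
    private final Map<String, List<String>> serviceInfoMap = new ConcurrentHashMap<>();
    private final Map<String, Boolean> scheduled = new ConcurrentHashMap<>();
    private final Function<String, List<String>> server; // stands in for the Nacos server call
    final AtomicInteger remoteCalls = new AtomicInteger();

    MiniServiceCache(Function<String, List<String>> server) {
        this.server = server;
    }

    List<String> getServiceInfo(String serviceName) {
        if (!serviceInfoMap.containsKey(serviceName)) {
            fetch(serviceName); // cache miss: pull synchronously, like updateServiceNow
        }
        // Register the background refresh once; actual scheduling is omitted here.
        scheduled.putIfAbsent(serviceName, Boolean.TRUE);
        return serviceInfoMap.get(serviceName);
    }

    // What one run of the background update task would do.
    void refresh(String serviceName) {
        fetch(serviceName);
    }

    boolean isScheduled(String serviceName) {
        return scheduled.containsKey(serviceName);
    }

    private void fetch(String serviceName) {
        List<String> fetched = server.apply(serviceName);
        remoteCalls.incrementAndGet();
        serviceInfoMap.put(serviceName, fetched == null ? Collections.<String>emptyList() : fetched);
    }
}
```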
Next, scheduleUpdateIfAbsent(serviceName, clusters):
```java
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}
```
This schedules a task with a default delay of 1 s (DEFAULT_DELAY). The task it runs, UpdateTask, is what pulls the service.
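scheduleUpdateIfAbsent is a double-checked pattern. An unlocked check handles the common case, and a re-check under the lock prevents two threads from both scheduling a task for the same key. A minimal model with a hypothetical `OncePerKeyScheduler` follows. Note that ScheduledExecutorService.schedule runs a task only once. For the refresh to repeat, the task has to reschedule itself, which I believe Nacos's UpdateTask does at the end of each run.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of scheduleUpdateIfAbsent/addTask.
final class OncePerKeyScheduler {
    static final long DEFAULT_DELAY_MS = 1000L; // the 1 s default described above

    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor;

    OncePerKeyScheduler(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    void scheduleUpdateIfAbsent(String key, Runnable task) {
        if (futureMap.get(key) != null) {       // fast path without locking
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {   // re-check: another thread may have won
                return;
            }
            // schedule() runs the task once after the delay
            futureMap.put(key, executor.schedule(task, DEFAULT_DELAY_MS, TimeUnit.MILLISECONDS));
        }
    }

    int scheduledCount() {
        return futureMap.size();
    }
}
```

ConcurrentHashMap.computeIfAbsent could express the same guarantee in one call, since for a ConcurrentHashMap the mapping function is applied at most once per absent key.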
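This exposes a problem. Ribbon refreshes its service list every 30 s by default, while Nacos pulls every 1 s by default. So after an instance goes offline, Nacos already knows, but Ribbon still holds the old list. How do we fix this?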
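To put rough numbers on the gap, assume a refresher runs at initialDelay + k * interval (k = 0, 1, 2, ...). This ignores how long each run takes and any lag inside Nacos itself. `StalenessWindow` is a hypothetical helper that computes how long a list stays stale after a change.

```java
// Rough model (assumption): refreshes happen at initialDelay + k * interval,
// k = 0, 1, 2, ..., ignoring execution time and any upstream lag.
final class StalenessWindow {
    // First refresh time at or after tMs.
    static long nextRefreshAtOrAfter(long tMs, long initialDelayMs, long intervalMs) {
        if (tMs <= initialDelayMs) {
            return initialDelayMs;
        }
        long k = (tMs - initialDelayMs + intervalMs - 1) / intervalMs; // ceiling division
        return initialDelayMs + k * intervalMs;
    }

    // How long the old list is still served after a change at offlineAtMs.
    static long staleMs(long offlineAtMs, long initialDelayMs, long intervalMs) {
        return nextRefreshAtOrAfter(offlineAtMs, initialDelayMs, intervalMs) - offlineAtMs;
    }
}
```

With Ribbon's defaults (1 s initial delay, 30 s interval), an instance removed at t = 2.5 s stays in the list until t = 31 s, which is 28.5 s of staleness. With a 1 s interval, the same change is picked up after 0.5 s. In the worst case the stale window approaches the full refresh interval.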
Solution
The idea: when a change in the Nacos service list is detected, notify Ribbon so that it updates its own service list.
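From the analysis above, Ribbon's updates are performed by PollingServerListUpdater, so:
1. Write our own PollingServerListUpdater to replace the default one.
2. Override its start method and register a listener for changes to the Nacos service.
3. When a change is detected, trigger Ribbon's service-list update.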
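These steps amount to the observer pattern: instead of waiting for the next poll, the client registers a callback with the registry and re-runs its normal update path when notified. The sketch uses a hypothetical in-memory `FakeNamingService` in place of Nacos's NamingService, plus a hypothetical `PushAwareServerCache`. It only illustrates the flow, not the Nacos API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory registry standing in for Nacos's NamingService.
final class FakeNamingService {
    private final Map<String, List<String>> instances = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    void subscribe(String service, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    List<String> getInstances(String service) {
        return instances.getOrDefault(service, Collections.emptyList());
    }

    // A registry-side change (e.g. an instance taken offline): store it, then push it.
    void publish(String service, List<String> newInstances) {
        instances.put(service, newInstances);
        for (Consumer<List<String>> listener : listeners.getOrDefault(service, Collections.emptyList())) {
            listener.accept(newInstances);
        }
    }
}

// Client-side cache that refreshes as soon as it is notified,
// instead of waiting for a periodic poll.
final class PushAwareServerCache {
    private final FakeNamingService naming;
    private final String service;
    private volatile List<String> servers = Collections.emptyList();

    PushAwareServerCache(FakeNamingService naming, String service) {
        this.naming = naming;
        this.service = service;
        // The listener just re-runs the normal update path, like updateAction.doUpdate().
        naming.subscribe(service, changed -> updateListOfServers());
        updateListOfServers(); // initial pull
    }

    void updateListOfServers() {
        servers = new ArrayList<>(naming.getInstances(service));
    }

    List<String> getServers() {
        return servers;
    }
}
```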
Define MyPollingServerListUpdater
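```java
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;

import com.netflix.loadbalancer.DynamicServerListLoadBalancer;
import com.netflix.loadbalancer.ServerListUpdater;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

// Prototype scope: as a singleton, one instance would be shared by every Ribbon client,
// and because of the isActive check only the first client's start() would take effect
@Component("ribbonServerListUpdater")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MyPollingServerListUpdater implements ServerListUpdater {

    ...

    @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        System.err.println("Failed one update cycle: " + e);
                    }
                }
            };

            // Subscribe to Nacos so that a change triggers updateAction immediately
            NacosServiceListWatcher nacosServiceListWatcher = new NacosServiceListWatcher(updateAction);
            try {
                nacosServiceListWatcher.startWatch(getServerName(updateAction));
            } catch (Exception e) {
                e.printStackTrace();
            }

            // Keep the periodic poll as a fallback
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,
                    TimeUnit.MILLISECONDS
            );
        } else {
            System.out.println("Already active, no-op");
        }
    }

    // Get the service name: updateAction is an anonymous inner class of
    // DynamicServerListLoadBalancer, so its synthetic this$0 field holds the load balancer
    private String getServerName(UpdateAction updateAction) throws Exception {
        Class<? extends UpdateAction> aClass = updateAction.getClass();
        Field field = aClass.getDeclaredField("this$0");
        field.setAccessible(true);
        DynamicServerListLoadBalancer<?> loadBalancer =
                (DynamicServerListLoadBalancer<?>) field.get(updateAction);
        return loadBalancer.getName();
    }
}
```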
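getServerName relies on a javac detail. An anonymous inner class created in an instance context keeps a reference to its enclosing instance in a synthetic field, which javac names `this$0`. That name is a compiler convention, not something the language specification guarantees. Recent JDKs (18+) may also omit the field when the inner class never uses the outer instance. updateAction does use it (it calls updateListOfServers), so the field should be present. The sketch reproduces the trick with hypothetical `Callback`, `OuterBalancer` and `EnclosingLookup` types so it runs without Ribbon.

```java
import java.lang.reflect.Field;

// Hypothetical callback type, playing the role of ServerListUpdater.UpdateAction.
interface Callback {
    void doUpdate();
}

// Hypothetical outer class, playing the role of DynamicServerListLoadBalancer.
class OuterBalancer {
    private final String name;
    int updates;

    OuterBalancer(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    void updateListOfServers() {
        updates++;
    }

    // Anonymous inner class that calls an outer method, so javac keeps this$0.
    final Callback updateAction = new Callback() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
}

final class EnclosingLookup {
    // Reads javac's synthetic enclosing-instance field; throws if it is absent.
    static String serviceNameOf(Callback action) throws ReflectiveOperationException {
        Field field = action.getClass().getDeclaredField("this$0");
        field.setAccessible(true);
        OuterBalancer outer = (OuterBalancer) field.get(action);
        return outer.getName();
    }
}
```

In Ribbon itself, getName() is declared on BaseLoadBalancer. Casting this$0 to DynamicServerListLoadBalancer (the enclosing class) rather than to ZoneAwareLoadBalancer therefore keeps the lookup working for any subclass.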
Define the listener
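```java
import java.util.List;

// Package as of Spring Cloud Alibaba 2.1+; older versions use org.springframework.cloud.alibaba.nacos
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.netflix.loadbalancer.ServerListUpdater;

public class NacosServiceListWatcher {

    // ApplicationContextUtils is the author's own helper for looking up Spring beans
    private static NacosDiscoveryProperties nacosDiscoveryProperties =
            (NacosDiscoveryProperties) ApplicationContextUtils.getBeanByClassName(NacosDiscoveryProperties.class);

    private ServerListUpdater.UpdateAction updateAction;

    public NacosServiceListWatcher(ServerListUpdater.UpdateAction updateAction) {
        this.updateAction = updateAction;
    }

    public void startWatch(String serviceName) throws NacosException {
        nacosDiscoveryProperties.namingServiceInstance().subscribe(serviceName,
                nacosDiscoveryProperties.getGroup(), event -> {
                    // The Nacos instance list changed: refresh Ribbon's service list right away
                    if (updateAction != null) {
                        updateAction.doUpdate();
                    }
                    if (!(event instanceof NamingEvent)) {
                        return;
                    }
                    NamingEvent namingEvent = (NamingEvent) event;
                    List<Instance> instances = namingEvent.getInstances();
                    String name = namingEvent.getServiceName();
                    if (instances != null && !instances.isEmpty()) {
                        instances.forEach(instance -> System.out.println("Service " + name + ": " + instance));
                    } else {
                        System.out.println("Service " + name + " has no instances");
                    }
                });
    }
}
```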
That's all. If you spot any problems, please point them out~