app开发定制ribbon服务列表和nacos服务列表不一致的问题

背景

在学习和nacos的时候,app开发定制发现当手动把服务实例下线后,nacosapp开发定制的服务列表已更新,但是ribbonapp开发定制拉取的服务列表还未更新,app开发定制为了解决这个问题,app开发定制我们先来了解一下他们app开发定制各自的服务更新机制

问题解析

首先大家先思考两个问题:
1,是怎么拉取服务的?
2,ribbon是怎么拉取服务的?

在这里先明确一下:

这里的拉取服务实例是一个懒加载的过程,也就是说在第一次请求的时候才会去拉取服务实例

首先我们先来看下ribbon是怎么拉取服务实例的

//这里是LoadBalancerFeignClient的execute方法中获取ribbon配置的代码IClientConfig requestConfig = getClientConfig(options, clientName);
  • 1
  • 2

上面会通过你的服务名去初始化ribbon的一些配置,获取ZoneAwareLoadBalancer的实例对象

	@Bean	@ConditionalOnMissingBean	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {			return this.propertiesFactory.get(ILoadBalancer.class, config, name);		}		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,				serverListFilter, serverListUpdater);	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在初始化ZoneAwareLoadBalancer的时候会调用DynamicServerListLoadBalancerrestOfInit方法,这个方法就是重点方法

    void restOfInit(IClientConfig clientConfig) {        boolean primeConnection = this.isEnablePrimingConnections();        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()        this.setEnablePrimingConnections(false);        //启动一个定时任务去获取服务列表        enableAndInitLearnNewServersFeature();		//拉取服务列表        updateListOfServers();        if (primeConnection && this.getPrimeConnections() != null) {            this.getPrimeConnections()                    .primeConnections(getReachableServers());        }        this.setEnablePrimingConnections(primeConnection);        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

然后我们看下enableAndInitLearnNewServersFeature方法

public void enableAndInitLearnNewServersFeature() {        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());        serverListUpdater.start(updateAction);    }     //定义了一个UpdateAction,复习了更新的方法,可以发现这里的updateListOfServers方法其实就是restOfInit方法中的updateListOfServers方法 protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {        @Override        public void doUpdate() {            updateListOfServers();        }    };
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

继续看下PollingServerListUpdater.start方法

 @Override    public synchronized void start(final UpdateAction updateAction) {        if (isActive.compareAndSet(false, true)) {            final Runnable wrapperRunnable = new Runnable() {                @Override                public void run() {                    if (!isActive.get()) {                        if (scheduledFuture != null) {                            scheduledFuture.cancel(true);                        }                        return;                    }                    try {                        updateAction.doUpdate();                        lastUpdated = System.currentTimeMillis();                    } catch (Exception e) {                        logger.warn("Failed one update cycle", e);                    }                }            };            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(                    wrapperRunnable,                    //默认延迟1s                    initialDelayMs,                    //默认30s                    refreshIntervalMs,                    TimeUnit.MILLISECONDS            );        } else {            logger.info("Already active, no-op");        }    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

从上面可以很明显的发现,start方法其实就是用了一个线程池去循环执行updateAction.doUpdate();
其实也就是执行updateListOfServers更新服务实例的操作,这里可以发现,这里的定时获取实例的时间间隔是30s

然后我们来看下具体的拉取服务的updateListOfServers方法

    @VisibleForTesting    public void updateListOfServers() {        List<T> servers = new ArrayList<T>();        if (serverListImpl != null) {        	//拉取服务实例            servers = serverListImpl.getUpdatedListOfServers();            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",                    getIdentifier(), servers);            if (filter != null) {                servers = filter.getFilteredListOfServers(servers);                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",                        getIdentifier(), servers);            }        }        //更新缓存        updateAllServerList(servers);    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

继续看下**serverListImpl.getUpdatedListOfServers();**注意这里就回去调用nacos的服务

	@Override	public List<NacosServer> getUpdatedListOfServers() {		return getServers();	}	private List<NacosServer> getServers() {		try {			//获取nacos的group			String group = discoveryProperties.getGroup();			//根据服务id和group去获取服务的实例			List<Instance> instances = discoveryProperties.namingServiceInstance()					.selectInstances(serviceId, group, true);			return instancesToServerList(instances);		}		catch (Exception e) {			throw new IllegalStateException(					"Can not get service instances from nacos, serviceId=" + serviceId,					e);		}	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

看下selectInstances方法的实现

    @Override    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {        ServiceInfo serviceInfo;        if (subscribe) {            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));        } else {            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));        }        return selectInstances(serviceInfo, healthy);    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

这里的subscribe默认是订阅,继续看下getServiceInfo

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());        //服务名生成的key        String key = ServiceInfo.getKey(serviceName, clusters);        if (failoverReactor.isFailoverSwitch()) {            return failoverReactor.getService(key);        }		//先从缓存取        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);        if (null == serviceObj) {            serviceObj = new ServiceInfo(serviceName, clusters);			            serviceInfoMap.put(serviceObj.getKey(), serviceObj);			//记录正在修改服务            updatingMap.put(serviceName, new Object());            //拉取服务            updateServiceNow(serviceName, clusters);            updatingMap.remove(serviceName);        } else if (updatingMap.containsKey(serviceName)) {            if (UPDATE_HOLD_INTERVAL > 0) {                // hold a moment waiting for update finish                synchronized (serviceObj) {                    try {                        serviceObj.wait(UPDATE_HOLD_INTERVAL);                    } catch (InterruptedException e) {                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);                    }                }            }        }		//定时任务        scheduleUpdateIfAbsent(serviceName, clusters);        return serviceInfoMap.get(serviceObj.getKey());    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

updateServiceNow方法就会去nacos拉取服务然后放到一个本地缓存中,这里不展开,有兴趣可以自行去看看

下面看一下scheduleUpdateIfAbsent(serviceName, clusters);

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {            return;        }        synchronized (futureMap) {            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {                return;            }            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);        }    }    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

可以看到这里是一个默认1s的定时任务,这里执行的task其实就是拉取服务的方法

然后我们会发现一个问题,ribbon是默认30s刷新服务,而nacos是默认1s拉取服务,这样会导致当服务实例下线后,nacos已经感知到,但是ribbon还是旧的服务列表,那么我们要怎么解决这个问题?

解决方案

思路:当监听到nacos服务列表变更的时候通知ribbon,更新ribbon的服务列表
从上面我们可知更新ribbon的操作是由PollingServerListUpdater完成的,那么

1,自定义自己的PollingServerListUpdater类替换以前的
2,复写里面的start方法,定义一个监听器来监听nacos服务的改变
2,监听到服务改变后,执行ribbon服务更新

定义MyPollingServerListUpdater

@Component("ribbonServerListUpdater")public class MyPollingServerListUpdater implements ServerListUpdater {...	@Override    public synchronized void start(final UpdateAction updateAction) {        if (isActive.compareAndSet(false, true)) {            final Runnable wrapperRunnable = new Runnable() {                @Override                public void run() {                    if (!isActive.get()) {                        if (scheduledFuture != null) {                            scheduledFuture.cancel(true);                        }                        return;                    }                    try {                        updateAction.doUpdate();                        lastUpdated = System.currentTimeMillis();                    } catch (Exception e) {                        System.out.println("错误");                    }                }            };            NacosServiceListWatcher nacosServiceListWatcher = new NacosServiceListWatcher(updateAction);            try {                nacosServiceListWatcher.startWatch(getServerName(updateAction));            } catch (Exception e) {                e.printStackTrace();            }            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(                    wrapperRunnable,                    initialDelayMs,                    refreshIntervalMs,                    TimeUnit.MILLISECONDS            );        } else {            System.out.println("业务操作");        }    }}	//获取服务名    private String getServerName(UpdateAction updateAction) throws Exception {        Class<? extends UpdateAction> aClass = updateAction.getClass();        Field field = aClass.getDeclaredField("this$0");        field.setAccessible(true);        ZoneAwareLoadBalancer zoneAwareLoadBalancer = (ZoneAwareLoadBalancer ) field.get(updateAction);        return zoneAwareLoadBalancer.getName();    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49

定义监听器

public class NacosServiceListWatcher {    private static NacosDiscoveryProperties nacosDiscoveryProperties = (NacosDiscoveryProperties)ApplicationContextUtils.getBeanByClassName(NacosDiscoveryProperties.class);    private ServerListUpdater.UpdateAction updateAction;    public NacosServiceListWatcher(ServerListUpdater.UpdateAction updateAction){        this.updateAction = updateAction;    }    public void startWatch(String serviceName) throws NacosException {        nacosDiscoveryProperties.namingServiceInstance().subscribe(serviceName, nacosDiscoveryProperties.getGroup(), event -> {            if(updateAction != null){                updateAction.doUpdate();            }            NamingEvent event1 = (NamingEvent) event;            List<Instance> instances = event1.getInstances();            String name = event1.getServiceName();            if(instances != null && !instances.isEmpty()){                instances.stream().forEach(instance -> {                    System.out.println("服务"+name+":"+instance);                });            }else {                System.out.println("服务"+name+"列表为空");            }        });    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

到这里就结束了,有问题希望大家能指出~

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发