软件开发定制定制Ribbon从入门到源码解析

目录


1、简介

软件开发定制定制在微服务架构中,软件开发定制定制服务拆分成一个个的微服务,软件开发定制定制并且以集群化的方式进行部署;软件开发定制定制此时服务与服务之间的软件开发定制定制调用变得复杂了起来,软件开发定制定制客户端需要自主选择调软件开发定制定制用服务端集群中的某个服务,软件开发定制定制这就是我们经常说到的软件开发定制定制客户端负载均衡,在Spring Cloud软件开发定制定制生态中使用的比较广泛的技术是。

2、案例

软件开发定制定制无论是使用Fegin还是RestTemplate软件开发定制定制发起服务调用,软件开发定制定制客户端负载均衡均是通过Ribbon来实现,这里使用RestTemplate演示案例。

2.1 搭建服务EurekaServer

  • pom依赖

  1. <dependency>
  2.     <groupId>org.springframework.cloud</groupId>
  3.     <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
  4. </dependency>
  • application.yml

  1. server:
  2.   port: 18888
  3. spring:
  4.   application:
  5.     name: eurekaServer
  6. eureka:
  7.   client:
  8. #    fetch-registry: false
  9. #    register-with-eureka: false
  10.     service-url:
  11.       defaultZone: http://127.0.0.1:18888/eureka
  • 启动类

  1. @EnableEurekaServer
  2. @SpringBootApplication
  3. public class EurekaServerApplication {
  4.     public static void main(String[] args) {
  5.         SpringApplication.run(EurekaServerApplication.class, args);
  6.     }
  7. }

2.2 搭建order-service服务

  • pom依赖

  1. <!--web-->
  2. <dependency>
  3.       <groupId>org.springframework.boot</groupId>
  4.       <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!--EurekaClient-->
  7. <dependency>
  8.       <groupId>org.springframework.cloud</groupId>
  9.       <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  10. </dependency>
  • application.yml

  1. # server port
  2. server:
  3.   port: 18070
  4. # name
  5. spring:
  6.   application:
  7.     name: order-service
  8. # eureka server
  9. eureka:
  10.   client:
  11.     service-url:
  12.       defaultZone: http://127.0.0.1:18888/eureka
  • 模拟业务代码

  1. @RestController
  2. @RequestMapping("order")
  3. public class OrderController {
  4.     
  5.     @Autowired
  6.     private OrderService orderService;
  7.     
  8.     @GetMapping("{orderId}")
  9.     public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {
  10.         // 根据id查询订单并返回
  11.         return orderService.queryOrderById(orderId);
  12.     }
  13. }
  1. @Service
  2. public class OrderService {
  3.     @Autowired
  4.     private RestTemplate restTemplate;
  5.     @Autowired
  6.     private OrderMapper orderMapper;
  7.     public Order queryOrderById(Long orderId) {
  8.         // 1.查询订单
  9.         Order order = orderMapper.findById(orderId);
  10.         // 2、查询用户信息
  11.         if (Objects.nonNull(order)) {
  12.             String url = String.format("http://user-service/user/%s", order.getUserId());
  13.             User user = restTemplate.getForObject(url, User.class);
  14.             // 3、封装用户信息
  15.             order.setUser(user);
  16.         }
  17.         // 4.返回
  18.         return order;
  19.     }
  20. }
  • 启动类中注入RestTemplate并开启负载均衡

  1. @MapperScan("com.lzb.order.mapper")
  2. @SpringBootApplication
  3. @EnableEurekaClient
  4. public class OrderApplication {
  5.     public static void main(String[] args) {
  6.         SpringApplication.run(OrderApplication.class, args);
  7.     }
  8.     /**
  9.      * RestTemplate bean容器的注入
  10.      * LoadBalanced 负载均衡注解
  11.      * @return
  12.      */
  13.     @Bean
  14.     @LoadBalanced
  15.     public RestTemplate restTemplate() {
  16.         return new RestTemplate();
  17.     }
  18. }

2.3 搭建user-service服务

  • pom依赖

  1. <!--web-->
  2. <dependency>
  3.       <groupId>org.springframework.boot</groupId>
  4.       <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <!--EurekaClient-->
  7. <dependency>
  8.       <groupId>org.springframework.cloud</groupId>
  9.       <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
  10. </dependency>
  • application.yml

  1. # server port
  2. server:
  3.   port: 18080
  4. # name
  5. spring:
  6.   application:
  7.     name: user-service
  8. # eureka server
  9. eureka:
  10.   client:
  11.     service-url:
  12.       defaultZone: http://127.0.0.1:18888/eureka
  • 模拟业务代码

  1. @RestController
  2. @RequestMapping("/user")
  3. public class UserController {
  4.     @Autowired
  5.     private UserService userService;
  6.     @GetMapping("/{id}")
  7.     public User queryById(@PathVariable("id") Long id) {
  8.         return userService.queryById(id);
  9.     }
  10. }
  • 启动类

  1. @MapperScan("com.lzb.user.mapper")
  2. @SpringBootApplication
  3. @EnableEurekaClient
  4. public class UserApplication {
  5.     public static void main(String[] args) {
  6.         SpringApplication.run(UserApplication.class, args);
  7.     }
  8. }

2.4 服务启动

在上述服务搭建之后,可以看出order-service服务调用了user-service服务,因此我将user-service服务集群部署,并且在order-service注入了RestTemplate且标注了LoadBalanced注解;启动顺序如下所示:

  • 启动EurekaServer

  • 启动user-service

  • 启动user-service2

  • 启动order-service

关于IDEA 如何集群启动某个服务,方式比较多,我这里介绍一种常用的方法,步骤如下:

  • 首先启动该服务,直至服务启动成功

  • 右键启动的服务,选择Copy Configuration

  • Edit Configuration中修改服务Name;传入端口参数,在Environment中的VM options键入-Dserver.port=xxxx;点击Apply;点击OK即可;

  • 启动服务,右上角选择刚刚编辑的服务信息,DEBUG启动即可。

  • 服务启动后Eureka Server中服务注册信息如下所示

2.5 测试结果

清空user-service和user-service2的控制台日志,在浏览器中请求四次order-service,order-service中会通过RestTemplate调用order-service,由于RestTemplate使用了LoadBlanced注解修饰,因此Ribbon托管了RestTemplate,在发起调用之前会解析服务名获取服务Ip和port,然后根据选择服务进行调用!

可以在console打印的日志中看出,第一次请求大到了user-service,第二次请求打到了user-service1,第三次请求大到了user-service,第四次请求打到了user-service1

3、Ribbon如何实现负载均衡

可以试想一下,如果是你本人去实现一个Ribbon的功能你会怎么做?我想大家的思路应该都差不多如下:

  • 拦截Http请求

  • 解析请求中的服务名

  • 在Eureka Client拉取的Eureka Server中注册的可用服务信息中,根据服务名获取服务IP和Port信息

  • 根据负载均衡策略选择服务提供者发起http请求

3.1 拦截http请求

在springboot中常用的拦截器有三个:

  • org.springframework.web.servlet.HandlerInterceptor

  • org.springframework.http.client.ClientHttpRequestInterceptor

  • feign.RequestInterceptor

三者均是对http请求进行拦截,但是3个拦截器应用的项目不同,HandlerInterceptor主要是处理http servlet请求;ClientHttpRequestInterceptor主要是处理HttpTemplate请求或者Ribbon请求;RequestInterceptor用于处理Fegin请求,Fegin本质上是http请求;因此很明显,Ribbon实现的是ClientHttpRequestInterceptor拦截器。

3.2 解析请求中的服务名

org.springframework.http.client.ClientHttpRequestInterceptor接口中只有一个方法intercept(),其子类均会重写该方法org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,在该方法入口处打上断点。并且在浏览器中访问order-service,order-service中会使用RestTemplate请求user-service

此时可以看到request.getURI()得到的是 通过final URI originalUri = request.getURI(); String serviceName = originalUri.getHost();解析获得服务名

3.3 根据服务名获取服务IP和Port信息

在org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor类中重写的intercept()方法,最后一行代码至关重要,this.requestFactory.createRequest(request, body, execution)为包装http请求,不是很重要,最终的是org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient类中execute()方法。

此处的serviceId即为服务名user-service,this.getLoadBalancer(serviceId);会根据服务名从eureka中解析中对应的服务地址和端口。 this.getLoadBalancer(serviceId)方法调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory类中的getLoadBalancer()方法,随后调用了org.springframework.cloud.netflix.ribbon.SpringClientFactory.getInstance()方法,之后调用了其父类org.springframework.cloud.context.named.NamedContextFactory.getInstance()方法,最终返回org.springframework.context.annotation.AnnotationConfigApplicationContext,可以看到其实获取的是spring 容器中的ILoadBalancer.class实现类com.netflix.loadbalancer.DynamicServerListLoadBalancer实例。 那现在还有最后一个问题,DynamicServerListLoadBalancer实例中的服务信息是怎么来的呢?这里其实是Eureka Clinet从Eureka Server中拉取的服务列表。

3.4 根据负载均衡策略发起http请求

最后一步就是根据负载均衡策略选择服务提供者发起http请求,负载均衡策略的选择在com.netflix.loadbalancer.ZoneAwareLoadBalancer的chooseServer()方法中实现。在选择发起请求的服务之后执行org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient中的execute()方法即完成整个Ribbon负载均衡过程。

4、简单源码解析

在Ribbon整个源码体系中,ILoadBalancer接口的类关系图十分重要,因此源码解析也会根据这张图的类关系图来。

4.1 ILoadBalancer

com.netflix.loadbalancer.ILoadBalancer是一个顶层接口类,该类中定义了几个未实现的方法,具体实现在子类中完成。

方法作用如下所示:

方法名作用
addServers1、服务器列表初始化
2、添加新的服务
chooseServer从负载均衡器中选择服务器
markServerDown负载均衡客户端主动通知下机,否则不可用的服务将会存活到下一个ping周期
getServerList@Deprecated
getReachableServers获取能正常访问的服务器
getAllServers获取所有已知的服务器,包括可访问的和不可访问的

4.2 AbstractLoadBalancer

com.netflix.loadbalancer.AbstractLoadBalancer是一个抽象类,它实现了com.netflix.loadbalancer.ILoadBalancer接口;其源码非常少,如下所示:

  1. public abstract class AbstractLoadBalancer implements ILoadBalancer {
  2.     
  3.     public enum ServerGroup{
  4.         ALL,
  5.         STATUS_UP,
  6.         STATUS_NOT_UP        
  7.     }
  8.     public Server chooseServer() {
  9.      return chooseServer(null);
  10.     }
  11.     public abstract List<Server> getServerList(ServerGroup serverGroup);
  12.     public abstract LoadBalancerStats getLoadBalancerStats();    
  13. }

AbstractLoadBalancer抽象类中定义类一个ServerGroup内部枚举类,ServerGroup用于标志服务实例的分组类型:

  • ALL 表示所有服务

  • STATUS_UP 表示正常服务

  • STATUS_NOT_UP 表示下线服务

4.3 BaseLoadBalancer

com.netflix.loadbalancer.BaseLoadBalancer类继承了com.netflix.loadbalancer.AbstractLoadBalancer,BaseLoadBalancer类源码比较复杂,但是有几个点是比较重要的。

  • allServerList 用于保存所有服务实例

  • upServerList用于保存所有在线服务实例

  1. @Monitor(name = PREFIX + "AllServerList"type = DataSourceType.INFORMATIONAL)
  2. protected volatile List<Server> allServerList = Collections
  3.         .synchronizedList(new ArrayList<Server>());
  4. @Monitor(name = PREFIX + "UpServerList"type = DataSourceType.INFORMATIONAL)
  5. protected volatile List<Server> upServerList = Collections
  6.         .synchronizedList(new ArrayList<Server>());
  • 定义负载均衡默认策略为轮询

  1. private final static IRule DEFAULT_RULE = new RoundRobinRule(); 
  2. protected IRule rule = DEFAULT_RULE;
  • IPingStrategy表示服务检查策略,用于检查服务是否可用;默认的服务检查策略为SerialPingStrategy,SerialPingStrategy中的pingServers方法就是遍历所有服务实例,一个个发送ping请求,查看服务是否有效。

  1. private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
  2. protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
  • BaseLoadBalancer构造函数中启动了一个PingTask,PingTask每隔10秒钟会ping一次服务列表中的服务是否可用,PingTask中干的事情就是pingStrategy服务检查策略。

  1. protected int pingIntervalSeconds = 10;
  2. public BaseLoadBalancer() {
  3.     this.name = DEFAULT_NAME;
  4.     this.ping = null;
  5.     setRule(DEFAULT_RULE);
  6.     setupPingTask();
  7.     lbStats = new LoadBalancerStats(DEFAULT_NAME);
  8. }
  9. void setupPingTask() {
  10.     if (canSkipPing()) {
  11.         return;
  12.     }
  13.     if (lbTimer != null) {
  14.         lbTimer.cancel();
  15.     }
  16.     lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
  17.             true);
  18.     lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
  19.     forceQuickPing();
  20. }

4.4 DynamicServerListLoadBalancer

com.netflix.loadbalancer.DynamicServerListLoadBalancer类继承了com.netflix.loadbalancer.BaseLoadBalancer,因此DynamicServerListLoadBalancer类主要是对BaseLoadBalancer类功能进行扩展,DynamicServerListLoadBalancer类源码比较复杂,但是有几个点是比较重要的。

  • serverListImpl是DynamicServerListLoadBalancer中声明的ServerList类型的变量,ServerList接口中定义了两个方法

  1. volatile ServerList<T> serverListImpl;
  • getInitialListOfServers方法用于获取所有初始化服务列表

  • getUpdatedListOfServers方法用于获取更新的服务实例列表

  1. public interface ServerList<T extends Server> {
  2.     public List<T> getInitialListOfServers();
  3.     
  4.     public List<T> getUpdatedListOfServers();   
  5. }
  • ServerList接口有5个实现类,DynamicServerListLoadBalancer默认实现是DomainExtractingServerList,但是DomainExtractingServerList构造函数中传入的是DiscoveryEnabledNIWSServerList(可以看我下面Debug的图),因此可以看出重点类其实是DiscoveryEnabledNIWSServerList

  • DiscoveryEnabledNIWSServerList类中一个比较重要的方法是obtainServersViaDiscovery方法,可以从名字看出这是通过注册中心获取服务列表,代码中可以看出依赖 EurekaClient从服务注册中心中获取具体的服务实例InstanceInfo

  1. private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
  2.         List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
  3.         if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
  4.             logger.warn("EurekaClient has not been initialized yet, returning an empty list");
  5.             return new ArrayList<DiscoveryEnabledServer>();
  6.         }
  7.         EurekaClient eurekaClient = eurekaClientProvider.get();
  8.         if (vipAddresses!=null){
  9.             for (String vipAddress : vipAddresses.split(",")) {
  10.                 // if targetRegion is null, it will be interpreted as the same region of client
  11.                 List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
  12.                 for (InstanceInfo ii : listOfInstanceInfo) {
  13.                     if (ii.getStatus().equals(InstanceStatus.UP)) {
  14.                         if(shouldUseOverridePort){
  15.                             if(logger.isDebugEnabled()){
  16.                                 logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
  17.                             }
  18.                             // copy is necessary since the InstanceInfo builder just uses the original reference,
  19.                             // and we don't want to corrupt the global eureka copy of the object which may be
  20.                             // used by other clients in our system
  21.                             InstanceInfo copy = new InstanceInfo(ii);
  22.                             if(isSecure){
  23.                                 ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
  24.                             }else{
  25.                                 ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
  26.                             }
  27.                         }
  28.                         DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
  29.                         serverList.add(des);
  30.                     }
  31.                 }
  32.                 if (serverList.size()>0 && prioritizeVipAddressBasedServers){
  33.                     break// if the current vipAddress has servers, we dont use subsequent vipAddress based servers
  34.                 }
  35.             }
  36.         }
  37.         return serverList;
  38.     }
  • DiscoveryEnabledNIWSServerList类中另一个比较重要点是定义了一个ServerListUpdater.UpdateAction更新器,该更新器用于更新服务信息。ServerListUpdater提供两个实现类com.netflix.niws.loadbalancer.EurekaNotificationServerListUpdater和com.netflix.loadbalancer.PollingServerListUpdater;其中EurekaNotificationServerListUpdater通过Eureka的事件监听机制来更新服务信息;而此处默认的是PollingServerListUpdater定时任务更新机制。

  • PollingServerListUpdater代码中可以看出定时任务延迟启动initialDelayMs为1秒,刷新频率refreshIntervalMs为30秒

  1. private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000// msecs;
  2. private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000// msecs;  
  3. public PollingServerListUpdater() {
  4.     this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
  5. }
  6. public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {
  7.     this.initialDelayMs = initialDelayMs;
  8.     this.refreshIntervalMs = refreshIntervalMs;
  9. }
  10. public synchronized void start(final UpdateAction updateAction) {
  11.     if (isActive.compareAndSet(falsetrue)) {
  12.         final Runnable wrapperRunnable = new Runnable() {
  13.             @Override
  14.             public void run() {
  15.                 if (!isActive.get()) {
  16.                     if (scheduledFuture != null) {
  17.                         scheduledFuture.cancel(true);
  18.                     }
  19.                     return;
  20.                 }
  21.                 try {
  22.                     updateAction.doUpdate();
  23.                     lastUpdated = System.currentTimeMillis();
  24.                 } catch (Exception e) {
  25.                     logger.warn("Failed one update cycle", e);
  26.                 }
  27.             }
  28.         };
  29.         scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
  30.                 wrapperRunnable,
  31.                 initialDelayMs,
  32.                 refreshIntervalMs,
  33.                 TimeUnit.MILLISECONDS
  34.         );
  35.     } else {
  36.         logger.info("Already active, no-op");
  37.     }
  38. }
  • 在DynamicServerListLoadBalancer定义了一个变量ServerListFilter,可以看到在updateListOfServers方法中,会判断filter是否为空,然后对getUpdatedListOfServers获取到的服务列表servers执行getFilteredListOfServers方法,其实就是对服务列表根据ServerListFilter接口的实现类逻辑进行过滤。

  1. volatile ServerListFilter<T> filter;
  2. public void updateListOfServers() {
  3.     List<T> servers = new ArrayList<T>();
  4.     if (serverListImpl != null) {
  5.         servers = serverListImpl.getUpdatedListOfServers();
  6.         LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
  7.                 getIdentifier(), servers);
  8.         if (filter != null) {
  9.             servers = filter.getFilteredListOfServers(servers);
  10.             LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
  11.                     getIdentifier(), servers);
  12.         }
  13.     }
  14.     updateAllServerList(servers);
  15. }
  • ServerListFilter的实现类如下所示,默认的实现类是DefaultNIWSServerListFilter,但是DefaultNIWSServerListFilter啥也没有,仅仅继承了ZoneAffinityServerListFilter;因此具体的功能还是在ZoneAffinityServerListFilter中实现,而ZoneAffinityServerListFilter主要提供的是对服务提供者所处的Zone和服务消费者所在的Zone进行比较,过滤掉不在一个Zone的实例。

4.5 ZoneAwareLoadBalancer

com.netflix.loadbalancer.ZoneAwareLoadBalancer是com.netflix.loadbalancer.DynamicServerListLoadBalancer的唯一子类,在DynamicServerListLoadBalancer中还有一个非常重要的方法没有实现,那就是chooseServer方法。chooseServer用于负载均衡器选择服务器进行调用,因此ZoneAwareLoadBalancer的出现就是解决这个问题。此外ZoneAwareLoadBalancer重写了setServerListForZones方法,setServerListForZones方法getLoadBalancer(zone)用于创建负载均衡器; existingLBEntry.getValue().setServersList(Collections.emptyList())用于清除不包含server的zone

  1. protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {
  2.     super.setServerListForZones(zoneServersMap);
  3.     if (balancers == null) {
  4.         balancers = new ConcurrentHashMap<StringBaseLoadBalancer>();
  5.     }
  6.     for (Map.Entry<StringList<Server>> entry: zoneServersMap.entrySet()) {
  7.         String zone = entry.getKey().toLowerCase();
  8.         getLoadBalancer(zone).setServersList(entry.getValue());
  9.     }
  10.     // check if there is any zone that no longer has a server
  11.     // and set the list to empty so that the zone related metrics does not
  12.     // contain stale data
  13.     for (Map.Entry<StringBaseLoadBalancerexistingLBEntry: balancers.entrySet()) {
  14.         if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
  15.             existingLBEntry.getValue().setServersList(Collections.emptyList());
  16.         }
  17.     }

 👇🏻 关注公众号 我们一起进大厂👇🏻     

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发