企业管理系统定制开发【SpringCloud原理】OpenFeign整合Ribbon来实现负载均衡源码剖析

大家好,企业管理系统定制开发前面我已经剖析了的动企业管理系统定制开发态代理生成原理和Ribbon企业管理系统定制开发的运行原理,企业管理系统定制开发如果小伙伴们对OpenFeign企业管理系统定制开发的动态代理生成原理和Ribbon企业管理系统定制开发的运行原理不熟悉的同学,企业管理系统定制开发可以关注微信公众号 三友的java日记,企业管理系统定制开发可以通过菜单栏查看,企业管理系统定制开发我已经整理好了。企业管理系统定制开发这篇文章来继续剖析SpringCloud组件原理,来看一看OpenFeign企业管理系统定制开发是如何基于Ribbon企业管理系统定制开发来实现负载均衡的,企业管理系统定制开发两组件是如何协同工作的。

一、Feign企业管理系统定制开发动态代理调用实现rpc流程分析

通过Feign企业管理系统定制开发客户端接口的生成原理讲解,我们可以清楚的知道,Feign客户端接口的动态代理生成是基于JDK的动态代理来实现的,那么在所有的方法调用的时候最终都会走InvocationHandler接口的实现,默认就是ReflectiveFeign.FeignInvocationHandler,那我们接下来就来看看,FeignInvocationHandler是如何实现rpc调用的。

FeignInvocationHandler对于invoke方法的实现。

  1. private final Map<Method, MethodHandler> dispatch;
  2. @Override
  3. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  4. if ("equals".equals(method.getName())) {
  5. try {
  6. Object otherHandler =
  7. args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
  8. return equals(otherHandler);
  9. } catch (IllegalArgumentException e) {
  10. return false;
  11. }
  12. } else if ("hashCode".equals(method.getName())) {
  13. return hashCode();
  14. } else if ("toString".equals(method.getName())) {
  15. return toString();
  16. }
  17. return dispatch.get(method).invoke(args);
  18. }

前几个if判断很简单,就是判断是不是调用的方法是不是equals,hashCode,toString,因为这些方法的调是不需要走rpc调用的。

接下就是从dispatch获取要调用的方法对应的MethodHandler,然后调用MethodHandler的invoke方法。那MethodHandler是什么时候生成的呢?MethodHandler是在构建动态代理的时候生成的,不清楚的同学可以翻一下OpenFeign那篇文章最后关于生成动态代理的那部分源码。那MethodHandler作用是什么呢?你可以理解为最终rpc的调用都是基于这个MethodHandler来实现的,每个方法都有对应MethodHandler来实现rpc调用,接下来我们就来看一下MethodHandler的invoke方法的实现。

MethodHandler是个接口,有两个实现类,一个是DefaultMethodHandler,这个是处理接口中的默认方法的,另一个是SynchronousMethodHandler,这个是实现rpc调用的方法。接下来我们就看看SynchronousMethodHandler关于invoke方法的实现。

  1. @Override
  2. public Object invoke(Object[] argv) throws Throwable {
  3. RequestTemplate template = buildTemplateFromArgs.create(argv);
  4. Options options = findOptions(argv);
  5. Retryer retryer = this.retryer.clone();
  6. while (true) {
  7. try {
  8. return executeAndDecode(template, options);
  9. } catch (RetryableException e) {
  10. try {
  11. retryer.continueOrPropagate(e);
  12. } catch (RetryableException th) {
  13. Throwable cause = th.getCause();
  14. if (propagationPolicy == UNWRAP && cause != null) {
  15. throw cause;
  16. } else {
  17. throw th;
  18. }
  19. }
  20. if (logLevel != Logger.Level.NONE) {
  21. logger.logRetry(metadata.configKey(), logLevel);
  22. }
  23. continue;
  24. }
  25. }
  26. }

第一行通过方法的参数构建了一个RequestTemplate,RequestTemplate可以看成是组装http请求所需各种参数的封装,比如什么情头,body之类的都放在这里面。

第二行 Options options = findOptions(argv); 这个很有意思,Options主要是封装了发送请求是连接超时时间和读超时时间的配置,findOptions(argv)也就是先从参数里面找有没有Options,没有就返回构造SynchronousMethodHandler的入参时的Options,也就是说,连接超时时间和读超时时间可以从方法入参来传入,不过一般没有人这么玩。

第三行就是搞一个重试的组件,是可以实现重试的,一般不设置。

然后执行到executeAndDecode(template, options),进入这个方法

  1. Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
  2. Request request = targetRequest(template);
  3. if (logLevel != Logger.Level.NONE) {
  4. logger.logRequest(metadata.configKey(), logLevel, request);
  5. }
  6. Response response;
  7. long start = System.nanoTime();
  8. try {
  9. response = client.execute(request, options);
  10. } catch (IOException e) {
  11. if (logLevel != Logger.Level.NONE) {
  12. logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
  13. }
  14. throw errorExecuting(request, e);
  15. }
  16. long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  17. boolean shouldClose = true;
  18. try {
  19. if (logLevel != Logger.Level.NONE) {
  20. response =
  21. logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
  22. }
  23. if (Response.class == metadata.returnType()) {
  24. if (response.body() == null) {
  25. return response;
  26. }
  27. if (response.body().length() == null ||
  28. response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
  29. shouldClose = false;
  30. return response;
  31. }
  32. // Ensure the response body is disconnected
  33. byte[] bodyData = Util.toByteArray(response.body().asInputStream());
  34. return response.toBuilder().body(bodyData).build();
  35. }
  36. if (response.status() >= 200 && response.status() < 300) {
  37. if (void.class == metadata.returnType()) {
  38. return null;
  39. } else {
  40. Object result = decode(response);
  41. shouldClose = closeAfterDecode;
  42. return result;
  43. }
  44. } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
  45. Object result = decode(response);
  46. shouldClose = closeAfterDecode;
  47. return result;
  48. } else {
  49. throw errorDecoder.decode(metadata.configKey(), response);
  50. }
  51. } catch (IOException e) {
  52. if (logLevel != Logger.Level.NONE) {
  53. logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
  54. }
  55. throw errorReading(request, response, e);
  56. } finally {
  57. if (shouldClose) {
  58. ensureClosed(response.body());
  59. }
  60. }
  61. }

首先调用了targetRequest方法,贴出源码

  1. Request targetRequest(RequestTemplate template) {
  2. for (RequestInterceptor interceptor : requestInterceptors) {
  3. interceptor.apply(template);
  4. }
  5. return target.apply(template);
  6. }

这个方法会遍历所有的拦截器RequestInterceptor,这是feign的一个扩展点,也就说再发送请求前,你仍然还有机会对请求的内容进行调整,比如说加个请求头,这也是很常见的一种方式,在微服务之间鉴权的时候使用。RequestInterceptor是在构建Feign.Builder的时候传进来的,Feign.Builder的组件都是通过ioc容器获取的,组件又是通过配置类来的,所以你需要的话就可以在配置类中声明RequestInterceptor对象。配置类有不同的优先级,按照自己的需求,可以在其中一个优先级使用,不过一般这种通用的东西,不是某个微服务特有的功能,一般选择在springboot启动中的容器中配置。

执行完targetRequest,回到executeAndDecode之后,会构建出一个Request,Request很好理解,就是一个请求,里面封装了http请求的东西。接下来就会调用Client的execute方法来执行请求,拿到响应,接下来就是基于处理这个响应,将响应数据封装成需要返回的参数,之后返回给调用方。

到这里,我们已经分析出接口的动态代理是如何运行的。其实就是通过每个方法对应的MethodHandler来实现的,MethodHandler主要就是拼接各种参数,组装成一个请求,随后交由Client接口的实现去发送请求。

二、LoadBalancerFeignClient

通过上面分析整个动态代理调用过程可以看出,Client是发送http请求的关键类。那么Client是什么玩意?还记得我在关于OpenFeign动态代理生成的那篇文章中留下的一个疑问么,当Feign客户端在构建动态代理的时候,填充很多组件到Feign.Builder中,其中有个组件就是Client的实现,我们并没有在FeignClientsConfiguration配置类中找到关于Client的对象的声明。不过当时我就提到了,这个组件的实现是要依赖负载均衡的,也就是这个组件是Feign用来整合的入口。

接下来,我们就着重看一下Client的实现,看看Feign是如何通过ribbon实现负载均衡的。

我们先来看一下Feign跟ribbon整合的配置类。

  1. @Import({ HttpClientFeignLoadBalancedConfiguration.class,
  2. OkHttpFeignLoadBalancedConfiguration.class,
  3. DefaultFeignLoadBalancedConfiguration.class })
  4. public class FeignRibbonClientAutoConfiguration {
  5. @Bean
  6. @Primary
  7. @ConditionalOnMissingBean
  8. @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
  9. public CachingSpringLoadBalancerFactory cachingLBClientFactory(
  10. SpringClientFactory factory) {
  11. return new CachingSpringLoadBalancerFactory(factory);
  12. }
  13. @Bean
  14. @Primary
  15. @ConditionalOnMissingBean
  16. @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
  17. public CachingSpringLoadBalancerFactory retryabeCachingLBClientFactory(
  18. SpringClientFactory factory, LoadBalancedRetryFactory retryFactory) {
  19. return new CachingSpringLoadBalancerFactory(factory, retryFactory);
  20. }
  21. @Bean
  22. @ConditionalOnMissingBean
  23. public Request.Options feignRequestOptions() {
  24. return LoadBalancerFeignClient.DEFAULT_OPTIONS;
  25. }
  26. }

我们来分析一下,首先通过@Impot注解导入了三个配置类。

  • HttpClientFeignLoadBalancedConfiguration:基于HttpClient实现http调用的。
  • OkHttpFeignLoadBalancedConfiguration:基于OkHttp实现http调用的。
  • DefaultFeignLoadBalancedConfiguration:默认的,也就是Feign原生的发送http的实现。

这里我们看一下DefaultFeignLoadBalancedConfiguration配置类,因为默认就是这,HttpClientFeignLoadBalancedConfiguration和OkHttpFeignLoadBalancedConfiguration都需要有引入HttpClient和OkHttp依赖才会有用

  1. @Configuration(proxyBeanMethods = false)
  2. class DefaultFeignLoadBalancedConfiguration {
  3. @Bean
  4. @ConditionalOnMissingBean
  5. public Client feignClient(CachingSpringLoadBalancerFactory cachingFactory,
  6. SpringClientFactory clientFactory) {
  7. return new LoadBalancerFeignClient(new Client.Default(null, null), cachingFactory,
  8. clientFactory);
  9. }
  10. }

这个配置类很简单,声明了LoadBalancerFeignClient到spring容器,传入了三个参数,一个Client的实现,一个CachingSpringLoadBalancerFactory和一个SpringClientFactory。LoadBalancerFeignClient这个类实现了Client接口,也就数说我们在构建Feign.Builder填充的就是这个对象,也就是上面说feign的执行流程最后用来执行请求的Client的实现。

接下来我说一下入参的三个参数是什么意思。

  • Client.Default:就是Feign自己实现的Client,里面封装了真正发送http发送请求的功能,LoadBalancerFeignClient虽然也实现了Client接口,但是这个实现其实是为了整合Ribbon用的,并没有发送http的功能,所以需要有个可以发送http功能的实现。
  • CachingSpringLoadBalancerFactory:后面会说这个类的作用
  • SpringClientFactory:这个跟Feign里面的FeignContext的作用差不多,用来实现配置隔离的,当然,这个也在关于Ribbon的那篇文章有剖析过。

其实大家可以自行去看OkHttpFeignLoadBalancedConfiguration和HttpClientFeignLoadBalancedConfiguration,其实他们配置跟DefaultFeignLoadBalancedConfiguration是一样的,声明的对象都是LoadBalancerFeignClient,只不过将Client.Default换成了基于HttpClient和OkHttp的实现,也就是发送http请求使用的工具不一样。

FeignRibbonClientAutoConfiguration除了导入配置类还声明了CachingSpringLoadBalancerFactory,只不过一种是带基于spring实现的重试功能的,一种是不带的,主要看有没有引入spring重试功能的包,所以上面构建LoadBalancerFeignClient注入的CachingSpringLoadBalancerFactory就是在这声明的。

这里就说完了Feign整合ribbon的配置类FeignRibbonClientAutoConfiguration,我们也找到了构造Feign.Builder的实现LoadBalancerFeignClient,接下来就来剖析LoadBalancerFeignClient的实现。

  1. public class LoadBalancerFeignClient implements Client {
  2. static final Request.Options DEFAULT_OPTIONS = new Request.Options();
  3. private final Client delegate;
  4. private CachingSpringLoadBalancerFactory lbClientFactory;
  5. private SpringClientFactory clientFactory;
  6. public LoadBalancerFeignClient(Client delegate,
  7. CachingSpringLoadBalancerFactory lbClientFactory,
  8. SpringClientFactory clientFactory) {
  9. this.delegate = delegate;
  10. this.lbClientFactory = lbClientFactory;
  11. this.clientFactory = clientFactory;
  12. }
  13. static URI cleanUrl(String originalUrl, String host) {
  14. String newUrl = originalUrl;
  15. if (originalUrl.startsWith("https://")) {
  16. newUrl = originalUrl.substring(0, 8)
  17. + originalUrl.substring(8 + host.length());
  18. }
  19. else if (originalUrl.startsWith("http")) {
  20. newUrl = originalUrl.substring(0, 7)
  21. + originalUrl.substring(7 + host.length());
  22. }
  23. StringBuffer buffer = new StringBuffer(newUrl);
  24. if ((newUrl.startsWith("https://") && newUrl.length() == 8)
  25. || (newUrl.startsWith("http://") && newUrl.length() == 7)) {
  26. buffer.append("/");
  27. }
  28. return URI.create(buffer.toString());
  29. }
  30. @Override
  31. public Response execute(Request request, Request.Options options) throws IOException {
  32. try {
  33. URI asUri = URI.create(request.url());
  34. String clientName = asUri.getHost();
  35. URI uriWithoutHost = cleanUrl(request.url(), clientName);
  36. FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
  37. this.delegate, request, uriWithoutHost);
  38. IClientConfig requestConfig = getClientConfig(options, clientName);
  39. return lbClient(clientName)
  40. .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
  41. }
  42. catch (ClientException e) {
  43. IOException io = findIOException(e);
  44. if (io != null) {
  45. throw io;
  46. }
  47. throw new RuntimeException(e);
  48. }
  49. }
  50. IClientConfig getClientConfig(Request.Options options, String clientName) {
  51. IClientConfig requestConfig;
  52. if (options == DEFAULT_OPTIONS) {
  53. requestConfig = this.clientFactory.getClientConfig(clientName);
  54. }
  55. else {
  56. requestConfig = new FeignOptionsClientConfig(options);
  57. }
  58. return requestConfig;
  59. }
  60. protected IOException findIOException(Throwable t) {
  61. if (t == null) {
  62. return null;
  63. }
  64. if (t instanceof IOException) {
  65. return (IOException) t;
  66. }
  67. return findIOException(t.getCause());
  68. }
  69. public Client getDelegate() {
  70. return this.delegate;
  71. }
  72. private FeignLoadBalancer lbClient(String clientName) {
  73. return this.lbClientFactory.create(clientName);
  74. }
  75. static class FeignOptionsClientConfig extends DefaultClientConfigImpl {
  76. FeignOptionsClientConfig(Request.Options options) {
  77. setProperty(CommonClientConfigKey.ConnectTimeout,
  78. options.connectTimeoutMillis());
  79. setProperty(CommonClientConfigKey.ReadTimeout, options.readTimeoutMillis());
  80. }
  81. @Override
  82. public void loadProperties(String clientName) {
  83. }
  84. @Override
  85. public void loadDefaultValues() {
  86. }
  87. }
  88. }

在动态代理调用的那里我们得出一个结论,那就是最后会调用Client接口的execute方法的实现,所以我们就看一下execute方法的实现,这里就是一堆操作,从请求的URL中拿到了clientName,也就是服务名。

为什么可以拿到服务名?

其实很简单,OpenFeign构建动态代理的时候,传入了一个HardCodedTarget,当时说在构建HardCodedTarget的时候传入了一个url,那个url当时说了其实就是http://服务名,所以到这里,虽然有具体的请求接口的路径,但是还是类似 http://服务名/api/sayHello这种,所以可以通过路径拿到你锁请求的服务名。

拿到服务名之后,再拿到了一个配置类IClientConfig,最后调用lbClient,我们看一下lbClient的方法实现。

  1. private FeignLoadBalancer lbClient(String clientName) {
  2. return this.lbClientFactory.create(clientName);
  3. }

就是调用CachingSpringLoadBalancerFactory的create方法

  1. public FeignLoadBalancer create(String clientName) {
  2. FeignLoadBalancer client = this.cache.get(clientName);
  3. if (client != null) {
  4. return client;
  5. }
  6. IClientConfig config = this.factory.getClientConfig(clientName);
  7. ILoadBalancer lb = this.factory.getLoadBalancer(clientName);
  8. ServerIntrospector serverIntrospector = this.factory.getInstance(clientName,
  9. ServerIntrospector.class);
  10. client = this.loadBalancedRetryFactory != null
  11. ? new RetryableFeignLoadBalancer(lb, config, serverIntrospector,
  12. this.loadBalancedRetryFactory)
  13. : new FeignLoadBalancer(lb, config, serverIntrospector);
  14. this.cache.put(clientName, client);
  15. return client;
  16. }

这个方法先根据服务名从缓存中获取一个FeignLoadBalancer,获取不到就创建一个。

创建的过程就是从每个服务对应的容器中获取到IClientConfig和ILoadBalancer。Ribbon那篇文章都讲过这些核心类,这里不再赘述。

默认就是创建不带spring重试功能的FeignLoadBalancer,放入缓存,最后返回这个FeignLoadBalancer。所以第一次来肯定没有,需要构建,也就是最终一定会返回FeignLoadBalancer,所以我们通过lbClient方法拿到的是FeignLoadBalancer。从这里可以看出CachingSpringLoadBalancerFactory是构建FeignLoadBalancer的工厂类,只不过先从缓存中查找,找不到再创建FeignLoadBalancer。

拿到FeignLoadBalancer之后就会调用executeWithLoadBalancer,接收到Response之后直接返回。

三、FeignLoadBalancer

那么这个FeignLoadBalancer又是啥呢?这里放上FeignLoadBalancer核心源码。

  1. public class FeignLoadBalancer extends
  2. AbstractLoadBalancerAwareClient<FeignLoadBalancer.RibbonRequest, FeignLoadBalancer.RibbonResponse> {
  3. private final RibbonProperties ribbon;
  4. protected int connectTimeout;
  5. protected int readTimeout;
  6. protected IClientConfig clientConfig;
  7. protected ServerIntrospector serverIntrospector;
  8. public FeignLoadBalancer(ILoadBalancer lb, IClientConfig clientConfig,
  9. ServerIntrospector serverIntrospector) {
  10. super(lb, clientConfig);
  11. this.setRetryHandler(RetryHandler.DEFAULT);
  12. this.clientConfig = clientConfig;
  13. this.ribbon = RibbonProperties.from(clientConfig);
  14. RibbonProperties ribbon = this.ribbon;
  15. this.connectTimeout = ribbon.getConnectTimeout();
  16. this.readTimeout = ribbon.getReadTimeout();
  17. this.serverIntrospector = serverIntrospector;
  18. }
  19. @Override
  20. public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)
  21. throws IOException {
  22. Request.Options options;
  23. if (configOverride != null) {
  24. RibbonProperties override = RibbonProperties.from(configOverride);
  25. options = new Request.Options(override.connectTimeout(this.connectTimeout),
  26. override.readTimeout(this.readTimeout));
  27. }
  28. else {
  29. options = new Request.Options(this.connectTimeout, this.readTimeout);
  30. }
  31. Response response = request.client().execute(request.toRequest(), options);
  32. return new RibbonResponse(request.getUri(), response);
  33. }
  34. }FeignLoadBalancer继承自AbstractLoadBalancerAwareClient,AbstractLoadBalancerAwareClient又是啥玩意?看过我写的关于Ribbon核心组件已经运行原理的那篇文章小伙伴肯定知道,AbstractLoadBalancerAwareClient类主要作用是通过ILoadBalancer组件获取一个Server,然后基于这个Server重构了URI,也就是将你的请求路径http://服务名/api/sayHello转换成类似http://192.168.1.101:8088/api/sayHello这种路径,也就是将原服务名替换成服务所在的某一台机器ip和端口,替换之后就交由子类实现的exceut方法来发送http请求。

所以我们知道调用executeWithLoadBalancer之后,就会重构请求路径,将服务名替换成某个具体的服务器所在的ip和端口,之后交给子类execute来处理,对于这里来说,也就是FeignLoadBalancer的execute方法,因为FeignLoadBalancer继承AbstractLoadBalancerAwareClient。

直接定位到execute方法最核心的一行代码

Response response = request.client().execute(request.toRequest(), options);

request.client()就会拿到构建LoadBalancerFeignClient传入的那个Client的实现,我提到过,这个Client的实现是具体发送请求的实现,默认的就是Client.Default类(不是默认就有可能是基于HttpClient或者是OkHttp的实现)。所以这行代码就是基于这个Client就成功的发送了Http请求,拿到响应,然后将这个Response 封装成一个RibbonResponse返回,最后就返回给MethodHandler,然后解析响应,封装成方法的返回值返回给调用者。

好了,其实到这里就完全知道Feign是如何整合Ribbon的,LoadBalancerFeignClient其实是OpenFeign适配Ribbon的入口,FeignLoadBalancer才是真正实现选择负载均衡,发送http请求的组件,因为他继承了AbstractLoadBalancerAwareClient。

为了大家能够清楚的知道整个动态代理的调用过程,我在Ribbon的那张图的基础上,加上Feign的调用链路。

通过这张图,我们可以清楚地看出OpenFeign、Ribbon以及注册中心之间的协同关系。

四、总结

到这里,我通过三篇文章,算上Nacos那两篇,总共五篇文章完整的讲述了在微服务架构中,OpenFeign、Ribbon、Nacos(当然其它注册中心也可以)这三个组件协同工作的核心源码和流程。这里我再用简洁的话来总结一下他们的协同工作原理,OpenFeign在进行rpc调用的时候,由于不知道服务具体在哪台机器上,所以需要Ribbon这个负载均衡组件从服务所在的机器列表中选择一个,Ribbon中服务所在的机器列表是从注册中心拉取的,Ribbon提供了一个ServerList接口,注册中心实现之后,Ribbon就可以获取到服务所在的机器列表,这就是这三个组件最基本的原理。希望通过这五篇文章,小伙伴们可以对微服务架构的最基本的原理有一定的了解,同时也对OpenFeign、Ribbon、Nacos源码有一定的认识。

以上就是本篇文章的全部内容,如果你有什么不懂或者想要交流的地方,可以关注我的个人的微信公众号 三友的java日记 或者添加我的微信 ZZYNKXJH 联系我,我们下篇文章再见。

如果觉得这篇文章对你有所帮助,还请帮忙点赞、在看、转发一下,码字不易,非常感谢!

 

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发