专业的编程技术博客社区

网站首页 > 博客文章 正文

深入理解Spring Cloud一(14)服务发现

baijin 2024-09-20 12:33:16 博客文章 3 ℃ 0 评论

本文介绍Feign调用过程中的服务发现,及调用过程

一、@EnableFeignClients

@Import(FeignClientsRegistrar.class)
public @interface EnableFeignClients

导入FeignClientsRegistrar类,FeignClientsRegistrar实现了ImportBeanDefinitionRegistrar接口。在Context创建过程中,会被ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars方法触发调用。

private void loadBeanDefinitionsFromRegistrars(Map<ImportBeanDefinitionRegistrar, AnnotationMetadata> registrars) {
   registrars.forEach((registrar, metadata) ->
         registrar.registerBeanDefinitions(metadata, this.registry, this.importBeanNameGenerator));
}

二、FeignClientsRegistrar

核心作用是找到有@FeignClient注解的类,创建FeignClientFactoryBean类型的BeanDefinition。

@Override
public void registerBeanDefinitions(AnnotationMetadata metadata,
      BeanDefinitionRegistry registry) {
   registerDefaultConfiguration(metadata, registry);
   registerFeignClients(metadata, registry);
}
private void registerFeignClient(BeanDefinitionRegistry registry,
      AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
   String className = annotationMetadata.getClassName();
   BeanDefinitionBuilder definition = BeanDefinitionBuilder
         .genericBeanDefinition(FeignClientFactoryBean.class);
   definition.addPropertyValue("url", getUrl(attributes));
   definition.addPropertyValue("path", getPath(attributes));
   String name = getName(attributes);
   definition.addPropertyValue("name", name);
   String contextId = getContextId(attributes);
   definition.addPropertyValue("contextId", contextId);
   definition.addPropertyValue("type", className);
 
   BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
         new String[] { alias });
   BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}

FeignClientFactoryBean是一个FactoryBean,我们看一下它的getObject方法。

@Override
public Object getObject() throws Exception {
   return getTarget();
}
<T> T getTarget() {
   FeignContext context = applicationContext.getBean(FeignContext.class);
   Feign.Builder builder = feign(context);

   if (!StringUtils.hasText(url)) {
      if (!name.startsWith("http")) {
         url = "http://" + name;
      }
      else {
         url = name;
      }
      url += cleanPath();
      return (T) loadBalance(builder, context,
            new HardCodedTarget<>(type, name, url));
   }
   if (StringUtils.hasText(url) && !url.startsWith("http")) {
      url = "http://" + url;
   }
   String url = this.url + cleanPath();
   Client client = getOptional(context, Client.class);
   if (client != null) {
      if (client instanceof LoadBalancerFeignClient) {
         // not load balancing because we have a url,
         // but ribbon is on the classpath, so unwrap
         client = ((LoadBalancerFeignClient) client).getDelegate();
      }
      if (client instanceof FeignBlockingLoadBalancerClient) {
         // not load balancing because we have a url,
         // but Spring Cloud LoadBalancer is on the classpath, so unwrap
         client = ((FeignBlockingLoadBalancerClient) client).getDelegate();
      }
      builder.client(client);
   }
   Targeter targeter = get(context, Targeter.class);
   return (T) targeter.target(this, builder, context,
         new HardCodedTarget<>(type, name, url));
}

通过targeter.target获取对象。

class DefaultTargeter implements Targeter {

   @Override
   public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign,
         FeignContext context, Target.HardCodedTarget<T> target) {
      return feign.target(target);
   }

}

最终我们找到通过ReflectiveFeign.newInstance(target)创建对象。

public <T> T target(Target<T> target) {
  return build().newInstance(target);
}

public Feign build() {
  Client client = Capability.enrich(this.client, capabilities);
  Retryer retryer = Capability.enrich(this.retryer, capabilities);
  List<RequestInterceptor> requestInterceptors = this.requestInterceptors.stream()
      .map(ri -> Capability.enrich(ri, capabilities))
      .collect(Collectors.toList());
  Logger logger = Capability.enrich(this.logger, capabilities);
  Contract contract = Capability.enrich(this.contract, capabilities);
  Options options = Capability.enrich(this.options, capabilities);
  Encoder encoder = Capability.enrich(this.encoder, capabilities);
  Decoder decoder = Capability.enrich(this.decoder, capabilities);
  InvocationHandlerFactory invocationHandlerFactory =
      Capability.enrich(this.invocationHandlerFactory, capabilities);
  QueryMapEncoder queryMapEncoder = Capability.enrich(this.queryMapEncoder, capabilities);

  SynchronousMethodHandler.Factory synchronousMethodHandlerFactory =
      new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger,
          logLevel, decode404, closeAfterDecode, propagationPolicy, forceDecoding);
  ParseHandlersByName handlersByName =
      new ParseHandlersByName(contract, options, encoder, decoder, queryMapEncoder,
          errorDecoder, synchronousMethodHandlerFactory);
  return new ReflectiveFeign(handlersByName, invocationHandlerFactory, queryMapEncoder);
}

三、ReflectiveFeign

newInstance方法创建了一个代理对象。其中InvocationHandler对象是关键,因为代理对象的放到调用最终都是InvocationHandler来处理。

@Override
public <T> T newInstance(Target<T> target) {
  Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
  Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
  List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

  for (Method method : target.type().getMethods()) {
    if (method.getDeclaringClass() == Object.class) {
      continue;
    } else if (Util.isDefault(method)) {
      DefaultMethodHandler handler = new DefaultMethodHandler(method);
      defaultMethodHandlers.add(handler);
      methodToHandler.put(method, handler);
    } else {
      methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
    }
  }
  InvocationHandler handler = factory.create(target, methodToHandler);
  T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(),
      new Class<?>[] {target.type()}, handler);

  for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
    defaultMethodHandler.bindTo(proxy);
  }
  return proxy;
}
static final class Default implements InvocationHandlerFactory {

  @Override
  public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
    return new ReflectiveFeign.FeignInvocationHandler(target, dispatch);
  }
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  if ("equals".equals(method.getName())) {
    try {
      Object otherHandler =
          args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null;
      return equals(otherHandler);
    } catch (IllegalArgumentException e) {
      return false;
    }
  } else if ("hashCode".equals(method.getName())) {
    return hashCode();
  } else if ("toString".equals(method.getName())) {
    return toString();
  }

  return dispatch.get(method).invoke(args);
}

最终InvocationHandler将方法调用路由到了MethodHandler上。

public MethodHandler create(Target<?> target,
                            MethodMetadata md,
                            RequestTemplate.Factory buildTemplateFromArgs,
                            Options options,
                            Decoder decoder,
                            ErrorDecoder errorDecoder) {
  return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger,
      logLevel, md, buildTemplateFromArgs, options, decoder,
      errorDecoder, decode404, closeAfterDecode, propagationPolicy, forceDecoding);
}

我们看一下SynchronousMethodHandler的invoke方法

@Override
public Object invoke(Object[] argv) throws Throwable {
  RequestTemplate template = buildTemplateFromArgs.create(argv);
  Options options = findOptions(argv);
  Retryer retryer = this.retryer.clone();
  while (true) {
    try {
      return executeAndDecode(template, options);
    } catch (RetryableException e) {
      
  }
}

executeAndDecode中创建request对象,使用client去执行请求。

Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
  Request request = targetRequest(template);
  Response response;
  long start = System.nanoTime();
  try {
    response = client.execute(request, options);
    // ensure the request is set. TODO: remove in Feign 12
    response = response.toBuilder()
        .request(request)
        .requestTemplate(template)
        .build();
  } catch (IOException e) {
  }
  if (decoder != null)
    return decoder.decode(response, metadata.returnType());

  CompletableFuture<Object> resultFuture = new CompletableFuture<>();
  asyncResponseHandler.handleResponse(resultFuture, metadata.configKey(), response,
      metadata.returnType(),
      elapsedTime);

  try {
    if (!resultFuture.isDone())
      throw new IllegalStateException("Response handling not done");

    return resultFuture.join();
  } catch (CompletionException e) {
   
  }
}

四、LoadBalancerFeignClient

LoadBalancerFeignClient是Client接口的实现,获取一个FeignLoadBalancer

@Override
public Response execute(Request request, Request.Options options) throws IOException {
   try {
      URI asUri = URI.create(request.url());
      String clientName = asUri.getHost();
      URI uriWithoutHost = cleanUrl(request.url(), clientName);
      FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
            this.delegate, request, uriWithoutHost);

      IClientConfig requestConfig = getClientConfig(options, clientName);
      return lbClient(clientName)
            .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
   }
   catch (ClientException e) {
   }
}

然后调用executeWithLoadBalancer,这是一个模板方法。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                  
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
      
    }
    
}

LoadBalancerCommand的submit中核心功能是选择一个Server进行回调。

private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

最终负责选择服务的是ILoadBalancer。

ILoadBalancer lb = getLoadBalancer();
Server svc = lb.chooseServer(loadBalancerKey);

我们看一下BaseLoadBalancer,委托给IRule选择一个服务,默认是RoundRobinRule。

public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

选择服务后,把url替换成真实的服务地址,就可以进行请求了。

new ServerOperation<T>() {
    @Override
    public Observable<T> call(Server server) {
        URI finalUri = reconstructURIWithServer(server, request.getUri());
        S requestForServer = (S) request.replaceUri(finalUri);
        try {
            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
        } 
        catch (Exception e) {
            return Observable.error(e);
        }
    }
}

最终委托给AbstractLoadBalancingClient的实现类进行方法调用。FeignLoadBalancer是最简单原始的实现,不依赖其他jar。通过HttpURLConnection进行url的调用。



五、服务调用流程

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表