网络知识 娱乐 修复 Spring Cloud Gateway 项目中无法通过 Skywalking 追踪 WebClient 调用的问题

修复 Spring Cloud Gateway 项目中无法通过 Skywalking 追踪 WebClient 调用的问题

解决 Spring Cloud Gateway 项目中无法追踪 WebClient 调用的问题

问题描述

Skywalking 通过 java agent 的方式为 java 应用带来无侵入的分布式链路采集。

在微服务架构中, Spring Cloud Gateway 做为业务网关, 一般需要自定义 Filter ,调用其它服务接口验证用户身份或判断权限。 Gateway 进程配置了 Skywalking Agent(8.8.0) , 但在 Filter 中使用 WebClient 调用远程服务, 可能导致生成多个调用链路, 无法正确跟踪。

Skywalking Jave Agent 采集链路信息原理

排查问题之前, 先了解下 Skywalking Jave Agent 是如何采集链路信息的。

Plugin Development Guide

单进程内同步调用 trace 状态维护

Skywalking Jave Agent 通过 org.apache.skywalking.apm.agent.core.context.ContextManager 来管理 Trace 上下文。

通过 ContextManager#createEntrySpanContextManager#createLocalSpanContextManager#createExistSpan 等方法来创建一个 Span。

  • EntrySpan 表示一次远程被调跨度
  • LocalSpan 表示一次进程内本地跨度
  • ExistSpan 表示一次远程主调跨度

当创建 Span 时, 如果链路上下文 TraceContext 还没有创建, 会先创建 Trace , 并把 TracerContext 存到 ContextManager 管理的 ThreadLocal ContextManager.CONTEXT 中。 新创建的 Span 会使用 TracerContext 的上下文信息。

因为 TracerContext 存在 ThreadLocal 中, 所以在同一个线程中创建的多个 Span 会使用到同一个 TracerContext 串起来。

单进程内异步调用 trace 状态维护

当使用 Spring WebFlux 或 Vert.x 等异步框架时, 一次调用事务的逻辑可能调度在不同的线程中。

因为 ContextManager 使用 ThreadLocal 来维持 TracerContext, 那么在一次调用事务链中每次创建 Span , 可能对应不同的 TracerContext. 最终在 Skyawalking 控制台中出现多个链路。

比如 Spring Mvc 接收到 Http 请求时, 创建了一个 EntrySpan, 在接下来的业务逻辑中需要调用一个远程服务, 那么需要创建一个 ExitSpan , 但在创建 ExitSpan 时由于多次异步调用, 已经切到别的线程上, ContextManager 获取不到原来的 TracerContext, 便新建了一个, 此时便出现 EntrySpan 与 ExitSpan 不属于同一个 Trace 的情况。

针对异步调用, Skywalking Agent 提供了 ContextSnapshot 用于在线程间共享 TracerContext.

在实现异步框架的插件时, 当创建第一个 Span 后, 需要使用 ContextManager.capture() 获取到 ContextSnapshot, 并放置到异步框架本身的上下文来传递。

而后, 再创建后续的 Span 时, 需要从框架的上下文中获取 ContextSnapshot , 再使用 ContextManger.continued 方法把 ContextSnapshot 恢复到当前 Span 中。

跨进程调用 trace 状态传递

Skywalking 根据不同的网络协议或框架(比如 Http Header 或 Kafka Message Header), 来传递链路上下文。 实现步聚如下:

  1. 主调端创建一个 ExitSpan, 通过 ContextManger.inject(ContextCarrier carrier) 把上下文信息注入到 carrier 中, 通过 carrier 可以获取到需要传递的 Hearder 信息, 再把 Header 信息注入到对应调用框架中(比如 HttpRequest)。
  2. 被调方从框架中取得 Header 信息封装成 ContextCarrier, 再使用 carrier 调用 ContextManager#createEntrySpan 来创建 EntrySpan 便能把主调跟被调的 Trace 上下文串起来。

Spring WebFlux Webclient 插件实现逻辑及问题重现。

既然是 WebClient 调用会导致生成多个 Trace , 那么直接查看 spring-webflux-5.x-webclient-plugin 插件的代码(8.8.0 版本)。

通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define.WebFluxWebClientInstrumentation 可以看到插件通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.WebFluxWebClientInterceptor 拦截了 org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction#exchange 方法。

那么继续查看 WebFluxWebClientInterceptor 的代码:

@Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
        //..

        
        ClientRequest request = (ClientRequest) allArguments[0];
        final ContextCarrier contextCarrier = new ContextCarrier();

        URI uri = request.url();
        final String requestURIString = getRequestURIString(uri);
        final String operationName = requestURIString;
        final String remotePeer = getIPAndPort(uri);
        
        // 直接创建 ExitSpan , 没用使用 ContextManager.continued 来恢复上下文
        AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);

        //...

        //user async interface
        span.prepareForAsync();
        ContextManager.stopSpan();
        context.setContext(span);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
        // fix the problem that allArgument[0] may be null
        if (allArguments[0] == null) {
            return ret;
        }
        Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
        AbstractSpan span = (AbstractSpan) context.getContext();
        return ret1.doOnSuccess(clientResponse -> {
            //...
        }).doOnError(error -> {
            span.log(error);
        }).doFinally(s -> {
            span.asyncFinish();
        });
    }

可以看到 WebFluxWebClientInterceptor#beforeMethod 中直接创建 ExitSpan , 并没有使用 ContextManger.continued 来恢复上下文。 因为在Spring WebFlux 基于 Reactor 异步框架 , 那么创建当前 Span 与前置创建 EntrySpan 不在同个线程中, 两个 Span 属于两个不同的 Trace.

Bug 复现, 创建一个 Spring WebFlux 项目, 编写测试 Controller 如下

@SpringBootApplication
@RestController
public class SpringWebfluxProjectApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringWebfluxProjectApplication.class, args);
    }

    @GetMapping("test")
    public Mono<String> hello() {
        return WebClient.create("http://localhost:8080/foo")
                .get()
                .retrieve()
                .bodyToMono(String.class)
                .flatMap(s -> {
                    return WebClient.create("http://localhost:8080/bar")
                            .get()
                            .retrieve()
                            .bodyToMono(String.class);
                });

    }

    @GetMapping("foo")
    public Mono<String> baidu(ServerWebExchange exchange) {
        return Mono.just("foo");

    }

    @GetMapping("bar")
    public Mono<String> qq(ServerWebExchange exchange) throws InterruptedException {
        return Mono.just("bar").delayElement(Duration.ofMillis(100));

    }

}

配置好skywalking agent 相关 JVM 参数, 运行项目, 请求 http://localhost:8080/test , 查看 skywalking 面板, 确实生成了多个 Span , 但每个 Span 的 TraceId 都不一样。

skywalking-webclient-bug.png

解决方案

基于上节分析, 根本问题在于在创建 ExitSpan 时没有恢复上下文, 那么需有找到一个方法获取到上游的 ContextSnapshot 并恢复即可。

Spring Webflux 基于 Reactor 框架 , 可以通过 Reactor Context 来传递 ContextSnapshot.

Skywalking 默认插件中包含 mvc-annotation-5.x-plugin , 查看对应代码, 发现该插件通过拦截 Spring Mvc 相关注解方法, 在注解方法前创建 EntrySpan , 使用同步的方式,且拦截方法返回值不一定是 Mono 或 Flux , 难于在这个地方把 ContextSnapshot 放入 Reactor Context 中。 在 optional-plugin 还有 spring-webflux-5.x-plugin 插件, 该插件通过拦截 org.springframework.web.reactive.DispatcherHandler#handle 来创建 EntrySpan, DispatcherHandler#handle 返回 Mono , 可以在这里插入 ContextSnapshot.

具体实现如下:

//org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor#afterMethod
 @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {

        ServerWebExchange exchange = (ServerWebExchange) allArguments[0];

        AbstractSpan span = (AbstractSpan) exchange.getAttributes().get("SKYWALKING_SPAN");
        Mono<Void> monoReturn = (Mono<Void>) ret;

        
        // add skywalking context snapshot to reactor context.
        EnhancedInstance instance = getInstance(allArguments[0]);
        if (instance != null && instance.getSkyWalkingDynamicField() != null) {
            monoReturn = monoReturn.subscriberContext(
                    c -> c.put("SKYWALKING_CONTEXT_SNAPSHOT", instance.getSkyWalkingDynamicField()));
        }

        return monoReturn.doFinally(s -> {

            if (span != null) {
                maybeSetPattern(span, exchange);
                try {

                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support
                    if (httpStatus != null) {
                        Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
                        if (httpStatus.isError()) {
                            span.errorOccurred();
                        }
                    }
                } finally {
                    span.asyncFinish();
                }
            }
        });
    }

个性 WebFluxWebClientInterceptor 从 Reactor Context 中获取 ContextSnapshot :

public class WebFluxWebClientInterceptor implements InstanceMethodsAroundInterceptorV2 {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
        // before method 中无法获取 Reactor 上下文 , 原逻辑直接删除掉
        // ExchangeFunctions$DefaultExchangeFunction 中只是构建 Reactor 链条, 并末真正执行, 所以原来逻辑可以推迟到 subscriberContext 中获取上下文后再执行。
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
        // fix the problem that allArgument[0] may be null
        if (allArguments[0] == null) {
            return ret;
        }
        Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
        // 从 Reactor Context 中获取 ContextSnapshot 
        return Mono.subscriberContext().flatMap(ctx -> {

            ClientRequest request = (ClientRequest) allArguments[0];
            URI uri = request.url();
            final String operationName = getRequestURIString(uri);
            final String remotePeer = getIPAndPort(uri);
            AbstractSpan span = ContextManager.createExitSpan(operationName, remotePeer);

            // get ContextSnapshot from reactor context,  the snapshot is set to reactor context by any other plugin
            // such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
            final Optional<Object> optional = ctx.getOrEmpty("SKYWALKING_CONTEXT_SNAPSHOT");
            optional.ifPresent(snapshot -> ContextManager.continued((ContextSnapshot) snapshot));

            //set components name
            span.setComponent(ComponentsDefine.SPRING_WEBCLIENT);
            Tags.URL.set(span, uri.toString());
            Tags.HTTP.METHOD.set(span, request.method().toString());
            SpanLayer.asHttp(span);

            final ContextCarrier contextCarrier = new ContextCarrier();
            ContextManager.inject(contextCarrier);
            if (request instanceof EnhancedInstance) {
                ((EnhancedInstance) request).setSkyWalkingDynamicField(contextCarrier);
            }

            //user async interface
            span.prepareForAsync();
            ContextManager.stopSpan();
            return ret1.doOnSuccess(clientResponse -> {
                HttpStatus httpStatus = clientResponse.statusCode();
                if (httpStatus != null) {
                    Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
                    if (httpStatus.isError()) {
                        span.errorOccurred();
                    }
                }
            }).doOnError(span::log).doFinally(s -> {
                span.asyncFinish();
            });
        });
    }
    
}

重新编译插件后把 spring-webflux-5.x-plugin 及 spring-webflux-5.x-webclient-plugin 两个插件拷到 Skywalking Agent plugin 目录下, 重新运行测试代码, 可以发现问题解决, 所有调用都串起来。

webclient_trace_fixed.png

修复代码已合并到 skywalking-java 主干(#114), 预计将在 8.10.0 版本中发布。

注意1: 因为 spring-webflux-5.x-plugin 是在 optional-plugins 目录中, 需要手工拷到 plugins 目录。

而 Spring Cloud Gateway 工程需要手工拷 gateway-3.x-plugin。

注意2: Srping MVC 插件 apm-springmvc-annotation-5.x-plugin 默认生效, 当与 spring-webflux-5.x-plugin 同时存在时, 一次调用会生成两个 EntrySpan, 而且 mvc 插件生成 EntrySpan 虽然与 Webclient 生成的 ExitSpan 能用同个 TraceId 串起来了, 但仍然没有 Parent/Child 关系, 介意的话在 Spring Webflux 工程中把 spring-webflux-5.x-plugin 移出 ${agetn/path}/plugin 目录。


参考

  1. Plugin Development Guide
  2. Reactor Context