Quarkus Open Telemetry (distributed tracing) does not carry over traceId across Threads

Question

Here is a sample application I wrote to illustrate the problem. Below is a REST endpoint that receives a request and performs some operations on a different thread. As the log extract further down shows, the traceId is not copied over once execution moves to the new thread. What is the solution to this problem? I am looking for an existing utility that handles this automatically, similar to what Spring offers: https://www.baeldung.com/spring-cloud-sleuth-single-application#4-spanning-runnables

GreetingResource.java

@Inject
@Channel("words-in-write")
Emitter<String> emitter;

private final Executor executor = Executors.newFixedThreadPool(1);

@GET
@Path("{primeNumber}")
@Produces(MediaType.TEXT_PLAIN)
public String checkPrimeAndPublish(@PathParam("primeNumber") String primeNumber) {
    log.info("Received request : {}", primeNumber);
    var val = Uni.createFrom().item(primeNumber)
        .emitOn(executor)
        .onItem().transformToUni(s -> {
            log.info("Converting to Long : {}", s);
            return Uni.createFrom().item(Long.valueOf(s));
        })
        .onItem().transformToUni(aLong -> {
            log.info("Checking if prime : {}", aLong);
            return Uni.createFrom().item(isPrime(aLong));
        });

    val.invoke(isPrime -> {
        if (isPrime) {
            log.info("Begin : prime provided {}, sending to kafka : {}", primeNumber, primeNumber + " is a prime");
            emitter.send(KafkaRecord.of(primeNumber, primeNumber + " is a prime"));
        } else {
            log.info("Begin : prime not provided {}, sending to kafka : {}", primeNumber, primeNumber + " is not a prime");
            emitter.send(KafkaRecord.of(primeNumber, primeNumber + " is not a prime"));
        }
    }).await().indefinitely();

    return "Done";
}

Logs:

13:20:06 INFO  traceId=e80b6aff03ae74b1ee04a1601bfc2105, parentId=, spanId=8c0a6a423d8c1052 [or.ac.GreetingResource] (executor-thread-0) Received request : 7
13:20:06 INFO  traceId=, parentId=, spanId= [or.ac.GreetingResource] (pool-12-thread-1) Converting to Long : 7
13:20:06 INFO  traceId=, parentId=, spanId= [or.ac.GreetingResource] (pool-12-thread-1) Checking if prime : 7
13:20:06 INFO  traceId=, parentId=, spanId= [or.ac.GreetingResource] (pool-12-thread-1) Begin : prime provided 7, sending to kafka : 7 is a prime

Answer 1

Score: 2

The easiest way to achieve this is context propagation, as described in the Quarkus guide:
https://quarkus.io/guides/context-propagation

Following the guide, I added this dependency to my build.gradle:

implementation 'io.quarkus:quarkus-smallrye-context-propagation'

I also changed the Mutiny code to emit on the default worker thread pool instead of the executor created with Executors.newFixedThreadPool(1):

.emitOn(Infrastructure.getDefaultWorkerPool())

After this change, every log line carries the same traceId, including the lines written on the worker thread:

15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-0) Received request : 7
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-1) Converting to Long : 7
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-1) Checking if prime : 7
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [or.ac.GreetingResource] (executor-thread-1) Begin : prime provided 7, sending to kafka : 7 is a prime
15:01:59 INFO  traceId=2365984e0fb31fad66967ae302202b83, parentId=, spanId=927616b96dde9982 [io.qu.ht.access-log] (executor-thread-0) "127.0.0.1 Host: localhost:9099
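
To illustrate why the original logs lost the traceId, here is a minimal plain-JDK sketch of the underlying idea. It is not how Quarkus or OpenTelemetry implement propagation. It assumes the trace id lives in per-thread state, modeled by a hypothetical `TRACE_ID` thread-local, and it uses a hypothetical helper `withTraceContext` to capture that state on the calling thread and restore it on the pool thread. Context propagation libraries automate roughly this capture-and-restore step.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TraceContextDemo {

    // Hypothetical stand-in for the tracing context (an assumption for illustration).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the caller's trace id at wrap time and restores it around the task
    // on whichever thread eventually runs it.
    static <T> Callable<T> withTraceContext(Callable<T> task) {
        String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                // Put the worker thread back the way it was.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Sets a trace id on the calling thread, then reports the id seen on a pool thread.
    static String traceIdSeenOnWorker(String requestTraceId, boolean propagate) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        TRACE_ID.set(requestTraceId);
        try {
            Callable<String> task = TRACE_ID::get;
            Future<String> result = pool.submit(propagate ? withTraceContext(task) : task);
            return result.get();
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // A plain pool thread has no trace id of its own.
        System.out.println("plain executor:   " + traceIdSeenOnWorker("e80b6aff", false));
        // The wrapped task sees the id captured on the calling thread.
        System.out.println("wrapped callable: " + traceIdSeenOnWorker("e80b6aff", true));
    }
}
```

In this sketch the unwrapped task sees no trace id, which mirrors the empty `traceId=` fields on `pool-12-thread-1` in the question. Wrapping every task by hand is easy to forget, which is one reason to prefer an executor that handles it for you, as the default worker pool does in the logs above.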

huangapple
  • Posted on July 10, 2023 at 21:07:15
  • Please keep this link when reposting: https://go.coder-hub.com/76654066.html