How to manually create and inject TraceContext into Reactor Context in Spring Webflux?


Background: I have a service running on Spring Boot 3.1.0 that communicates via REST and AMQP. When it is invoked via REST, it publishes the content of the REST payload to a RabbitMQ queue using reactor-rabbitmq and immediately returns an HTTP response.

public class CalculationInitiateController {

    private final RequestMapper requestMapper;
    private final ResponseMapper responseMapper;
    private final CalculationInitializer initializer;

    @ResponseStatus(code = HttpStatus.CREATED)
    public Mono<CalculationInitResponseDto> initiateCalculation(@RequestBody @Valid CalculationInitiationRequestDto request) {
        return Mono.just(requestMapper.map(request))

Event publishing via reactor-rabbitmq:

public class CalculationInitializer {

    private final Sender sender;
    private final ObjectMapper objectMapper;
    private final EventPublisherProperties eventPublisherProperties;
    private final OutboundMessageFactory messageFactory;

    public Mono<Boolean> initialize(CalculationEvent calculationEvent) {
        log.info("Publishing internal {} event for calculation: {}", eventType, calculationEvent.request().calculationId());
        var bytePayload = objectMapper.writeValueAsBytes(calculationEvent);
        var outboundMessage = new OutboundMessage("", eventConfig.getRoutingKey(), messageProperties(eventConfig), bytePayload);
        return send(outboundMessage);

    private AMQP.BasicProperties messageProperties(EventPublisherProperties.EventConfig eventConfig) {
        var context = tracer.currentTraceContext().context();
        return new AMQP.BasicProperties.Builder()
                .headers(Map.of(EVENT_TYPE_HEADER, eventConfig.headerValue(), "traceId", context.traceId(), "spanId", context.spanId()))

    private Mono<Boolean> send(OutboundMessage outboundMessage) {
        return sender.sendWithPublishConfirms(Mono.just(outboundMessage)).next()

    private Mono<Boolean> checkIfAcknowledged(OutboundMessageResult<OutboundMessage> result) {
        if (result.isAck()) {
            return Mono.just(Boolean.TRUE);
        } else {
            log.warn("Message not acknowledged");
            return Mono.error(new IllegalStateException("Did not receive ACK on message send"));

A Rabbit receiver (also from reactor-rabbitmq) later consumes the message and calls multiple APIs:

public class EventMessageListener {

    private final EventMessageReceiverProperties eventMessageReceiverProperties;
    private final Receiver eventReceiver;
    private final ConsumeOptions consumeOptions;
    private final Tracer tracer;
    private final CalculationEventHandler calculationEventHandler;

    public void receiveMessages() {
        eventReceiver.consumeManualAck(eventMessageReceiverProperties.getQueue(), consumeOptions)
                .doFinally(s -> eventReceiver.close())

    private Mono<Void> handleMessage(AcknowledgableDelivery message) {
        var traceId = message.getProperties().getHeaders().get("traceId").toString();
        var spanId = message.getProperties().getHeaders().get("spanId").toString();
        var context = tracer.traceContextBuilder().traceId(traceId).spanId(spanId).build();
        return Mono.defer(() -> calculationEventHandler.handle(message)
                .doOnSuccess(v -> message.ack())
                .onErrorResume(ex -> {
                    log.error("Failed to handle message {}", new String(message.getBody()));
                    log.error("Exception:", ex);
                    return Mono.empty();
                .contextWrite(Context.of(TraceContext.class, context));

This event is then processed:

public class CalculationEventHandler  {

    private final ObjectMapper objectMapper;
    private final CalculationErrorHandler calculationErrorHandler;
    private final CalculationEventProcessor calculationEventProcessor;
    private final EventMessageReceiverProperties eventMessageReceiverProperties;

    public Mono<Void> handle(AcknowledgableDelivery message) {
        try {
            var event = objectMapper.readValue(message.getBody(), CalculationEvent.class);
            return calculationEventProcessor.process(event)
                    .onErrorResume(t -> calculationErrorHandler.handleError(message, t, event))
        } catch (IOException e) {
            return Mono.error(e);

Processing includes validating the payload against several external APIs, each called from its own ValidationService implementation. I'm using Flux.mergeDelayError to wait for the responses from all of the API services, so that if several calls fail their errors are collected together:

public class CalculationValidator {

    private final List<ValidationService> validationServices;

    public Mono<Void> validate(CalculationInput input) {
        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE, validateWithEachService(input).toArray(Publisher[]::new)).then();

    private List<Mono<Void>> createAttributeRequests(CalculationInput input) {
        return validationServices.stream()
                .map(validationService -> validationService.validate(input))

Here's what most of the ValidationService implementations look like:

public class ThresholdValidatorServiceImpl implements ValidatorService {

    private final WebClient webClient;
    private final String endpoint;
    private final ThresholdInputMapper mapper;

    public ThresholdValidatorServiceImpl(WebClient.Builder builder,
                                        ValidationServiceErrorFilterFactory errorFilterFactory,
                                        ThresholdInputMapper mapper,
                                        @Value("${integration.threshold-validator.url}") String gateway,
                                        @Value("${integration.threshold-validator.endpoint}") String endpoint) {
        this.webClient = builder
        this.endpoint = endpoint;
        this.mapper = mapper;

    public Mono<JsonNode> get(CalculationInput input) {
        return Mono.just(mapper.map(input))
                .map(body -> webClient.post()

What I am after: I want to reuse the traceId from the initial REST request. The idea is to put the traceId into the AMQP message properties and, after the Rabbit Receiver consumes the message, inject it into the Reactor Context so that the WebClient calls made by each ValidationService carry the same traceId as the initial request.

Problem: I am able to save the traceId from the initial request and put it into the AMQP message properties. After consuming the RabbitMQ message, I read the traceId and write it to the Reactor Context as a TraceContext. My understanding is that this context is then visible upstream in the reactive pipeline, where the API calls are made. However, when the API calls run, the WebClient seems to generate a new traceId for each .exchange(), which is not the behaviour I expect.

Question: Is this even possible to achieve? If yes, what would be the correct approach?

Dependencies used:

  • io.micrometer:micrometer-tracing:1.1.2
  • io.micrometer:context-propagation:1.0.3
  • io.projectreactor.rabbitmq:reactor-rabbitmq:1.5.6
  • org.springframework.boot:spring-boot-starter-webflux:3.1.0

EDIT: Added some code for more clarity, updated descriptions.


Score: 2

Spring Boot 3.x uses Micrometer Tracing. All Spring Boot default configurations work with the `Micrometer Observation API`.
The WebFlux WebClient looks in the current context for an Observation stored under the key "micrometer.observation". If it finds none, it starts a new observation, which is why the following code does not work:

contextWrite(Context.of(TraceContext.class, context));
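The key mismatch described above can be pictured with a toy model. This is only a sketch using a plain Map as a stand-in for the Reactor Context; `ContextKeySketch` and `resolveTrace` are hypothetical names and not part of Micrometer or Reactor. The only real value is the string "micrometer.observation", which is the value of ObservationThreadLocalAccessor.KEY.

```java
import java.util.HashMap;
import java.util.Map;

final class ContextKeySketch {

    // Same string value as ObservationThreadLocalAccessor.KEY in Micrometer.
    static final String OBSERVATION_KEY = "micrometer.observation";

    // Hypothetical stand-in for the instrumentation's lookup: it only checks
    // OBSERVATION_KEY and ignores anything stored under other keys.
    static String resolveTrace(Map<Object, Object> context) {
        Object parent = context.get(OBSERVATION_KEY);
        return parent != null ? "continues " + parent : "starts a new trace";
    }

    public static void main(String[] args) {
        Map<Object, Object> context = new HashMap<>();

        // Writing under a class key (as with TraceContext.class) is never read back.
        context.put(String.class, "trace-1");
        System.out.println(resolveTrace(context)); // prints "starts a new trace"

        // Writing under the expected key makes the parent visible.
        context.put(OBSERVATION_KEY, "trace-1");
        System.out.println(resolveTrace(context)); // prints "continues trace-1"
    }
}
```

In the real pipeline the value stored under that key has to be an Observation, not a trace id string; the model only shows why a value written under a different key is ignored.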

Possible solutions to this problem:

 1. Use the Spring Cloud binder for RabbitMQ. It supports the reactive specification, including receiving messages from the queue as a Flux, and requires no extra code for your scenarios. For example, Spring WebFlux reads the trace/span from the request headers and adds it to the request context. It then adds this information to outgoing remote calls, including web calls and messages (RabbitMQ). On incoming messages it reads these headers and creates a span from them for any outgoing calls. All your scenarios should work out of the box. If you use MDC (Logback etc.), you can verify this in the logs, because the trace information is added to the MDC context and most context-switching cases are handled.
 2. Instead of setting a TraceContext in the context, write the specific ids (trace, span, parent, etc.) directly into the context and then add them to the right header values when making downstream calls. Do not worry about context switches or which thread executes the WebClient calls. If you simply call `Hooks.enableAutomaticContextPropagation();` in the main class, the context should be passed along with minimal overhead.

 3. This is a bit complex, but I could not think of an easier solution. Create a custom io.micrometer.observation.transport.ReceiverContext and create a new Observation on it. You will not have to read the spans from the headers yourself, because the default propagator extracts them from the message headers. Take inspiration from what the Spring RabbitMQ message listener does: create a new ReceiverContext class similar to org.springframework.amqp.rabbit.support.micrometer.RabbitMessageReceiverContext.

// Create a new observation in the receiver
Observation observation = Observation.createNotStarted(...); // Resolve the ObservationRegistry bean and pass the custom receiver context
// Put your code inside the lambda
observation.observe(() -> { /* your code */ }); // the observation is added to the ThreadLocal and the Context

// The PropagatingReceiverTracingObservationHandler already registered with the registry handles this observation and extracts the required headers from the message


Score: 1


I managed to get it working by following @Gaurav's advice and creating my own custom ReceiverContext, which looks like this:

public class RabbitReceiverContext extends ReceiverContext<AcknowledgableDelivery> {

    private final String queue;
    private final String listenerId;

    public RabbitReceiverContext(AcknowledgableDelivery message, String queueName, String listenerId) {
        super((carrier, key) -> carrier.getProperties().getHeaders().getOrDefault(key, "").toString());
        this.listenerId = listenerId;
        this.queue = queueName;

    public String getListenerId() {
        return this.listenerId;

    public String getSource() {
        return this.queue;
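The getter passed to super(...) above returns an empty string when a header is absent, and it assumes the headers map is never null. A more defensive variant might look like the sketch below. `HeaderGetterSketch` and `header` are hypothetical names, the headers are modelled as a plain Map so the example runs without RabbitMQ on the classpath, and returning null for a missing header is an assumption about what a propagator treats as "absent".

```java
import java.util.Map;

final class HeaderGetterSketch {

    // Returns the header value as a String, or null when the headers map
    // itself is null or does not contain the key.
    static String header(Map<String, ?> headers, String key) {
        if (headers == null) {
            return null;
        }
        Object value = headers.get(key);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of("traceparent", "example-value");
        System.out.println(header(headers, "traceparent")); // prints "example-value"
        System.out.println(header(headers, "tracestate"));  // prints "null"
        System.out.println(header(null, "traceparent"));    // prints "null"
    }
}
```

Under these assumptions the constructor's lambda could delegate to it, for example (carrier, key) -> header(carrier.getProperties().getHeaders(), key).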

I can then pass it to the Observation that I create in the handleMessage() method of EventMessageListener. Note that this did not work out of the box: I also had to add .contextWrite(Context.of(ObservationThreadLocalAccessor.KEY, observation)) for it to work:

private Mono<Void> handleMessage(AcknowledgableDelivery message) {
    var observation = Observation.createNotStarted(
            () -> new RabbitReceiverContext(message, eventMessageReceiverProperties.getQueue(), eventReceiver.toString()),
    return observation.observe(() ->
            Mono.defer(() -> eventMessageDispatcher.dispatch(message)
                            .doOnSuccess(v -> message.ack())
                            .onErrorResume(ex -> {
                                log.error("Failed to handle message {}", new String(message.getBody()));
                                log.error("Exception:", ex);
                                return Mono.empty();
            ).contextWrite(Context.of(ObservationThreadLocalAccessor.KEY, observation))

One more thing to note: I also had to change the message publishing to construct a traceparent header, because Micrometer uses W3C context propagation by default. The header is built like this in the CalculationInitializer:

private AMQP.BasicProperties messageProperties(EventPublisherProperties.EventConfig eventConfig) {
    var traceId = tracer.currentTraceContext().context().traceId();
    var spanId = tracer.currentTraceContext().context().spanId();
    var traceParent = String.format("00-%s-%s-00", traceId, spanId);
    return new AMQP.BasicProperties.Builder()
            .headers(Map.of(EVENT_TYPE_HEADER, eventConfig.headerValue(), "traceparent", traceParent))
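For reference, a W3C traceparent value has four dash-separated fields: a version ("00"), a 32-hex-character trace id, a 16-hex-character parent (span) id, and two hex characters of trace flags, where 01 means "sampled". The snippet above always sends 00 as the flags. The sketch below builds and validates such a header with plain Java. `TraceParentSketch`, `format` and `isValid` are hypothetical helper names, and left-padding a shorter trace id to 32 characters is an assumption for setups that use 64-bit trace ids.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TraceParentSketch {

    // version "00", 32 hex trace id, 16 hex parent id, 2 hex trace flags
    private static final Pattern FORMAT =
            Pattern.compile("00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})");

    // Builds a traceparent value; throws if the result is not well formed.
    static String format(String traceId, String spanId, boolean sampled) {
        // Assumption: a shorter (64-bit) trace id is left-padded with zeros.
        String padded = "0".repeat(Math.max(0, 32 - traceId.length())) + traceId;
        String header = String.format("00-%s-%s-%s", padded, spanId, sampled ? "01" : "00");
        if (!isValid(header)) {
            throw new IllegalArgumentException("Not a valid traceparent: " + header);
        }
        return header;
    }

    // Checks the field layout and rejects all-zero trace and parent ids.
    static boolean isValid(String header) {
        Matcher matcher = FORMAT.matcher(header);
        if (!matcher.matches()) {
            return false;
        }
        return !matcher.group(1).equals("0".repeat(32))
                && !matcher.group(2).equals("0".repeat(16));
    }

    public static void main(String[] args) {
        // prints "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
        System.out.println(format("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", true));
    }
}
```

Whether the flags value matters on the consumer side depends on the sampling configuration, so treat the `sampled` parameter as something to verify against your own setup.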

With these changes in place, every WebClient exchange() automatically propagates the traceId from the initial request.

  • Published on 2023-07-07 01:34:12
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76631278.html



