How to manually create and inject TraceContext into Reactor Context in Spring Webflux?
Question
Background: I have a service running on Spring Boot 3.1.0 which communicates via REST and AMQP. After it is invoked via REST, it publishes the REST payload's content to a RabbitMQ queue using reactor-rabbitmq and immediately returns an HTTP response.
@RestController
@RequiredArgsConstructor
@RequestMapping("${api.baseurl}")
public class CalculationInitiateController {

    private final RequestMapper requestMapper;
    private final ResponseMapper responseMapper;
    private final CalculationInitializer initializer;

    @ResponseStatus(code = HttpStatus.CREATED)
    @PostMapping("/initiate")
    public Mono<CalculationInitResponseDto> initiateCalculation(@RequestBody @Valid CalculationInitiationRequestDto request) {
        return Mono.just(requestMapper.map(request))
                // CalculationInitializer exposes initialize(...), not initiateCalculation(...)
                .flatMap(initializer::initialize)
                .map(responseMapper::map);
    }
}
Event publishing via reactor-rabbitmq:
@Slf4j
@AllArgsConstructor
@Service
public class CalculationInitializer {

    private final Sender sender;
    private final ObjectMapper objectMapper;
    private final EventPublisherProperties eventPublisherProperties;
    private final OutboundMessageFactory messageFactory;
    // Used by messageProperties() below to read the current trace context
    private final Tracer tracer;

    public Mono<Boolean> initialize(CalculationEvent calculationEvent) {
        // eventType and eventConfig are not defined in the original post; their lookup is elided
        log.info("Publishing internal {} event for calculation: {}", eventType, calculationEvent.request().calculationId());
        var bytePayload = objectMapper.writeValueAsBytes(calculationEvent);
        var outboundMessage = new OutboundMessage("", eventConfig.getRoutingKey(), messageProperties(eventConfig), bytePayload);
        return send(outboundMessage);
    }

    // Copies the current traceId/spanId into the AMQP message headers
    private AMQP.BasicProperties messageProperties(EventPublisherProperties.EventConfig eventConfig) {
        var context = tracer.currentTraceContext().context();
        return new AMQP.BasicProperties.Builder()
                .correlationId(context.traceId())
                .headers(Map.of(EVENT_TYPE_HEADER, eventConfig.headerValue(), "traceId", context.traceId(), "spanId", context.spanId()))
                .build();
    }

    private Mono<Boolean> send(OutboundMessage outboundMessage) {
        return sender.sendWithPublishConfirms(Mono.just(outboundMessage)).next()
                .flatMap(this::checkIfAcknowledged);
    }

    private Mono<Boolean> checkIfAcknowledged(OutboundMessageResult<OutboundMessage> result) {
        if (result.isAck()) {
            return Mono.just(Boolean.TRUE);
        } else {
            log.warn("Message not Acknowledged !!");
            return Mono.error(new IllegalStateException("Did not receive ACK on message send"));
        }
    }
}
A Rabbit receiver (also from reactor-rabbitmq) later on consumes the message and calls multiple APIs.
@Slf4j
@Component
@AllArgsConstructor
public class EventMessageListener {

    private final EventMessageReceiverProperties eventMessageReceiverProperties;
    private final Receiver eventReceiver;
    private final ConsumeOptions consumeOptions;
    private final Tracer tracer;
    private final CalculationEventHandler calculationEventHandler;

    @EventListener(ApplicationReadyEvent.class)
    public void receiveMessages() {
        eventReceiver.consumeManualAck(eventMessageReceiverProperties.getQueue(), consumeOptions)
                .flatMap(this::handleMessage)
                .doFinally(s -> eventReceiver.close())
                .subscribe();
    }

    private Mono<Void> handleMessage(AcknowledgableDelivery message) {
        // Rebuild a TraceContext from the ids carried in the AMQP headers
        var traceId = message.getProperties().getHeaders().get("traceId").toString();
        var spanId = message.getProperties().getHeaders().get("spanId").toString();
        var context = tracer.traceContextBuilder().traceId(traceId).spanId(spanId).build();
        return Mono.defer(() -> calculationEventHandler.handle(message)
                        .doOnSuccess(v -> message.ack())
                        .onErrorResume(ex -> {
                            log.error("Failed to handle message {}", new String(message.getBody()));
                            log.error("Exception:", ex);
                            message.nack(false);
                            return Mono.empty();
                        }))
                .contextWrite(Context.of(TraceContext.class, context));
    }
}
This event is then processed:
@Component
@AllArgsConstructor
public class CalculationEventHandler {

    private final ObjectMapper objectMapper;
    private final CalculationErrorHandler calculationErrorHandler;
    private final CalculationEventProcessor calculationEventProcessor;
    private final EventMessageReceiverProperties eventMessageReceiverProperties;

    public Mono<Void> handle(AcknowledgableDelivery message) {
        try {
            var event = objectMapper.readValue(message.getBody(), CalculationEvent.class);
            return calculationEventProcessor.process(event)
                    .onErrorResume(t -> calculationErrorHandler.handleError(message, t, event))
                    .then();
        } catch (IOException e) {
            return Mono.error(e);
        }
    }
}
Processing includes validating the payload against some external APIs, which are called in multiple ValidationService implementations. I'm using Flux.mergeDelayError to wait for all the responses from the API services, so that all errors are collected if multiple API calls fail:
@Service
@AllArgsConstructor
public class CalculationValidator {

    private final List<ValidationService> validationServices;

    public Mono<Void> validate(CalculationInput input) {
        return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE, validateWithEachService(input).toArray(Publisher[]::new)).then();
    }

    // One validation Mono per service; named to match the call in validate()
    private List<Mono<Void>> validateWithEachService(CalculationInput input) {
        return validationServices.stream()
                .map(validationService -> validationService.validate(input))
                .toList();
    }
}
Here's what most of the ValidationService implementations look like:
@Service
public class ThresholdValidatorServiceImpl implements ValidationService {

    private final WebClient webClient;
    private final String endpoint;
    private final ThresholdInputMapper mapper;

    public ThresholdValidatorServiceImpl(WebClient.Builder builder,
                                         ValidationServiceErrorFilterFactory errorFilterFactory,
                                         ThresholdInputMapper mapper,
                                         @Value("${integration.threshold-validator.url}") String gateway,
                                         @Value("${integration.threshold-validator.endpoint}") String endpoint) {
        this.webClient = builder
                .baseUrl(gateway)
                .filter(errorFilterFactory.createFilterFor("threshold-validator"))
                .build();
        this.endpoint = endpoint;
        this.mapper = mapper;
    }

    @Override
    public Mono<Void> validate(CalculationInput input) {
        return Mono.just(mapper.map(input))
                // flatMap (not map) so that the WebClient call is actually subscribed to
                .flatMap(body -> webClient.post()
                        .uri(endpoint)
                        .contentType(MediaType.APPLICATION_JSON)
                        .bodyValue(body)
                        .retrieve()
                        .bodyToMono(Void.class));
    }
}
What I am after: I want to re-use the traceId from the initial REST request. The idea is to put the traceId into the AMQP message properties and, after consuming the message via the Rabbit Receiver, inject it into the Reactor Context so that the API calls made via WebClient in each ValidationService use the same traceId as the initial request.
Problem: I am able to save the traceId from the initial request and put it inside the AMQP message properties. After consuming the RabbitMQ message, I fetch the traceId and write it to the Reactor Context as a TraceContext. My understanding is that this context will be used upstream in the reactive pipeline, where the API calls are made. However, after multiple API calls the WebClient seems to generate a new traceId for each .exchange(), which is not the behaviour I am expecting.
Question: Is this even possible to achieve? If yes, what would be the correct approach?
Dependencies used:
io.micrometer:micrometer-tracing:1.1.2
io.micrometer:context-propagation:1.0.3
io.projectreactor.rabbitmq:reactor-rabbitmq:1.5.6
org.springframework.boot:spring-boot-starter-webflux:3.1.0
EDIT: Added some code for more clarity, updated descriptions.
Answer 1
Score: 2
Spring Boot 3.x uses Micrometer Tracing, and all of Spring Boot's default configuration works with the `Micrometer Observation API`.
The WebFlux WebClient looks in the current Reactor context for an Observation stored under the key "micrometer.observation". If it finds none, it starts a new observation, which is why the following code does not work:
contextWrite(Context.of(TraceContext.class, context));
Possible solutions to this problem:
1. Use the Spring Cloud Stream binder for RabbitMQ. It supports the reactive specification, including receiving messages from MQ as a Flux, and requires no extra code for any of your scenarios. For example, Spring WebFlux reads the trace/span from the request headers and adds it to the request context. It then adds it to outgoing remote calls, both web calls and messages (RabbitMQ). On incoming messages it reads these headers and creates the span for any outgoing calls from them. All your scenarios will work out of the box. If you use MDC (logback etc.), you can debug this in the logs, because the trace information is added to the MDC context and most context-switching scenarios are taken care of.
2. Instead of setting a TraceContext in the context, write the specific ids (trace, span, parent, etc.) directly into the context and add them to the right header values when making downstream calls. You do not need to worry about context switches or which thread executes the WebClient calls: if you simply call `Hooks.enableAutomaticContextPropagation();` in the main class, your context should be passed along with minimal overhead.
3. This is a bit complex, but I could not think of an easier solution. Create a custom io.micrometer.observation.transport.ReceiverContext and create a new Observation from it. You will not have to worry about reading spans from the headers, because the default propagator will extract them from the message headers. Take inspiration from what the Spring RabbitMQ message listener does: create a new ReceiverContext class similar to org.springframework.amqp.rabbit.support.micrometer.RabbitMessageReceiverContext.
// Create a new observation in the receiver
Observation observation = Observation.createNotStarted(...); // resolve the ObservationRegistry bean and pass the custom receiver context
// Run your code inside the observation
observation.observe(() -> { /* your code */ }); // the observation is added to ThreadLocal and Context
// The PropagatingReceiverTracingObservationHandler already registered on the registry handles this observation and extracts the required headers from the message
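To make option 2 more concrete, here is a minimal, library-free sketch of what "same trace, new span" means on the wire. It assumes the incoming message carries a version-00 W3C traceparent value; the class and method names (ChildTraceParent, traceIdOf, childOf) are hypothetical and not part of Micrometer or Spring. In a real application the tracer would normally create the child span; this only illustrates the header value you would attach to each downstream call, for instance as the traceparent request header.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Sketch only: derives the traceparent for an outgoing call from an incoming one. */
final class ChildTraceParent {

    private ChildTraceParent() {
    }

    /** Returns the 32-character trace id of a version-00 traceparent value. */
    static String traceIdOf(String traceparent) {
        return split(traceparent)[1];
    }

    /** Keeps version, trace id and flags, but replaces the span id with a new random one. */
    static String childOf(String traceparent) {
        String[] parts = split(traceparent);
        long spanId;
        do {
            spanId = ThreadLocalRandom.current().nextLong();
        } while (spanId == 0L); // an all-zero span id is invalid in W3C Trace Context
        return parts[0] + "-" + parts[1] + "-" + String.format("%016x", spanId) + "-" + parts[3];
    }

    // Splits "version-traceid-parentid-flags" and checks the field lengths
    private static String[] split(String traceparent) {
        String[] parts = traceparent.split("-");
        if (parts.length != 4 || parts[1].length() != 32 || parts[2].length() != 16) {
            throw new IllegalArgumentException("Not a version-00 traceparent: " + traceparent);
        }
        return parts;
    }
}
```

Every value returned by childOf shares the trace id of the incoming header, which is the property the question is after.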
Answer 2
Score: 1
I managed to get it working by following @Gaurav's advice and creating my own custom ReceiverContext, which looks like this:
public class RabbitReceiverContext extends ReceiverContext<AcknowledgableDelivery> {

    private final String queue;
    private final String listenerId;

    public RabbitReceiverContext(AcknowledgableDelivery message, String queueName, String listenerId) {
        // Getter used by the propagator to read tracing headers from the AMQP message
        super((carrier, key) -> carrier.getProperties().getHeaders().getOrDefault(key, "").toString());
        setCarrier(message);
        this.listenerId = listenerId;
        this.queue = queueName;
        setRemoteServiceName("RabbitMQ");
    }

    public String getListenerId() {
        return this.listenerId;
    }

    public String getSource() {
        return this.queue;
    }
}
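One possible hardening of the getter lambda above, as a sketch under assumptions: as far as I know, the RabbitMQ client can return a null header map for messages published without headers, and Micrometer's Propagator.Getter is expected to return null (rather than an empty string) for a missing key. If both hold, the header lookup could be factored into a small null-safe helper. The class name AmqpHeaderGetter is hypothetical.

```java
import java.util.Map;

/** Sketch only: null-safe lookup of a tracing header in an AMQP header map. */
final class AmqpHeaderGetter {

    private AmqpHeaderGetter() {
    }

    /** Returns the header as a String, or null when the map or the key is absent. */
    static String get(Map<String, Object> headers, String key) {
        if (headers == null) {
            return null;
        }
        Object value = headers.get(key);
        // Header values are not necessarily java.lang.String, hence toString()
        return value == null ? null : value.toString();
    }
}
```

With such a helper, the constructor could pass `(carrier, key) -> AmqpHeaderGetter.get(carrier.getProperties().getHeaders(), key)` to `super(...)`.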
Then I pass it to the Observation that I create in the handleMessage() method of EventMessageListener. Note that it did not work out of the box: I also had to add .contextWrite(Context.of(ObservationThreadLocalAccessor.KEY, observation)) for it to work:
private Mono<Void> handleMessage(AcknowledgableDelivery message) {
    var observation = Observation.createNotStarted(
            "calculation",
            () -> new RabbitReceiverContext(message, eventMessageReceiverProperties.getQueue(), eventReceiver.toString()),
            observationRegistry
    );
    return observation.observe(() ->
            Mono.defer(() -> eventMessageDispatcher.dispatch(message)
                    .doOnSuccess(v -> message.ack())
                    .onErrorResume(ex -> {
                        log.error("Failed to handle message {}", new String(message.getBody()));
                        log.error("Exception:", ex);
                        message.nack(false);
                        return Mono.empty();
                    })
            ).contextWrite(Context.of(ObservationThreadLocalAccessor.KEY, observation))
    );
}
One more thing to note: I also had to adjust the message publishing to construct a traceparent header, because Micrometer uses W3C context propagation by default. The header is constructed like this in the CalculationInitializer:
private AMQP.BasicProperties messageProperties(EventPublisherProperties.EventConfig eventConfig) {
    var traceId = tracer.currentTraceContext().context().traceId();
    var spanId = tracer.currentTraceContext().context().spanId();
    // W3C traceparent layout: version-traceid-parentid-flags
    var traceParent = String.format("00-%s-%s-00", traceId, spanId);
    return new AMQP.BasicProperties.Builder()
            .headers(Map.of(EVENT_TYPE_HEADER, eventConfig.headerValue(), "traceparent", traceParent))
            .build();
}
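For reference, the W3C Trace Context format expects a 32-character lowercase hex trace id, a 16-character lowercase hex parent (span) id, and a two-digit flags field in which 01 marks the trace as sampled and 00 as not sampled. The following is a minimal sketch of a helper that validates the ids before formatting the header; it assumes 128-bit trace ids, and the class name TraceParentHeader is hypothetical rather than part of any library.

```java
import java.util.regex.Pattern;

/** Sketch only: builds a version-00 W3C traceparent value after validating the ids. */
final class TraceParentHeader {

    private static final Pattern TRACE_ID = Pattern.compile("[0-9a-f]{32}");
    private static final Pattern SPAN_ID = Pattern.compile("[0-9a-f]{16}");

    private TraceParentHeader() {
    }

    /** Returns "00-traceId-spanId-01" when sampled, "00-traceId-spanId-00" otherwise. */
    static String format(String traceId, String spanId, boolean sampled) {
        if (!TRACE_ID.matcher(traceId).matches() || traceId.equals("0".repeat(32))) {
            throw new IllegalArgumentException("Invalid trace id: " + traceId);
        }
        if (!SPAN_ID.matcher(spanId).matches() || spanId.equals("0".repeat(16))) {
            throw new IllegalArgumentException("Invalid span id: " + spanId);
        }
        return "00-" + traceId + "-" + spanId + "-" + (sampled ? "01" : "00");
    }
}
```

Whether to pass true or false for the sampled flag depends on how the downstream services make their sampling decision, so treat that choice as application-specific.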
After these changes, every WebClient exchange() automatically propagates the traceId from the initial request.