英文:
How to trace two asynchronous go routines with open telemetry
问题
我正在尝试使用Open Telemetry追踪具有两个Go协程的方法。第一个Go协程从Kafka读取消息并创建一个耗时较长的任务(可能需要1秒到1分钟不等)。然后,第二个Go协程监听已完成的任务。
正确的追踪方式是什么,以便我们知道第二个协程中的哪个任务结果对应于第一个协程中的哪个Kafka消息?
我猜想,在Go协程中创建的两个span必须通过相同的traceId进行关联。
func startListening(ctx context.Context) {
// 初始化kafka客户端
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for message := range kafkaEvents {
// 处理消息,创建长时间任务
// 在这里使用traceID创建span?
}
func waitForJobs(ctx) {
for results := range finishedJobs
// 处理结果
// 在这里使用traceID创建span?
}
}
非常感谢您的任何建议!
英文:
I am trying to trace a method that has two Go routines using Open Telemetry. The first Go routine reads from Kafka and creates a long-lasting job (can take anywhere from 1 second to 1 minute). Then, the second Go routine listens for the finished jobs.
What would be the correct way of doing the tracing so that we know which job result (in the second routine) corresponds to which kafka message (from the first routine)?
My guess is that the two spans created in the Go routines have to be linked via same traceId.
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for message := range kafkaEvents {
// process message, create long job
// create span here with traceID?
}
func waitForJobs(ctx) {
for results := range finishedJobs
// process result
// create span here with traceID?
}
}
Any suggestion is highly appreciated!
答案1
得分: 1
答案实际上比我想象的要简单。你需要在处理完成的作业时,将与该长作业相关的追踪信息进一步传递,并在解码时对其进行解码。
在我的情况下,因为我使用了一个文本traceparent
头部,因此使用了propagation.TextMapPropagator
的propagation.TraceContext{}
实现,我决定发送整个traceparent
头部(尽管我可能需要对tracestate
做同样的操作),然后在处理完成的作业时使用Extract
方法解码头部。但是为了使用Extract
方法,你需要实现propagation.TextMapCarrier
接口。
func startListening(ctx context.Context) {
// 初始化kafka客户端
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for msg := range kafkaEvents {
// 从traceparent头部中提取传入的追踪信息。示例见https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
// 创建span
tr := otel.Tracer("consumer")
_, span := tr.Start(ctx, "consume message", trace.WithAttributes(
semconv.MessagingOperationProcess,
))
defer span.End()
// 获取traceparent头部
carrier := otelsarama.NewConsumerMessageCarrier(&msg)
traceparentHeader := carrier.Get("traceparent")
// 处理消息,创建长作业并附加头部
jobs.enqueue{TraceparentHeader: traceparentHeader}
}
}
func waitForJobs(ctx) {
for result := range finishedJobs {
ctx = otel.GetTextMapPropagator().Extract(ctx, models.PseudoCarrier{S: result.TraceparentHeader})
ctx, span := tr.Start(ctx, "process result", trace.WithAttributes(
attribute.String("jobName", result.JobName),
))
defer span.End()
// 做更多的工作
}
}
// PseudoCarrier实现了propagation.TextMapCarrier接口,以便在解析traceparent头部时可以使用propagation.Extract方法
type PseudoCarrier struct {
S string
}
func (c PseudoCarrier) Get(_ string) string {
return c.S
}
func (c PseudoCarrier) Set(string, string) {}
func (c PseudoCarrier) Keys() []string {
return []string{"traceparent"}
}
英文:
The answer was in fact easier than I thought. You need to pass further the tracing info attached to that long job and then decode it when processing the finished job.
In my case it, because I was using a text traceparent
header and therefore a propagation.TraceContext{}
implementation of propagation.TextMapPropagator
, I decided to send the whole traceparent
header (although I might need to do the same with tracestate
) and then make use of the Extract
method to decode the header when processing the finished job. But in order to use the Extract
method, you need to implement the propagation.TextMapCarrier
interface.
func startListening(ctx context.Context) {
// initialise kafka client
go kafkaConsumeMessages(ctx)
go waitForJob(ctx)
}
func kafkaConsumeMessages(ctx) {
for msg := range kafkaEvents {
// extract incoming tracing info from traceparent header. Example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
// create span
tr := otel.Tracer("consumer")
_, span := tr.Start(ctx, "consume message", trace.WithAttributes(
semconv.MessagingOperationProcess,
))
defer span.End()
// get just the traceparent header
carrier := otelsarama.NewConsumerMessageCarrier(&msg)
traceparentHeader := carrier.Get("traceparent")
// process message, create long job and attach the header
jobs.enqueue{TraceparentHeader: traceparentHeader}
}
func waitForJobs(ctx) {
for result := range finishedJobs {
ctx = otel.GetTextMapPropagator().Extract(ctx, models.PseudoCarrier{S: result.TraceparentHeader})
ctx, span := tr.Start(ctx, "process result", trace.WithAttributes(
attribute.String("jobName", result.JobName),
))
defer span.End()
// do more work
}
}
// PseudoCarrier implements the propagation.TextMapCarrier interface so we can use the propagation.Extract method when parsing the traceparent header
type PseudoCarrier struct {
S string
}
func (c PseudoCarrier) Get(_ string) string {
return c.S
}
func (c PseudoCarrier) Set(string, string) {}
func (c PseudoCarrier) Keys() []string {
return []string{"traceparent"}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论