How to trace two asynchronous goroutines with OpenTelemetry

Question

I am trying to trace a method that starts two goroutines using OpenTelemetry. The first goroutine reads from Kafka and creates a long-running job (it can take anywhere from 1 second to 1 minute). The second goroutine then listens for the finished jobs.

What would be the correct way to do the tracing so that we know which job result (in the second goroutine) corresponds to which Kafka message (from the first goroutine)?

My guess is that the two spans created in the goroutines have to be linked via the same traceId.

func startListening(ctx context.Context) {
	// initialise kafka client

	go kafkaConsumeMessages(ctx)
	go waitForJobs(ctx)
}

func kafkaConsumeMessages(ctx context.Context) {
	for message := range kafkaEvents {
		// process message, create long job
		// create span here with traceID?
	}
}

func waitForJobs(ctx context.Context) {
	for result := range finishedJobs {
		// process result
		// create span here with traceID?
	}
}

Any suggestion is highly appreciated!

Answer 1

Score: 1

The answer was in fact easier than I thought. You need to pass the tracing info along with the long job and then decode it when processing the finished job.

In my case, because I was using a text traceparent header, and therefore the propagation.TraceContext{} implementation of propagation.TextMapPropagator, I decided to send the whole traceparent header along with the job (although I might need to do the same with tracestate) and then use the Extract method to decode the header when processing the finished job. To use Extract, you need a type that implements the propagation.TextMapCarrier interface.

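import (
	"context"

	"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	semconv "go.opentelemetry.io/otel/semconv/v1.7.0" // version is an assumption; use the one matching your SDK
	"go.opentelemetry.io/otel/trace"
)

// kafkaEvents, finishedJobs, jobs and Job stand in for your own channels and
// queue API; they are assumed to be defined elsewhere.

func startListening(ctx context.Context) {
	// initialise kafka client

	go kafkaConsumeMessages(ctx)
	go waitForJobs(ctx)
}

func kafkaConsumeMessages(ctx context.Context) {
	tr := otel.Tracer("consumer")
	for msg := range kafkaEvents { // assumed to yield *sarama.ConsumerMessage
		// extract incoming tracing info from the traceparent header. Example at https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/instrumentation/github.com/Shopify/sarama/otelsarama/example/consumer/consumer.go#L84
		carrier := otelsarama.NewConsumerMessageCarrier(msg)
		msgCtx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)

		// create span
		_, span := tr.Start(msgCtx, "consume message", trace.WithAttributes(
			semconv.MessagingOperationProcess,
		))

		// get just the traceparent header
		traceparentHeader := carrier.Get("traceparent")

		// process message, create long job and attach the header
		jobs.enqueue(Job{TraceparentHeader: traceparentHeader})

		// end the span per message; a defer inside the loop would only run
		// when the function returns
		span.End()
	}
}

func waitForJobs(ctx context.Context) {
	tr := otel.Tracer("consumer")
	for result := range finishedJobs {
		// rebuild the remote span context from the header stored on the job
		jobCtx := otel.GetTextMapPropagator().Extract(ctx, PseudoCarrier{S: result.TraceparentHeader})
		_, span := tr.Start(jobCtx, "process result", trace.WithAttributes(
			attribute.String("jobName", result.JobName),
		))

		// do more work

		span.End()
	}
}

// PseudoCarrier implements the propagation.TextMapCarrier interface so that
// the propagator's Extract method can parse the stored traceparent header.
type PseudoCarrier struct {
	S string
}

// Get returns the stored header only for the "traceparent" key, so that a
// lookup of "tracestate" does not receive the traceparent value by mistake.
func (c PseudoCarrier) Get(key string) string {
	if key == "traceparent" {
		return c.S
	}
	return ""
}

// Set is a no-op: this carrier is only used for extraction.
func (c PseudoCarrier) Set(string, string) {}

func (c PseudoCarrier) Keys() []string {
	return []string{"traceparent"}
}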
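
To make it concrete why carrying the traceparent header is enough to tie a finished job back to its Kafka message, here is a minimal stdlib-only sketch. It assumes the W3C Trace Context version 00 layout (version-traceid-parentid-flags). The job type, traceIDFromTraceparent and correlate are hypothetical names used for illustration, not part of OpenTelemetry; in real code the propagator's Extract method does this parsing for you.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// job is a hypothetical work item that carries the traceparent header of the
// Kafka message that created it.
type job struct {
	Name        string
	Traceparent string
}

// traceIDFromTraceparent returns the trace-id field of a version 00
// traceparent header, which has four dash-separated fields.
func traceIDFromTraceparent(h string) (string, error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || len(parts[1]) != 32 {
		return "", errors.New("malformed traceparent header")
	}
	return parts[1], nil
}

// correlate simulates the two sides: a goroutine that creates one job per
// incoming header, and a receiver that recovers the trace ID per finished job.
func correlate(headers map[string]string) map[string]string {
	finished := make(chan job)

	// Stands in for the consuming goroutine.
	go func() {
		defer close(finished)
		for name, h := range headers {
			finished <- job{Name: name, Traceparent: h}
		}
	}()

	// Stands in for the result-handling side.
	traceIDs := make(map[string]string)
	for j := range finished {
		id, err := traceIDFromTraceparent(j.Traceparent)
		if err != nil {
			continue // skip jobs without usable trace context
		}
		traceIDs[j.Name] = id
	}
	return traceIDs
}

func main() {
	ids := correlate(map[string]string{
		"job-a": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
	})
	fmt.Println(ids["job-a"]) // prints 4bf92f3577b34da6a3ce929d0e0e4736
}
```

The trace ID recovered on the result side is the same one the incoming message carried, which is what lets both spans end up in one trace. If tracestate also has to travel with the job, a map-backed carrier such as propagation.MapCarrier can hold both headers, assuming your OpenTelemetry Go version provides it.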

huangapple
  • Published on 2021-12-22 00:28:36
  • When reposting, please keep the link to this article: https://go.coder-hub.com/70438619.html