Goroutine Kafka 消费者

huangapple go评论110阅读模式
英文:

Goroutine Kafka Consumers

问题

我目前有一个程序，它创建一个大小为 1 的工作组，然后调用 StartWorker 函数：

  1. package main
  2. import (
  3. "db_write_consumer/db"
  4. "db_write_consumer/worker"
  5. "os"
  6. "os/signal"
  7. "syscall"
  8. )
  9. func main() {
  10. sigchan := make(chan os.Signal, 1)
  11. signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
  12. mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
  13. workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
  14. for _, w := range workers {
  15. w_ := w
  16. worker.StartWorker(w_, []string{"test-topic"}, sigchan, mySQLClient)
  17. }
  18. }

其中 CreateGroup 函数的实现如下：

  1. func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
  2. consumers := []*kafka.Consumer{}
  3. for i := 0; i < numWorkers; i++ {
  4. c := NewWorker(bootstrapServers, groupId)
  5. consumers = append(consumers, c)
  6. }
  7. return consumers
  8. }

而 StartWorker 函数的实现如下：

  1. func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
  2. _ = c.SubscribeTopics(topics, nil)
  3. fmt.Println(c)
  4. run := true
  5. for run {
  6. select {
  7. case sig := <-sigchan:
  8. fmt.Printf("Caught signal %v: terminating\n", sig)
  9. run = false
  10. default:
  11. ev, _ := c.ReadMessage(100)
  12. if ev == nil {
  13. continue
  14. }
  15. msg := &pb.Person{}
  16. proto.Unmarshal(ev.Value, msg)
  17. WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
  18. if ev.Headers != nil {
  19. fmt.Printf("%% Headers: %v\n", ev.Headers)
  20. }
  21. _, err := c.StoreMessage(ev)
  22. if err != nil {
  23. fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
  24. ev.TopicPartition)
  25. }
  26. }
  27. }
  28. fmt.Printf("Closing consumer\n")
  29. c.Close()
  30. }

这段代码在工作组大小为 1 时运行良好，但每次尝试让它支持更大的工作组都失败了。目前我只知道，需要在 main 中用 context.WithCancel(context.Background()) 创建一个 context 并传递给各个 worker 函数，但我不知道该如何设置 WaitGroup 或 goroutine 来真正完成这项工作。

英文:

I currently have a program that creates a worker group of size 1 and then calls StartWorker:

  1. package main
  2. import (
  3. &quot;db_write_consumer/db&quot;
  4. &quot;db_write_consumer/worker&quot;
  5. &quot;os&quot;
  6. &quot;os/signal&quot;
  7. &quot;syscall&quot;
  8. )
  9. func main() {
  10. sigchan := make(chan os.Signal, 1)
  11. signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
  12. mySQLClient, _ := db.NewMySQLDBClient(&quot;root&quot;, &quot;&quot;, &quot;localhost&quot;, 3306, &quot;testbase&quot;)
  13. workers := worker.CreateGroup(&quot;localhost:9092&quot;, &quot;testgroup&quot;, 1)
  14. for _, w := range workers {
  15. w_ := w
  16. worker.StartWorker(w_, []string{&quot;test-topic&quot;}, sigchan, mySQLClient)
  17. }
  18. }

where CreateGroup is implemented as follows:

  1. func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
  2. consumers := []*kafka.Consumer{}
  3. for i := 0; i &lt; numWorkers; i++ {
  4. c := NewWorker(bootstrapServers, groupId)
  5. consumers = append(consumers, c)
  6. }
  7. return consumers
  8. }

and StartWorker is implemented as follows:

  1. func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
  2. _ = c.SubscribeTopics(topics, nil)
  3. fmt.Println(c)
  4. run := true
  5. for run {
  6. select {
  7. case sig := &lt;-sigchan:
  8. fmt.Printf(&quot;Caught signal %v: terminating\n&quot;, sig)
  9. run = false
  10. default:
  11. ev, _ := c.ReadMessage(100)
  12. if ev == nil {
  13. continue
  14. }
  15. msg := &amp;pb.Person{}
  16. proto.Unmarshal(ev.Value, msg)
  17. WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
  18. if ev.Headers != nil {
  19. fmt.Printf(&quot;%% Headers: %v\n&quot;, ev.Headers)
  20. }
  21. _, err := c.StoreMessage(ev)
  22. if err != nil {
  23. fmt.Fprintf(os.Stderr, &quot;%% Error storing offset after message %s:\n&quot;,
  24. ev.TopicPartition)
  25. }
  26. }
  27. }
  28. fmt.Printf(&quot;Closing consumer\n&quot;)
  29. c.Close()
  30. }

This works fine for a worker group of size 1, but every attempt to make it work for a larger worker group fails. All I've learned so far is that I'll want a context created with context.WithCancel(context.Background()) in main and passed down into the worker functions, but I'm lost on how to set up a WaitGroup or goroutines to actually do this work.

答案1

得分: 1

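我理解你的问题是如何使用 context（而不是 sigchan）来管理 worker 的生命周期。最简单的方法是使用 signal.NotifyContext：它返回一个 context，在收到其中任一信号时会被取消。此外，每个 worker 都必须运行在自己的 goroutine 中。StartWorker 会一直阻塞到关闭为止，所以原来的循环永远执行不到第二个 worker。而且，共享的 sigchan 中的每个信号只会被一个 worker 收到，被取消的 context 则能被所有 worker 观察到。所以 main 函数会变成这样：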

  1. func main() {
  2. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
  3. defer stop()
  4. mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
  5. workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
  6. var wg sync.WaitGroup
  7. for _, w := range workers {
  8. w_ := w
  9. wg.Add(1)
  10. go func() {
  11. defer wg.Done()
  12. worker.StartWorker(ctx, w_, []string{"test-topic"}, mySQLClient)
  13. }()
  14. }
  15. wg.Wait()
  16. }

还要注意这里使用了 WaitGroup，以免 main 函数在所有 worker 完成之前就退出。StartWorker 函数则会像这样：

  1. func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
  2. _ = c.SubscribeTopics(topics, nil)
  3. fmt.Println(c)
  4. for {
  5. select {
  6. case <-ctx.Done:
  7. return
  8. default:
  9. ...
英文:

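I understand your question to be how to manage the lifetime of the workers using a context (instead of sigchan). The easiest way is to use signal.NotifyContext, which gives you a context that is cancelled when one of the signals is received. Each worker also has to run in its own goroutine. StartWorker blocks until shutdown, so the original loop never reaches a second worker. In addition, each signal sent on a shared sigchan is received by only one worker, whereas a cancelled context is observed by all of them. So main would become: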

  1. func main() {
  2. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
  3. defer stop()
  4. mySQLClient, _ := db.NewMySQLDBClient(&quot;root&quot;, &quot;&quot;, &quot;localhost&quot;, 3306, &quot;testbase&quot;)
  5. workers := worker.CreateGroup(&quot;localhost:9092&quot;, &quot;testgroup&quot;, 1)
  6. var wg sync.WaitGroup
  7. for _, w := range workers {
  8. w_ := w
  9. wg.Add(1)
  10. go func() {
  11. defer wg.Done()
  12. worker.StartWorker(ctx, w_, []string{&quot;test-topic&quot;}, mySQLClient)
  13. }()
  14. }
  15. wg.Wait()
  16. }

Note also the use of a WaitGroup to keep main from exiting before all the workers have finished. StartWorker would then look like this:

  1. func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
  2. _ = c.SubscribeTopics(topics, nil)
  3. fmt.Println(c)
  4. for {
  5. select {
  6. case &lt;-ctx.Done:
  7. return
  8. default:
  9. ...

huangapple
  • 本文由 发表于 2022年12月30日 05:44:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/74956308.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定