Goroutine Kafka 消费者

huangapple go评论82阅读模式
英文:

Goroutine Kafka Consumers

问题

我目前有一个程序,它创建了一个大小为1的工作组,然后调用startworker函数:

package main

import (
    "db_write_consumer/db"
    "db_write_consumer/worker"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
    workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
    for _, w := range workers {
        w_ := w
        worker.StartWorker(w_, []string{"test-topic"}, sigchan, mySQLClient)
    }
}

其中CreateGroup函数的实现如下:

func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
    consumers := []*kafka.Consumer{}
    for i := 0; i < numWorkers; i++ {
        c := NewWorker(bootstrapServers, groupId)
        consumers = append(consumers, c)
    }
    return consumers
}

而StartWorker函数的实现如下:

func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
    _ = c.SubscribeTopics(topics, nil)
    fmt.Println(c)
    run := true
    for run {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev, _ := c.ReadMessage(100)
            if ev == nil {
                continue
            }
            msg := &pb.Person{}
            proto.Unmarshal(ev.Value, msg)
            WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
            if ev.Headers != nil {
                fmt.Printf("%% Headers: %v\n", ev.Headers)
            }
            _, err := c.StoreMessage(ev)
            if err != nil {
                fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
                    ev.TopicPartition)
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}

这段代码对于工作组大小为1的情况运行良好,但是尝试将其应用于更大的工作组大小时失败了。我已经了解到,我需要将context.WithCancel(context.Background())传递给worker函数,但是我对如何设置waitgroup或goroutine来实际完成这项工作感到困惑。

英文:

I currently have a program that creates a workergroup of size 1, which then calls startworker:

package main

import (
    &quot;db_write_consumer/db&quot;
    &quot;db_write_consumer/worker&quot;
    &quot;os&quot;
    &quot;os/signal&quot;
    &quot;syscall&quot;
)

func main() {
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    mySQLClient, _ := db.NewMySQLDBClient(&quot;root&quot;, &quot;&quot;, &quot;localhost&quot;, 3306, &quot;testbase&quot;)
    workers := worker.CreateGroup(&quot;localhost:9092&quot;, &quot;testgroup&quot;, 1)
    for _, w := range workers {
        w_ := w
        worker.StartWorker(w_, []string{&quot;test-topic&quot;}, sigchan, mySQLClient)
    }
}

where CreateGroup is written:

func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
    consumers := []*kafka.Consumer{}
    for i := 0; i &lt; numWorkers; i++ {
        c := NewWorker(bootstrapServers, groupId)
        consumers = append(consumers, c)
    }
    return consumers
}

and Startworker is written:

func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
    _ = c.SubscribeTopics(topics, nil)
    fmt.Println(c)
    run := true
    for run {
        select {
        case sig := &lt;-sigchan:
            fmt.Printf(&quot;Caught signal %v: terminating\n&quot;, sig)
            run = false
        default:
            ev, _ := c.ReadMessage(100)
            if ev == nil {
                continue
            }
            msg := &amp;pb.Person{}
            proto.Unmarshal(ev.Value, msg)
            WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
            if ev.Headers != nil {
                fmt.Printf(&quot;%% Headers: %v\n&quot;, ev.Headers)
            }
            _, err := c.StoreMessage(ev)
            if err != nil {
                fmt.Fprintf(os.Stderr, &quot;%% Error storing offset after message %s:\n&quot;,
                    ev.TopicPartition)
            }
        }
    }

    fmt.Printf(&quot;Closing consumer\n&quot;)
    c.Close()
}

this works fine for workergroup size 1, but every attempt to make this work for a larger workergroup size fails--all i've learned so far is that i'll want context.WithCancel(context.Background()) passed down into the worker funcs from main, but i'm lost with how to set up a waitgroup or goroutines to actually do this work

答案1

得分: 1

我理解你的问题是如何使用上下文(而不是sigchan)来管理工作人员的生命周期。最简单的方法是使用signal.NotifyContext - 这会给你一个上下文,当其中一个信号被发送时,上下文会被取消。所以main函数会变成这样:

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
    workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
    var wg sync.WaitGroup
    for _, w := range workers {
        w_ := w
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker.StartWorker(ctx, w_, []string{"test-topic"}, mySQLClient)
        }()
    }
    wg.Wait()
}

还要注意使用WaitGroup来避免在所有工作人员完成之前main函数退出。StartWorker函数会像这样:

func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
    _ = c.SubscribeTopics(topics, nil)
    fmt.Println(c)
    for {
        select {
        case <-ctx.Done:
            return
        default:
        ...
英文:

I understand that your question is how to manage lifetime of workers using context (instead of sigchan). Easiest way is to use signal.NotifyContext - this gives you a context which gets cancelled when one of the signals is sent. So the main would become

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

    mySQLClient, _ := db.NewMySQLDBClient(&quot;root&quot;, &quot;&quot;, &quot;localhost&quot;, 3306, &quot;testbase&quot;)
    workers := worker.CreateGroup(&quot;localhost:9092&quot;, &quot;testgroup&quot;, 1)
    var wg sync.WaitGroup
    for _, w := range workers {
        w_ := w
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker.StartWorker(ctx, w_, []string{&quot;test-topic&quot;}, mySQLClient)
        }()
    }
    wg.Wait()
}

Note also the use of WaitGroup to avoid the main exiting before all the workers finish. And StartWorker would be like

func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
    _ = c.SubscribeTopics(topics, nil)
    fmt.Println(c)
    for {
        select {
        case &lt;-ctx.Done:
            return
        default:
        ...

huangapple
  • 本文由 发表于 2022年12月30日 05:44:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/74956308.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定