英文:
Goroutine Kafka Consumers
问题
我目前有一个程序,它创建了一个大小为1的工作组,然后调用StartWorker函数:
package main
import (
"db_write_consumer/db"
"db_write_consumer/worker"
"os"
"os/signal"
"syscall"
)
// main connects to MySQL, creates a consumer group and runs its workers
// until SIGINT or SIGTERM is received.
//
// StartWorker normally keeps polling until it receives a value from sigchan,
// so the calls in the loop below run one after another, not concurrently.
// With more than one worker, the second does not start until the first has
// consumed a signal and stopped.
func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	mySQLClient, err := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
	if err != nil {
		// NOTE(review): assumes the second result is an error value; confirm against package db.
		panic(err)
	}
	defer mySQLClient.Close()

	workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
	for _, w := range workers {
		// No closure captures w here, so no per-iteration copy is needed.
		worker.StartWorker(w, []string{"test-topic"}, sigchan, mySQLClient)
	}
}
其中CreateGroup函数的实现如下:
// CreateGroup returns numWorkers consumers. Each one is built by NewWorker
// with the same bootstrap servers and group ID. A non-positive numWorkers
// yields an empty, non-nil slice.
func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
	capacity := numWorkers
	if capacity < 0 {
		capacity = 0
	}
	group := make([]*kafka.Consumer, 0, capacity)
	for len(group) < numWorkers {
		group = append(group, NewWorker(bootstrapServers, groupId))
	}
	return group
}
而StartWorker函数的实现如下:
// StartWorker subscribes c to topics and processes messages until a value is
// received on sigchan. The consumer is closed when StartWorker returns.
//
// For each message it decodes the value as a pb.Person, passes the fields to
// WriteStuff, prints any headers, and then stores the message offset with
// StoreMessage.
//
// A value sent on sigchan is received by only one goroutine. If several
// workers share sigchan, a single signal stops only one of them.
func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
	// Close in a defer so every return path releases the consumer.
	defer func() {
		fmt.Printf("Closing consumer\n")
		if err := c.Close(); err != nil {
			fmt.Fprintf(os.Stderr, "%% Error closing consumer: %v\n", err)
		}
	}()

	if err := c.SubscribeTopics(topics, nil); err != nil {
		fmt.Fprintf(os.Stderr, "%% Error subscribing to %v: %v\n", topics, err)
		return
	}
	fmt.Println(c)

	for {
		// Non-blocking check for a shutdown signal between polls.
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			return
		default:
		}

		// ReadMessage takes a time.Duration. A bare 100 would be 100ns,
		// which is effectively a non-blocking poll that spins this loop.
		ev, err := c.ReadMessage(100 * time.Millisecond)
		if err != nil {
			// A timeout only means nothing arrived in the poll window.
			// Report anything else and move on.
			if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
				fmt.Fprintf(os.Stderr, "%% Consumer error: %v\n", err)
			}
			continue
		}

		msg := &pb.Person{}
		if err := proto.Unmarshal(ev.Value, msg); err != nil {
			// Skip undecodable payloads rather than passing a partially
			// decoded Person to WriteStuff.
			fmt.Fprintf(os.Stderr, "%% Error decoding message %s: %v\n", ev.TopicPartition, err)
			continue
		}
		// NOTE(review): if WriteStuff can fail, check its result before
		// storing the offset so a failed write is not marked as consumed.
		WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
		if ev.Headers != nil {
			fmt.Printf("%% Headers: %v\n", ev.Headers)
		}
		if _, err := c.StoreMessage(ev); err != nil {
			fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s: %v\n",
				ev.TopicPartition, err)
		}
	}
}
这段代码在工作组大小为1时运行良好,但每次尝试让它支持更大的工作组时都失败了。目前我只知道应该从main把context.WithCancel(context.Background())传递给worker函数,但我不清楚该如何设置WaitGroup或goroutine来真正完成这项工作。
英文:
I currently have a program that creates a worker group of size 1 and then calls StartWorker:
package main
import (
"db_write_consumer/db"
"db_write_consumer/worker"
"os"
"os/signal"
"syscall"
)
// main connects to MySQL, creates a consumer group and runs its workers
// until SIGINT or SIGTERM is received.
//
// StartWorker normally keeps polling until it receives a value from sigchan,
// so the calls in the loop below run one after another, not concurrently.
// With more than one worker, the second does not start until the first has
// consumed a signal and stopped.
func main() {
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	mySQLClient, err := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
	if err != nil {
		// NOTE(review): assumes the second result is an error value; confirm against package db.
		panic(err)
	}
	defer mySQLClient.Close()

	workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
	for _, w := range workers {
		// No closure captures w here, so no per-iteration copy is needed.
		worker.StartWorker(w, []string{"test-topic"}, sigchan, mySQLClient)
	}
}
where CreateGroup is written as:
// CreateGroup returns numWorkers consumers. Each one is built by NewWorker
// with the same bootstrap servers and group ID. A non-positive numWorkers
// yields an empty, non-nil slice.
func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
	capacity := numWorkers
	if capacity < 0 {
		capacity = 0
	}
	group := make([]*kafka.Consumer, 0, capacity)
	for len(group) < numWorkers {
		group = append(group, NewWorker(bootstrapServers, groupId))
	}
	return group
}
and StartWorker is written as:
// StartWorker subscribes c to topics and processes messages until a value is
// received on sigchan. The consumer is closed when StartWorker returns.
//
// For each message it decodes the value as a pb.Person, passes the fields to
// WriteStuff, prints any headers, and then stores the message offset with
// StoreMessage.
//
// A value sent on sigchan is received by only one goroutine. If several
// workers share sigchan, a single signal stops only one of them.
func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
	// Close in a defer so every return path releases the consumer.
	defer func() {
		fmt.Printf("Closing consumer\n")
		if err := c.Close(); err != nil {
			fmt.Fprintf(os.Stderr, "%% Error closing consumer: %v\n", err)
		}
	}()

	if err := c.SubscribeTopics(topics, nil); err != nil {
		fmt.Fprintf(os.Stderr, "%% Error subscribing to %v: %v\n", topics, err)
		return
	}
	fmt.Println(c)

	for {
		// Non-blocking check for a shutdown signal between polls.
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			return
		default:
		}

		// ReadMessage takes a time.Duration. A bare 100 would be 100ns,
		// which is effectively a non-blocking poll that spins this loop.
		ev, err := c.ReadMessage(100 * time.Millisecond)
		if err != nil {
			// A timeout only means nothing arrived in the poll window.
			// Report anything else and move on.
			if kerr, ok := err.(kafka.Error); !ok || kerr.Code() != kafka.ErrTimedOut {
				fmt.Fprintf(os.Stderr, "%% Consumer error: %v\n", err)
			}
			continue
		}

		msg := &pb.Person{}
		if err := proto.Unmarshal(ev.Value, msg); err != nil {
			// Skip undecodable payloads rather than passing a partially
			// decoded Person to WriteStuff.
			fmt.Fprintf(os.Stderr, "%% Error decoding message %s: %v\n", ev.TopicPartition, err)
			continue
		}
		// NOTE(review): if WriteStuff can fail, check its result before
		// storing the offset so a failed write is not marked as consumed.
		WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
		if ev.Headers != nil {
			fmt.Printf("%% Headers: %v\n", ev.Headers)
		}
		if _, err := c.StoreMessage(ev); err != nil {
			fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s: %v\n",
				ev.TopicPartition, err)
		}
	}
}
This works fine for a worker group of size 1, but every attempt to make it work for a larger worker group fails. All I've learned so far is that I'll want context.WithCancel(context.Background()) passed down into the worker funcs from main, but I'm lost on how to set up a WaitGroup or goroutines to actually do this work.
答案1
得分: 1
我理解你的问题是如何使用context(而不是sigchan)来管理worker的生命周期。最简单的方法是使用signal.NotifyContext:它会返回一个context,当收到其中任一信号时该context会被取消。所以main函数会变成这样:
// main runs every consumer of the group in its own goroutine and waits for
// all of them to return. ctx is cancelled on SIGINT or SIGTERM; StartWorker
// watches ctx.Done() to know when to stop.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	mySQLClient, err := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
	if err != nil {
		// NOTE(review): assumes the second result is an error value; confirm against package db.
		panic(err)
	}
	defer mySQLClient.Close()

	// Raise this to run more consumers concurrently.
	const numWorkers = 1
	workers := worker.CreateGroup("localhost:9092", "testgroup", numWorkers)

	var wg sync.WaitGroup
	for _, w := range workers {
		// Before Go 1.22 the loop variable is shared across iterations, so
		// give each goroutine its own copy.
		w := w
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker.StartWorker(ctx, w, []string{"test-topic"}, mySQLClient)
		}()
	}
	wg.Wait()
}
还要注意这里使用了WaitGroup,以避免main函数在所有worker完成之前退出。StartWorker函数会像这样:
func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
_ = c.SubscribeTopics(topics, nil)
fmt.Println(c)
for {
select {
case <-ctx.Done:
return
default:
...
英文:
I understand that your question is how to manage the lifetime of workers using a context (instead of sigchan). The easiest way is to use signal.NotifyContext: it gives you a context that is cancelled when one of the signals is received. So main would become:
// main runs every consumer of the group in its own goroutine and waits for
// all of them to return. ctx is cancelled on SIGINT or SIGTERM; StartWorker
// watches ctx.Done() to know when to stop.
func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	mySQLClient, err := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
	if err != nil {
		// NOTE(review): assumes the second result is an error value; confirm against package db.
		panic(err)
	}
	defer mySQLClient.Close()

	// Raise this to run more consumers concurrently.
	const numWorkers = 1
	workers := worker.CreateGroup("localhost:9092", "testgroup", numWorkers)

	var wg sync.WaitGroup
	for _, w := range workers {
		// Before Go 1.22 the loop variable is shared across iterations, so
		// give each goroutine its own copy.
		w := w
		wg.Add(1)
		go func() {
			defer wg.Done()
			worker.StartWorker(ctx, w, []string{"test-topic"}, mySQLClient)
		}()
	}
	wg.Wait()
}
Note also the use of a WaitGroup to keep main from exiting before all the workers finish. And StartWorker would look like this:
func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
_ = c.SubscribeTopics(topics, nil)
fmt.Println(c)
for {
select {
case <-ctx.Done:
return
default:
...
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论