How can I ensure that a spawned goroutine finishes processing an array on program termination?

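Question

I am processing records from a Kafka topic. The endpoint I need to send these records to accepts an array of up to 100 records. Each Kafka record also contains the information needed to perform the REST call (currently only 1 to 2 variations, but this will grow as more record types are processed). When a new unique config is found, I add it to an array of structs, and each config has its own queue array. For each config, I spawn a new goroutine that processes any records in its queue on a timer (for example, every 100 ms). This currently works fine. The problem is program shutdown: I do not want to leave any unsent records in the queue, and I want to finish processing them before the application exits. The code below handles the interrupt and starts checking the queue depths, but once the interrupt happens the queue count never decreases, so the program never terminates. Any thoughts would be appreciated.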

package main

import (
	"context"
	"encoding/json"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"
	_ "time/tzdata"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

type ChannelDetails struct {
	ChannelDetails MsgChannel
	LastUsed       time.Time
	Active         bool
	Queue          []OutputMessage
}

type OutputMessage struct {
	Config  MsgConfig `json:"config"`
	Message string    `json:"message"`
}

type MsgConfig struct {
	Channel MsgChannel `json:"channel"`
}

type MsgChannel struct {
	Id      int    `json:"id"`
	MntDate string `json:"mntDate"`
	Otype   string `json:"oType"`
}

var channels []ChannelDetails

func checkQueueDepths() int {
	var depth int = 0
	for _, c := range channels {
		depth += len(c.Queue)
	}
	return depth
}

func TimeIn(t time.Time, name string) (time.Time, error) {
	loc, err := time.LoadLocation(name)
	if err == nil {
		t = t.In(loc)
	}
	return t, err
}

func find(channel *MsgChannel) int {
	for i, c := range channels {
		if c.ChannelDetails.Id == channel.Id &&
			c.ChannelDetails.MntDate == channel.MntDate {
			return i
		}
	}
	return len(channels)
}

func splice(queue []OutputMessage, count int) (ret []OutputMessage, deleted []OutputMessage) {
	ret = make([]OutputMessage, len(queue)-count)
	deleted = make([]OutputMessage, count)
	copy(deleted, queue[0:count])
	copy(ret, queue[:0])
	copy(ret[0:], queue[0+count:])
	return
}

func load(msg OutputMessage, logger *zap.Logger) {

	i := find(&msg.Config.Channel)

	if i == len(channels) {
		channels = append(channels, ChannelDetails{
			ChannelDetails: msg.Config.Channel,
			LastUsed:       time.Now(),
			Active:         false,
			Queue:          make([]OutputMessage, 0, 200),
		})
	}
	channels[i].LastUsed = time.Now()
	channels[i].Queue = append(channels[i].Queue, msg)
	if !channels[i].Active {
		channels[i].Active = true
		go process(&channels[i], logger)
	}
}

func process(data *ChannelDetails, logger *zap.Logger) {
	for {
		// if Queue is empty and not used for 5 minutes, flag as inActive and shut down go routine
		if len(data.Queue) == 0 &&
			time.Now().After(data.LastUsed.Add(time.Second*10)) { //reduced for example
			data.Active = false
			logger.Info("deactivating routine as queue is empty")
			break
		}

		// if Queue has records, process
		if len(data.Queue) != 0 {
			drainStart, _ := TimeIn(time.Now(), "America/New_York")
			spliceCnt := len(data.Queue)
			if spliceCnt > 100 {
				spliceCnt = 100 // rest api endpoint can only accept array up to 100 items
			}
			items := []OutputMessage{}
			data.Queue, items = splice(data.Queue, spliceCnt)
			//process items ... will send array of items to a rest endpoint in another go routine
			drainEnd, _ := TimeIn(time.Now(), "America/New_York")
			logger.Info("processing records",
				zap.Int("numitems", len(items)),
				zap.String("start", drainStart.Format("2006-01-02T15:04:05.000-07:00")),
				zap.String("end", drainEnd.Format("2006-01-02T15:04:05.000-07:00")),
			)

		}

		time.Sleep(time.Millisecond * time.Duration(500))
	}
}

func initZapLog() *zap.Logger {
	config := zap.NewProductionConfig()
	config.EncoderConfig.TimeKey = "timestamp"
	config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	logger, _ := config.Build()
	zap.ReplaceGlobals(logger)
	return logger
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	logger := initZapLog()
	defer logger.Sync()

	test1 := `{
		"config": {
			"channel": {
				"id": 1,
				"mntDate": "2021-12-01",
				"oType": "test1"
			}
		},
		"message": "test message1"
	}`
	test2 := `{
		"config": {
			"channel": {
				"id": 2,
				"mntDate": "2021-12-01",
				"oType": "test2"
			}
		},
		"message": "test message2"
	}`
	var testMsg1 OutputMessage
	err := json.Unmarshal([]byte(test1), &testMsg1)
	if err != nil {
		logger.Panic("unable to unmarshall test1 data " + err.Error())
	}
	var testMsg2 OutputMessage
	err = json.Unmarshal([]byte(test2), &testMsg2)
	if err != nil {
		logger.Panic("unable to unmarshall test2 data " + err.Error())
	}

	exitCh := make(chan struct{})
	go func(ctx context.Context) {
		for {
			//original data is streamed from kafka
			load(testMsg1, logger)
			load(testMsg2, logger)

			time.Sleep(time.Millisecond * time.Duration(5))
			select {
			case <-ctx.Done():
				logger.Info("received done")
				var depthChk int
				for {
					depthChk = checkQueueDepths()
					if depthChk == 0 {
						break
					} else {
						logger.Info("Still processing queues.  Msgs left: " + strconv.Itoa(depthChk))
					}
					time.Sleep(100 * time.Millisecond)
				}
				exitCh <- struct{}{}
				return
			default:
			}
		}
	}(ctx)

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		depths := checkQueueDepths()
		logger.Info("You pressed ctrl + C. Queue depth is: " + strconv.Itoa(depths))
		cancel()
	}()
	<-exitCh
}

Example logs:

{"level":"info","timestamp":"2021-12-28T15:26:06.136-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":91,"start":"2021-12-28T15:26:06.136-05:00","end":"2021-12-28T15:26:06.136-05:00"}
{"level":"info","timestamp":"2021-12-28T15:26:06.636-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":92,"start":"2021-12-28T15:26:06.636-05:00","end":"2021-12-28T15:26:06.636-05:00"}
^C{"level":"info","timestamp":"2021-12-28T15:26:06.780-0500","caller":"testgo/main.go:205","msg":"You pressed ctrl + C. Queue depth is: 2442"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:182","msg":"received done"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:189","msg":"Still processing queues.  Msgs left: 2442"} --line repeats forever

Answer 1

Score: 1

The sync package (https://pkg.go.dev/sync) provides the WaitGroup type, which lets you wait for a group of goroutines to complete before the main goroutine returns.

The best usage example is in this blog post:
https://go.dev/blog/pipelines
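
As a rough sketch of the WaitGroup idea (not code from the blog post or from the question), the example below starts one goroutine per batch and only collects the results after all of them have finished. The function name sumAll and the sample data are made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumAll is a hypothetical helper: it starts one goroutine per batch and
// returns only after every goroutine has finished.
func sumAll(batches [][]int) int {
	var wg sync.WaitGroup
	// Buffered so that no goroutine blocks while sending its result.
	results := make(chan int, len(batches))

	for _, b := range batches {
		wg.Add(1) // register the goroutine before starting it
		go func(batch []int) {
			defer wg.Done() // signal completion when this goroutine returns
			total := 0
			for _, v := range batch {
				total += v
			}
			results <- total
		}(b)
	}

	wg.Wait()      // block until every goroutine has called Done
	close(results) // safe: no goroutine can send any more

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(sumAll([][]int{{1, 2}, {3, 4}, {5}})) // prints 15
}
```

Calling Add before the go statement matters: if Add ran inside the goroutine, Wait could return before the goroutine had been counted.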

Answer 2

Score: 0

There are two ways to make the main goroutine wait for all spawned goroutines to finish. The simplest is to add

runtime.Goexit()

to the end of main, after <-exitCh.

The runtime package documentation describes what it does:

"Calling Goexit from the main goroutine terminates that goroutine without func main returning. Since func main has not returned, the program continues execution of other goroutines. If all other goroutines exit, the program crashes."
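
A small sketch of that behaviour, under some assumptions: flush is a hypothetical stand-in for the real batch sender, and the worker calls os.Exit itself when it is done, because (as the quote says) the program crashes if every other goroutine simply returns.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
	"time"
)

// flush is a hypothetical stand-in for sending the queued records in
// batches of at most batchSize; it returns the number of batches sent.
func flush(queue []string, batchSize int) int {
	batches := 0
	for len(queue) > 0 {
		n := len(queue)
		if n > batchSize {
			n = batchSize
		}
		queue = queue[n:] // a real worker would send queue[:n] here
		batches++
	}
	return batches
}

func main() {
	queue := make([]string, 250)

	go func() {
		time.Sleep(50 * time.Millisecond) // simulate slow work
		fmt.Println("batches sent:", flush(queue, 100))
		os.Exit(0) // end the process explicitly once the work is done
	}()

	// Terminates only the main goroutine; the worker above keeps running.
	// Without the os.Exit call, the program would crash once the last
	// goroutine finished.
	runtime.Goexit()
}
```

Note that os.Exit does not run deferred functions, so anything like a deferred logger.Sync() in main would need to be called before it.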

The other way is to use a WaitGroup. Think of a WaitGroup as a counter with a method that blocks at the line where it is called until the counter reaches zero:

var wg sync.WaitGroup // declare the WaitGroup

Before starting each goroutine that you want to wait on, increment the counter:

wg.Add(1) // typically called once per spawned goroutine, before the go statement

When the goroutine has finished its work, call

wg.Done() // call this when you consider the spawned goroutine done

which decrements the counter.

Then, where you want the code to wait until the counter is zero, add the line:

wg.Wait() // wait here until the counter reaches zero

The code blocks there until every goroutine counted with Add() has called Done() and the counter is back at zero.
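
Put together, and applied loosely to the scenario in the question, it could look something like the sketch below. Everything here is an assumption for illustration: batchQueue, worker and runAndDrain are made-up names, the queue is guarded by a mutex because it is shared between goroutines, and a closed stop channel plays the role of the shutdown signal.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// batchQueue is a hypothetical mutex-guarded queue; it is not the
// ChannelDetails type from the question.
type batchQueue struct {
	mu    sync.Mutex
	items []string
}

func (q *batchQueue) push(s string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, s)
}

// take removes and returns a copy of up to limit items from the front.
func (q *batchQueue) take(limit int) []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	n := len(q.items)
	if n > limit {
		n = limit
	}
	batch := append([]string(nil), q.items[:n]...)
	q.items = q.items[n:]
	return batch
}

// worker sends batches on a timer until stop is closed, then drains
// whatever is left before returning.
func worker(q *batchQueue, stop <-chan struct{}, wg *sync.WaitGroup, sent *int) {
	defer wg.Done() // decrement the counter when the worker returns
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			for {
				batch := q.take(100)
				if len(batch) == 0 {
					return
				}
				*sent += len(batch) // a real worker would send the batch here
			}
		case <-ticker.C:
			*sent += len(q.take(100))
		}
	}
}

// runAndDrain queues n records, requests shutdown, and waits for the
// worker to flush everything before returning the number of records sent.
func runAndDrain(n int) int {
	q := &batchQueue{}
	stop := make(chan struct{})
	var wg sync.WaitGroup
	sent := 0

	wg.Add(1) // count the worker before starting it
	go worker(q, stop, &wg, &sent)

	for i := 0; i < n; i++ {
		q.push(fmt.Sprintf("record-%d", i))
	}

	close(stop) // tell the worker to finish up (e.g. after SIGINT)
	wg.Wait()   // block until the worker has called Done
	return sent
}

func main() {
	fmt.Println("records sent:", runAndDrain(250)) // prints records sent: 250
}
```

The important part is the order at shutdown: stop producing, signal the workers, then Wait. Reading sent after wg.Wait() is safe because the worker's Done call happens before Wait returns.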

Posted by huangapple on 2021-12-29 22:55:41
Source: https://go.coder-hub.com/70521075.html