如何确保在程序终止时,一个生成的 Go 协程能够完成对数组的处理?

huangapple go评论63阅读模式

How can I ensure that a spawned goroutine finishes processing an array on program termination?



package main

import (
	"context"
	"encoding/json"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"
	_ "time/tzdata"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)


// ChannelDetails tracks one destination channel together with the messages
// that are still waiting to be sent to it.
type ChannelDetails struct {
	// ChannelDetails identifies the channel this entry belongs to.
	ChannelDetails MsgChannel
	// LastUsed is refreshed by load every time a message is queued.
	LastUsed time.Time
	// Active is true while a process goroutine is serving Queue.
	Active bool
	// Queue holds the pending messages; load appends to the end and
	// process removes batches from the front.
	Queue []OutputMessage
}

// OutputMessage is a single record to deliver, decoded from JSON.
type OutputMessage struct {
	Config  MsgConfig `json:"config"`
	Message string    `json:"message"`
}

// MsgConfig carries the routing information attached to an OutputMessage.
type MsgConfig struct {
	Channel MsgChannel `json:"channel"`
}

// MsgChannel describes the channel a message is destined for. find treats two
// channels as the same when both Id and MntDate match.
type MsgChannel struct {
	Id      int    `json:"id"`
	MntDate string `json:"mntDate"`
	Otype   string `json:"oType"`
}

// channels holds one entry per distinct channel (matched on Id and MntDate by
// find); load appends a new entry the first time a channel is seen.
var channels []ChannelDetails

func checkQueueDepths() int {
	var depth int = 0
	for _, c := range channels {
		depth += len(c.Queue)
	return depth

// TimeIn converts t to the time zone called name (an IANA name such as
// "America/New_York", or "UTC"/"Local").
//
// If the location cannot be loaded, the error from time.LoadLocation is
// returned together with t unchanged, so callers that ignore the error still
// get a usable timestamp.
func TimeIn(t time.Time, name string) (time.Time, error) {
	loc, err := time.LoadLocation(name)
	if err == nil {
		t = t.In(loc)
	}
	return t, err
}

func find(channel *MsgChannel) int {
	for i, c := range channels {
		if c.ChannelDetails.Id == channel.Id &&
			c.ChannelDetails.MntDate == channel.MntDate {
			return i
	return len(channels)

func splice(queue []OutputMessage, count int) (ret []OutputMessage, deleted []OutputMessage) {
	ret = make([]OutputMessage, len(queue)-count)
	deleted = make([]OutputMessage, count)
	copy(deleted, queue[0:count])
	copy(ret, queue[:0])
	copy(ret[0:], queue[0+count:])

func load(msg OutputMessage, logger *zap.Logger) {

	i := find(&msg.Config.Channel)

	if i == len(channels) {
		channels = append(channels, ChannelDetails{
			ChannelDetails: msg.Config.Channel,
			LastUsed:       time.Now(),
			Active:         false,
			Queue:          make([]OutputMessage, 0, 200),
	channels[i].LastUsed = time.Now()
	channels[i].Queue = append(channels[i].Queue, msg)
	if !channels[i].Active {
		channels[i].Active = true
		go process(&channels[i], logger)

func process(data *ChannelDetails, logger *zap.Logger) {
	for {
		// if Queue is empty and not used for 5 minutes, flag as inActive and shut down go routine
		if len(data.Queue) == 0 &&
			time.Now().After(data.LastUsed.Add(time.Second*10)) { //reduced for example
			data.Active = false
			logger.Info("deactivating routine as queue is empty")

		// if Queue has records, process
		if len(data.Queue) != 0 {
			drainStart, _ := TimeIn(time.Now(), "America/New_York")
			spliceCnt := len(data.Queue)
			if spliceCnt > 100 {
				spliceCnt = 100 // rest api endpoint can only accept array up to 100 items
			items := []OutputMessage{}
			data.Queue, items = splice(data.Queue, spliceCnt)
			//process items ... will send array of items to a rest endpoint in another go routine
			drainEnd, _ := TimeIn(time.Now(), "America/New_York")
			logger.Info("processing records",
				zap.Int("numitems", len(items)),
				zap.String("start", drainStart.Format("2006-01-02T15:04:05.000-07:00")),
				zap.String("end", drainEnd.Format("2006-01-02T15:04:05.000-07:00")),


		time.Sleep(time.Millisecond * time.Duration(500))

func initZapLog() *zap.Logger {
	config := zap.NewProductionConfig()
	config.EncoderConfig.TimeKey = "timestamp"
	config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	logger, _ := config.Build()
	return logger

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	logger := initZapLog()
	defer logger.Sync()

	test1 := `{
		"config": {
			"channel": {
				"id": 1,
				"mntDate": "2021-12-01",
				"oType": "test1"
		"message": "test message1"
	test2 := `{
		"config": {
			"channel": {
				"id": 2,
				"mntDate": "2021-12-01",
				"oType": "test2"
		"message": "test message2"
	var testMsg1 OutputMessage
	err := json.Unmarshal([]byte(test1), &testMsg1)
	if err != nil {
		logger.Panic("unable to unmarshall test1 data " + err.Error())
	var testMsg2 OutputMessage
	err = json.Unmarshal([]byte(test2), &testMsg2)
	if err != nil {
		logger.Panic("unable to unmarshall test2 data " + err.Error())

	exitCh := make(chan struct{})
	go func(ctx context.Context) {
		for {
			//original data is streamed from kafka
			load(testMsg1, logger)
			load(testMsg2, logger)

			time.Sleep(time.Millisecond * time.Duration(5))
			select {
			case <-ctx.Done():
				logger.Info("received done")
				var depthChk int
				for {
					depthChk = checkQueueDepths()
					if depthChk == 0 {
					} else {
						logger.Info("Still processing queues.  Msgs left: " + strconv.Itoa(depthChk))
					time.Sleep(100 * time.Millisecond)
				exitCh <- struct{}{}

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		depths := checkQueueDepths()
		logger.Info("You pressed ctrl + C. Queue depth is: " + strconv.Itoa(depths))


{"level":"info","timestamp":"2021-12-28T15:26:06.136-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":91,"start":"2021-12-28T15:26:06.136-05:00","end":"2021-12-28T15:26:06.136-05:00"}
{"level":"info","timestamp":"2021-12-28T15:26:06.636-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":92,"start":"2021-12-28T15:26:06.636-05:00","end":"2021-12-28T15:26:06.636-05:00"}
^C{"level":"info","timestamp":"2021-12-28T15:26:06.780-0500","caller":"testgo/main.go:205","msg":"You pressed ctrl + C. Queue depth is: 2442"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:182","msg":"received done"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:189","msg":"Still processing queues.  Msgs left: 2442"} --行无限重复

I am processing records from a Kafka topic. The endpoint I need to send these records to accepts an array of up to 100 records per call. Each Kafka record also contains the information needed to perform the REST call (currently only one or two variations, but this will increase as more record types are processed). I currently keep a slice of structs, one per unique config, which is extended whenever a new config is found, and each config has its own queue. For each config I spawn a new goroutine that processes any records in its queue on a timer (for example, every 100 ms). This works fine while the program is running. The issue I am having is at shutdown: I do not want to leave any unsent records in the queues, so I want to finish processing them before the app exits. The code below handles the interrupt and starts checking the queue depths, but once the interrupt happens the queue count never decreases, so the program never terminates. Any thoughts would be appreciated.

package main

import (
	"context"
	"encoding/json"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"
	_ "time/tzdata"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// ChannelDetails tracks one destination channel together with the messages
// that are still waiting to be sent to it.
type ChannelDetails struct {
	// ChannelDetails identifies the channel this entry belongs to.
	ChannelDetails MsgChannel
	// LastUsed is refreshed by load every time a message is queued.
	LastUsed time.Time
	// Active is true while a process goroutine is serving Queue.
	Active bool
	// Queue holds the pending messages; load appends to the end and
	// process removes batches from the front.
	Queue []OutputMessage
}

// OutputMessage is a single record to deliver, decoded from JSON.
type OutputMessage struct {
	Config  MsgConfig `json:"config"`
	Message string    `json:"message"`
}

// MsgConfig carries the routing information attached to an OutputMessage.
type MsgConfig struct {
	Channel MsgChannel `json:"channel"`
}

// MsgChannel describes the channel a message is destined for. find treats two
// channels as the same when both Id and MntDate match.
type MsgChannel struct {
	Id      int    `json:"id"`
	MntDate string `json:"mntDate"`
	Otype   string `json:"oType"`
}

// channels holds one entry per distinct channel; load appends a new entry the
// first time a channel is seen.
var channels []ChannelDetails

// checkQueueDepths returns the total number of messages still queued across
// every entry in channels. It reads channels without any synchronization.
func checkQueueDepths() int {
	depth := 0
	for _, c := range channels {
		depth += len(c.Queue)
	}
	return depth
}

// TimeIn converts t to the time zone called name. If the location cannot be
// loaded, the error is returned together with t unchanged.
func TimeIn(t time.Time, name string) (time.Time, error) {
	loc, err := time.LoadLocation(name)
	if err == nil {
		t = t.In(loc)
	}
	return t, err
}

// find returns the index in channels of the entry whose Id and MntDate match
// channel, or len(channels) when there is no such entry.
func find(channel *MsgChannel) int {
	for i, c := range channels {
		if c.ChannelDetails.Id == channel.Id &&
			c.ChannelDetails.MntDate == channel.MntDate {
			return i
		}
	}
	return len(channels)
}

// splice removes the first count elements from queue. It returns ret, a fresh
// slice with the remaining elements, and deleted, a fresh slice with the
// removed ones. It panics if count is negative or greater than len(queue).
func splice(queue []OutputMessage, count int) (ret []OutputMessage, deleted []OutputMessage) {
	deleted = make([]OutputMessage, count)
	copy(deleted, queue[:count])

	ret = make([]OutputMessage, len(queue)-count)
	copy(ret, queue[count:])

	return ret, deleted
}

// load queues msg on the channel entry that matches msg.Config.Channel,
// creating the entry first if needed, and starts a process goroutine for the
// entry if none is currently active.
//
// NOTE(review): process is handed &channels[i], a pointer into the slice's
// backing array. When a later append has to grow channels, goroutines started
// earlier keep working on the old copy while load appends to the new one.
// That looks like the reason the reported queue depth never drops at
// shutdown. Confirm and consider storing *ChannelDetails guarded by a mutex.
func load(msg OutputMessage, logger *zap.Logger) {
	i := find(&msg.Config.Channel)

	// find returns len(channels) when the channel is unknown, so after the
	// append below i indexes the new entry.
	if i == len(channels) {
		channels = append(channels, ChannelDetails{
			ChannelDetails: msg.Config.Channel,
			LastUsed:       time.Now(),
			Active:         false,
			Queue:          make([]OutputMessage, 0, 200),
		})
	}

	channels[i].LastUsed = time.Now()
	channels[i].Queue = append(channels[i].Queue, msg)
	if !channels[i].Active {
		channels[i].Active = true
		go process(&channels[i], logger)
	}
}

// process drains data.Queue in batches of at most 100 messages, pausing 500ms
// between iterations. It marks the entry inactive and returns once the queue
// is empty and no message has been loaded for 10 seconds.
func process(data *ChannelDetails, logger *zap.Logger) {
	for {
		// If Queue is empty and has not been used for the idle period (5 minutes
		// intended, reduced to 10 seconds for the example), flag the entry as
		// inactive and shut down this goroutine.
		if len(data.Queue) == 0 &&
			time.Now().After(data.LastUsed.Add(time.Second*10)) {
			data.Active = false
			logger.Info("deactivating routine as queue is empty")
			return
		}

		// If Queue has records, process one batch.
		if len(data.Queue) != 0 {
			// The TimeIn error is ignored: on failure it returns the time unchanged.
			drainStart, _ := TimeIn(time.Now(), "America/New_York")
			spliceCnt := len(data.Queue)
			if spliceCnt > 100 {
				spliceCnt = 100 // rest api endpoint can only accept array up to 100 items
			}
			var items []OutputMessage
			data.Queue, items = splice(data.Queue, spliceCnt)
			// process items ... will send array of items to a rest endpoint in another go routine
			drainEnd, _ := TimeIn(time.Now(), "America/New_York")
			logger.Info("processing records",
				zap.Int("numitems", len(items)),
				zap.String("start", drainStart.Format("2006-01-02T15:04:05.000-07:00")),
				zap.String("end", drainEnd.Format("2006-01-02T15:04:05.000-07:00")),
			)
		}

		time.Sleep(time.Millisecond * time.Duration(500))
	}
}

// initZapLog builds a zap production logger whose time field is named
// "timestamp" and is encoded as ISO 8601. It panics if the logger cannot be
// built, instead of returning a nil logger that would fail at its first use.
func initZapLog() *zap.Logger {
	config := zap.NewProductionConfig()
	config.EncoderConfig.TimeKey = "timestamp"
	config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
	logger, err := config.Build()
	if err != nil {
		panic("building zap logger: " + err.Error())
	}
	return logger
}

// main feeds two test messages into load every 5ms (standing in for a Kafka
// stream) until SIGINT/SIGTERM is received. On a signal it cancels the
// context; the feeder goroutine then polls checkQueueDepths every 100ms until
// it reports zero, signals exitCh, and main returns.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	logger := initZapLog()
	defer logger.Sync()

	test1 := `{
		"config": {
			"channel": {
				"id": 1,
				"mntDate": "2021-12-01",
				"oType": "test1"
			}
		},
		"message": "test message1"
	}`
	test2 := `{
		"config": {
			"channel": {
				"id": 2,
				"mntDate": "2021-12-01",
				"oType": "test2"
			}
		},
		"message": "test message2"
	}`
	var testMsg1 OutputMessage
	err := json.Unmarshal([]byte(test1), &testMsg1)
	if err != nil {
		logger.Panic("unable to unmarshall test1 data " + err.Error())
	}
	var testMsg2 OutputMessage
	err = json.Unmarshal([]byte(test2), &testMsg2)
	if err != nil {
		logger.Panic("unable to unmarshall test2 data " + err.Error())
	}

	exitCh := make(chan struct{})
	go func(ctx context.Context) {
		for {
			// original data is streamed from kafka
			load(testMsg1, logger)
			load(testMsg2, logger)

			time.Sleep(time.Millisecond * time.Duration(5))
			select {
			case <-ctx.Done():
				logger.Info("received done")
				// Stop loading and wait for the process goroutines to drain
				// every queue before letting main exit.
				for {
					depthChk := checkQueueDepths()
					if depthChk == 0 {
						break
					}
					logger.Info("Still processing queues.  Msgs left: " + strconv.Itoa(depthChk))
					time.Sleep(100 * time.Millisecond)
				}
				exitCh <- struct{}{}
				return
			default:
				// Not cancelled yet: keep loading.
			}
		}
	}(ctx)

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		depths := checkQueueDepths()
		logger.Info("You pressed ctrl + C. Queue depth is: " + strconv.Itoa(depths))
		cancel()
	}()

	<-exitCh
}

example logs:

{"level":"info","timestamp":"2021-12-28T15:26:06.136-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":91,"start":"2021-12-28T15:26:06.136-05:00","end":"2021-12-28T15:26:06.136-05:00"}
{"level":"info","timestamp":"2021-12-28T15:26:06.636-0500","caller":"testgo/main.go:116","msg":"processing records","numitems":92,"start":"2021-12-28T15:26:06.636-05:00","end":"2021-12-28T15:26:06.636-05:00"}
^C{"level":"info","timestamp":"2021-12-28T15:26:06.780-0500","caller":"testgo/main.go:205","msg":"You pressed ctrl + C. Queue depth is: 2442"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:182","msg":"received done"}
{"level":"info","timestamp":"2021-12-28T15:26:06.783-0500","caller":"testgo/main.go:189","msg":"Still processing queues.  Msgs left: 2442"} --line repeats forever


得分: 1

Go 的 sync 包(https://pkg.go.dev/sync)提供了 WaitGroup 类型,它允许你在主 goroutine 返回之前等待一组 goroutine 完成。



The Go sync package (https://pkg.go.dev/sync) provides the WaitGroup type, which lets you wait for a group of goroutines to complete before the main goroutine returns.

The best usage example is in this blog post:


得分: 0

等待所有从主 goroutine 中生成的 goroutine 完成有两种方法。最简单的方法是在主 goroutine 的末尾、<-exitCh 之后添加以下代码:

runtime.Goexit()

简单来说,它的作用是:


“从主 goroutine 中调用 Goexit 会终止该 goroutine,而不会使 func main 返回。由于 func main 没有返回,程序会继续执行其他 goroutine。如果所有其他 goroutine 退出,程序将崩溃。”

另一种方法是使用 waitgroup,可以将 waitgroup 视为一个计数器,其中有一个方法会在调用该方法的行上“等待”,直到计数器达到零:

var wg sync.WaitGroup // 声明 waitgroup

然后为每个要等待的 goroutine 增加 waitgroup 的计数(应在启动该 goroutine 之前调用,以免 Wait 先于 Add 执行):

wg.Add(1) // 通常为每个生成的 goroutine 调用一次

当你想要声明 goroutine 完成工作时,调用:

wg.Done() // 当你认为生成的例程已完成时调用此方法

这会使计数器递减。

然后在你希望代码“等待”直到计数器归零的地方,添加这一行:



wg.Wait() // 在此处等待,直到计数器达到零

代码将一直阻塞,直到通过 Add() 计入、并通过 Done() 递减的 goroutine 计数归零。


To wait for all goroutines spawned from the main goroutine to finish, there are two ways to do this. The simplest is to add

runtime.Goexit()

to the end of your main goroutine, after <-exitCh.

Simply, it does this:

"Calling Goexit from the main goroutine terminates that goroutine without func main returning. Since func main has not returned, the program continues execution of other goroutines. If all other goroutines exit, the program crashes."

The other way is to use a WaitGroup. Think of a WaitGroup as a counter with a method that makes the program wait on the line where it is called until the counter hits zero:

var wg sync.WaitGroup // declare the waitgroup

Then, for each goroutine that you want to wait on, you increment the waitgroup (do this before starting the goroutine, so that Wait cannot run before Add):

wg.Add(1) // you typically call this once for each spawned goroutine

Then when you want to state that the goroutine has finished work, you call

wg.Done() // when you consider the spawned routine to be done call this

Which decrements the counter

Then where you want the code to 'wait' till the counter is zero, you add line:

wg.Wait() // wait here till counter hits zero

The code will then block until the count of goroutines registered with Add() and decremented with Done() hits zero.

  • 本文由 发表于 2021年12月29日 22:55:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/70521075.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
