How to determine which goroutine is blocking execution?

Question

I have a small parser that writes the data it finds to Postgres. As the database driver I use https://github.com/jackc/pgx.

I write the parsed data to an unbuffered channel from various goroutines.

I have a dedicated goroutine that reads data from this channel and writes it to the database.

I'm debugging the application, and some time after it starts it hangs forever (perhaps waiting for a free connection in the database pool).

How can I determine which goroutine is blocking execution?

I've heard of pprof, but I have never used it.
Thanks.

Minimal example:
I have a struct like this:

type ParsingResults struct {
	parser DataParser
	data   []*common.Data
	err    error
}

In a separate goroutine I initialize an unbuffered channel like this:

results = make(chan *ParsingResults)

Then I start various goroutines in which I run the parsers:

go fetcher.Parse(results)

Each parser gathers data and passes it to the channel like this:

var (
	results chan<- *ParsingResults
	pageResults *ParsingResults
)
results <- pageResults
if pageResults.err != nil {
	return
}

time.Sleep(p.provider.DelayBetweenPages)

And in a separate goroutine a function like this is launched:

func (fetcher *Fetcher) waitForResults(ctx context.Context) {
	for {
		select {
		case results := <-fetcher.resultsChannel:
			provider := results.parser.GetProvider()
			if results.err != nil {
				common.Logger.Errorw("failed to fetch data from provider",
					"provider", provider.Url,
					"error", results.err)
				continue
			}
			data := fetcher.removeDuplicates(results.data)
			common.Logger.Infow("fetched some data",
				"provider", provider.Url,
				"rows_count", len(results.data),
				"unique_rows_count", len(data))
			_, err := fetcher.Repo.SaveFetchedData(ctx, data)
			if err != nil {
				common.Logger.Errorw("failed to save fetched data",
					"provider", provider.Url,
					"error", err)
				continue
			}
			common.Logger.Infow("fetched data were saved successfully",
				"provider", provider.Url,
				"rows_count", len(results.data),
				"unique_rows_count", len(data))
		case <-ctx.Done():
			return
		default:
			common.Logger.Infow("for debugging's sake! waiting for some data to arrive!")
		}
	}
}

The data is stored in the database by this function:

func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
	if len(rows) == 0 {
		return 0, nil
	}

	baseQB := sq.Insert(db.DataTableName).
		Columns(saveFetchedDataCols...).
		PlaceholderFormat(sq.Dollar)

	batch := &pgx.Batch{}
	for _, p := range rows {
		curQB := baseQB.Values(p.Row1, p.Row2, sq.Expr("NOW()"))
		curQuery, curArgs, err := curQB.ToSql()

		if err != nil {
			return 0, fmt.Errorf("failed to generate SQL query: %w", err)
		}
		batch.Queue(curQuery, curArgs...)
	}

	br := repo.pool.SendBatch(ctx, batch)
	ct, err := br.Exec()
	if err != nil {
		return 0, fmt.Errorf("failed to run SQL query batch: %w", err)
	}

	return ct.RowsAffected(), nil
}

Answer 1

Score: 1

I checked the full goroutine stack dump in pprof. The problem was that I did not release the connection back to the pool after processing the result of the batch request.
As a result, after 10 requests had gone through, the pool was completely exhausted and the goroutine blocked waiting for a free connection. You are all the best. Thanks for the help.

huangapple
  • Posted on February 26, 2022
  • When reposting, please keep the link to this article: https://go.coder-hub.com/71274742.html