How to determine which goroutine is blocking execution?
Question
Hi all.
I have a small parser that writes the data it finds to Postgres. As the database driver I use https://github.com/jackc/pgx.
I write the parsed data to an unbuffered channel from various goroutines.
I have a dedicated goroutine that reads data from this channel and writes it to the database.
I'm debugging the application, and after running for some time it hangs forever (perhaps waiting for a free connection in the database pool).
How can I determine which goroutine is blocking execution?
I've heard of pprof, but I have never used it.
Thanks.
Minimal example:
I have a struct like this:
type ParsingResults struct {
	parser DataParser
	data   []*common.Data
	err    error
}
In a separate goroutine I initialize an unbuffered channel like this:
results = make(chan *ParsingResults)
Then I start various goroutines that run the parsers:
go fetcher.Parse(results)
Each parser gathers data and passes it to the channel like this:
var (
	results     chan<- *ParsingResults
	pageResults *ParsingResults
)
results <- pageResults
if pageResults.err != nil {
	return
}
time.Sleep(p.provider.DelayBetweenPages)
A function like this is launched in a separate goroutine:
func (fetcher *Fetcher) waitForResults(ctx context.Context) {
	for {
		select {
		case results := <-fetcher.resultsChannel:
			provider := results.parser.GetProvider()
			if results.err != nil {
				common.Logger.Errorw("failed to fetch data from provider",
					"provider", provider.Url,
					"error", results.err)
				continue
			}
			data := fetcher.removeDuplicates(results.data)
			common.Logger.Infow("fetched some data",
				"provider", provider.Url,
				"rows_count", len(results.data),
				"unique_rows_count", len(data))
			_, err := fetcher.Repo.SaveFetchedData(ctx, data)
			if err != nil {
				common.Logger.Errorw("failed to save fetched data",
					"provider", provider.Url,
					"error", err)
				continue
			}
			common.Logger.Infow("fetched data were saved successfully",
				"provider", provider.Url,
				"rows_count", len(results.data),
				"unique_rows_count", len(data))
		case <-ctx.Done():
			return
		default:
			common.Logger.Infow("for debugging's sake! waiting for some data to arrive!")
		}
	}
}
The data is stored in the database by this function:
func (repo *Repository) SaveFetchedData(ctx context.Context, rows []*common.Data) (int64, error) {
	if len(rows) == 0 {
		return 0, nil
	}
	baseQB := sq.Insert(db.DataTableName).
		Columns(saveFetchedDataCols...).
		PlaceholderFormat(sq.Dollar)
	batch := &pgx.Batch{}
	for _, p := range rows {
		curQB := baseQB.Values(p.Row1, p.Row2, sq.Expr("NOW()"))
		curQuery, curArgs, err := curQB.ToSql()
		if err != nil {
			return 0, fmt.Errorf("failed to generate SQL query: %w", err)
		}
		batch.Queue(curQuery, curArgs...)
	}
	br := repo.pool.SendBatch(ctx, batch)
	ct, err := br.Exec()
	if err != nil {
		return 0, fmt.Errorf("failed to run SQL query batch: %w", err)
	}
	return ct.RowsAffected(), nil
}
Answer 1
Score: 1
I inspected the full goroutine stack dump in pprof. The error was that I did not release the connection back to the pool after processing the result of the batch request.
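For readers who, like the asker, have not used pprof before, here is a minimal sketch of how such a goroutine dump can be obtained with the standard `runtime/pprof` package. It is not the asker's code: `blockedSender` and `countBlocked` are hypothetical helpers, and the channel of `int` only stands in for the results channel from the question. The dump lists each goroutine with its wait reason in brackets, which is what makes a stuck channel send or a stuck pool acquire visible.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"strings"
	"time"
)

// goroutineDump returns the stack traces of all live goroutines.
// With debug=2 the output uses the same format as an unrecovered panic,
// including the wait reason (for example "chan send") of each goroutine.
func goroutineDump() string {
	var buf bytes.Buffer
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 2); err != nil {
		return "failed to write goroutine profile: " + err.Error()
	}
	return buf.String()
}

// blockedSender starts a goroutine that blocks forever on an unbuffered
// channel with no receiver, imitating a stuck parser (hypothetical helper).
func blockedSender() {
	results := make(chan int) // stand-in for the results channel
	go func() {
		results <- 1 // nobody receives, so this send never completes
	}()
	time.Sleep(100 * time.Millisecond) // give the goroutine time to park
}

// countBlocked counts how often a wait reason appears in a dump
// (hypothetical helper, a plain substring count).
func countBlocked(dump, reason string) int {
	return strings.Count(dump, reason)
}

func main() {
	blockedSender()
	dump := goroutineDump()
	fmt.Println("goroutines blocked on a channel send:", countBlocked(dump, "chan send"))
	fmt.Println(dump)
}
```

In a long-running service it is usually more convenient to import `net/http/pprof` for its side effects, start an HTTP server on the default mux, and open `/debug/pprof/goroutine?debug=2` while the program is hung; that endpoint returns the same kind of dump.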
As a result, after 10 requests every connection in the pool was in use and the goroutine writing to the database blocked. Guys, y'all are the best. Thanks for the help.
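The exhaustion described above can be reproduced without a database. The following is only a simulation under stated assumptions: `pool`, `save`, and `run` are hypothetical names, and the buffered channel stands in for a connection pool of size 10. It is not pgx's implementation. In pgx itself, the `BatchResults` value returned by `SendBatch` has a `Close` method, so the fix in `SaveFetchedData` is presumably a `defer br.Close()` right after the `SendBatch` call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// pool is a hypothetical stand-in for a connection pool: a buffered
// channel whose capacity is the maximum number of connections.
type pool struct{ slots chan struct{} }

func newPool(size int) *pool { return &pool{slots: make(chan struct{}, size)} }

// acquire blocks until a slot is free or ctx is done.
func (p *pool) acquire(ctx context.Context) error {
	select {
	case p.slots <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// release returns a slot to the pool.
func (p *pool) release() { <-p.slots }

// save imitates one batch write. When closeResults is false it never
// releases the connection, mirroring the bug described in the answer.
func save(ctx context.Context, p *pool, closeResults bool) error {
	if err := p.acquire(ctx); err != nil {
		return err
	}
	if closeResults {
		defer p.release()
	}
	return nil // the real batch execution would happen here
}

// run attempts n saves and reports how many succeeded before an acquire
// timed out. The timeout exists only so the demo terminates.
func run(n, poolSize int, closeResults bool) int {
	p := newPool(poolSize)
	for i := 0; i < n; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		err := save(ctx, p, closeResults)
		cancel()
		if errors.Is(err, context.DeadlineExceeded) {
			return i
		}
	}
	return n
}

func main() {
	fmt.Println("without release:", run(20, 10, false)) // prints "without release: 10"
	fmt.Println("with release:", run(20, 10, true))     // prints "with release: 20"
}
```

Without the timeout, the eleventh call would block forever, which matches the hang in the question: the consumer goroutine stops reading, and every parser then blocks on its send to the unbuffered channel.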