How to handle errors when reading from Kafka and writing to PostgreSQL in Go?

Question

I'm building a Go application that reads messages from a Kafka topic and writes them to a PostgreSQL database.

I've set up a loop that reads messages from Kafka using a kafka.Reader and inserts them into the database using a sql.DB. If there's an error reading a message or inserting it into the database, I log the error and continue to the next message.

However, I'm not sure how to handle errors that occur when committing the Kafka message after inserting the data into the PostgreSQL database. Specifically, if the manual commit causes an error, what should I do? Should I retry the commit operation? Should I log the error and continue to the next message? What's the best practice for handling these types of errors?

for {
	kafkaMessage, err := kafkaReader.ReadMessage(context.Background())
	if err != nil {
		fmt.Printf("Failed to read message from Kafka: %s\n", err)
		continue
	}

	_, err = db.Exec("INSERT INTO mytable (payload) VALUES ($1)", kafkaMessage.Value)
	if err != nil {
		fmt.Printf("Failed to insert payload into database: %s\n", err)
		continue
	}

	// What should I do if the commit operation fails?
	err = kafkaReader.CommitMessages(context.Background(), kafkaMessage)
	if err != nil {
		// What's the best practice for handling this error?
	}
}

Answer 1

Score: 1

When facing an error, just continue to the next iteration of the for loop.

If committing the Kafka message fails for any reason, the offset stays uncommitted on the broker, so the same message will be delivered again; not necessarily in the very next reader.ReadMessage(ctx) call of the running consumer, but after the consumer restarts or the group rebalances.

But to make sure your code does not keep futilely retrying the same failing job, exhausting resources, flooding the logs with the same error message, and so on, add a simple sleep after each error, or, if really needed, wrap your function in circuit-breaker logic.

if err != nil {
	log.Errorf("...", ...)
	time.Sleep(5 * time.Second)
	continue
}
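
Applied to the loop from the question, a minimal sketch of this advice could look like the following. It assumes the kafkaReader and db values from the question are already set up and that context, log, and time are imported; the five-second back-off is only illustrative. It also swaps ReadMessage for kafkaReader.FetchMessage, because with a GroupID set kafka-go's ReadMessage commits the offset automatically, which would defeat the manual CommitMessages call.

for {
	kafkaMessage, err := kafkaReader.FetchMessage(context.Background())
	if err != nil {
		log.Printf("failed to fetch message from Kafka: %s", err)
		time.Sleep(5 * time.Second) // back off so a persistent broker error does not spin the loop
		continue
	}

	if _, err := db.Exec("INSERT INTO mytable (payload) VALUES ($1)", kafkaMessage.Value); err != nil {
		log.Printf("failed to insert payload into database: %s", err)
		time.Sleep(5 * time.Second) // the offset stays uncommitted, so the message is not lost
		continue
	}

	if err := kafkaReader.CommitMessages(context.Background(), kafkaMessage); err != nil {
		// The insert already succeeded; the worst case of a failed commit is a later
		// redelivery (at-least-once processing), so log, back off, and move on.
		log.Printf("failed to commit Kafka offset: %s", err)
		time.Sleep(5 * time.Second)
	}
}

Because a failed commit only means the message may be delivered again later, the insert itself should tolerate duplicates, which is what the second answer discusses.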

Answer 2

Score: 0

Generally speaking, business logic should rely on consistency at the last layer of your data. In your case I assume the data persists in the DB and that this is the only thing that matters, so you should design a consistency model around your DB: identify the properties of your data and design a proper business flow that helps you guarantee eventual consistency in the DB.

For your question, it doesn't really matter how you handle the error. Something unexpected happened on the Kafka side; whether you ignore it or not, Kafka will continue to the next message if the commit (actually) succeeded, or stay at the current offset if the commit (actually) failed. This is so-called at-least-once delivery, so your business logic should handle duplicate messages properly.
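
The answer leaves "handle duplicate messages properly" open; one common way to do that in this setup (a sketch only; the extra columns on mytable are an assumption, not something from the post) is to store each message's topic/partition/offset next to the payload and let a unique constraint turn redeliveries into no-ops, replacing the plain insert in the question's loop:

// Assumes mytable has a unique constraint on the Kafka coordinates, e.g.
// PRIMARY KEY ("topic", "partition", "offset"); these column names are hypothetical.
_, err = db.Exec(
	`INSERT INTO mytable ("topic", "partition", "offset", payload)
	 VALUES ($1, $2, $3, $4)
	 ON CONFLICT ("topic", "partition", "offset") DO NOTHING`,
	kafkaMessage.Topic, kafkaMessage.Partition, kafkaMessage.Offset, kafkaMessage.Value,
)

With such a constraint in place, a message redelivered after a failed commit is silently skipped by PostgreSQL, which is exactly what at-least-once delivery requires of the consumer.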

If you want exactly-once delivery, that's much trickier, and I don't suggest having business logic rely on it. For example, what happens if the DB inserts the record but the network fails while the response is being transmitted back to your application? Will you assume the network was lost after the DB commit or before it? Kafka can guarantee exactly-once semantics once your request has arrived at the Kafka server, but outside the Kafka server's scope it can't help you much.
