Processing CSV files with a queue using Kafka to insert into a Postgres database and return a status message
Question
I am looking for some best practices/advice on processing a CSV file and inserting it into the database with a queue mechanism (Kafka).
Here is what I plan to do:
Create a new SQL table called "Service Request" to store information about the user's request, such as:
- RequestID
- Status
- Payload
- Response

As you can see, there is a "status" field that indicates whether the request succeeded or failed.
Here is the flow when a user uploads a CSV file (a producer-side sketch follows this list):
- The user submits a CSV file.
- Validate the CSV file to make sure it uses the correct template.
- Upload the CSV file to Google Cloud Storage, then create a new record in the "Service Request" table with the RequestID and the Payload (the URL of the CSV file).
- Read all records in the CSV file and send each one as a message to a Kafka topic (with a JSON payload).
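A minimal producer sketch for the last step, assuming a topic named `csv-records`, a local broker, and an illustrative `produceCSV` helper (all names are placeholders). Keying every message by the RequestID keeps all rows of one request in a single partition:

```go
package main

import (
	"encoding/csv"
	"encoding/json"
	"log"
	"os"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// produceCSV reads every row of the CSV file and publishes one JSON message
// per row. The message key is the RequestID, so all rows of a request land
// in the same partition and are delivered in order.
func produceCSV(p *kafka.Producer, topic, requestID, path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	rows, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return err
	}

	for i, row := range rows {
		payload, err := json.Marshal(map[string]interface{}{
			"request_id": requestID,
			"row_number": i,
			"columns":    row,
		})
		if err != nil {
			return err
		}
		if err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Key:            []byte(requestID), // same key => same partition
			Value:          payload,
		}, nil); err != nil {
			return err
		}
	}
	p.Flush(15000) // wait for outstanding delivery reports
	return nil
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	if err := produceCSV(p, "csv-records", "req-123", "upload.csv"); err != nil {
		log.Fatal(err)
	}
}
```

Using the RequestID as the message key also means a single consumer sees all rows of one request in order, which matters later for deciding when the request is finished.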
On the consumer side (a consumer sketch follows this list):
- Listen for all incoming messages on the topic (consume the messages).
- Process every message.
- If an error occurs, create a CSV file that records why the message failed.
- Once all messages for RequestID XXX have been processed, update the "status" and set the response to the CSV file containing the error list.
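A minimal consumer sketch for this side, assuming the same `csv-records` topic and a consumer group named `csv-workers`; `processRecord` and `recordFailure` are hypothetical stand-ins for the per-row business logic and the error-CSV writer:

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// processRecord stands in for the real per-row business logic (hypothetical).
func processRecord(value []byte) error { return nil }

// recordFailure stands in for appending the failed row and its reason to the
// per-request error CSV (hypothetical).
func recordFailure(requestID string, value []byte, err error) {
	log.Printf("request %s: row failed: %v", requestID, err)
}

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "csv-workers",
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"csv-records"}, nil); err != nil {
		log.Fatal(err)
	}

	for {
		msg, err := c.ReadMessage(-1) // block until the next message arrives
		if err != nil {
			log.Printf("consumer error: %v", err)
			continue
		}
		requestID := string(msg.Key) // the producer keyed every row by RequestID
		if err := processRecord(msg.Value); err != nil {
			recordFailure(requestID, msg.Value, err)
		}
	}
}
```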
So here is the question:
How do I know that all messages for RequestID XXX have been consumed, so that I can update the "status"?
I am using:
Go + the confluent-kafka-go library.
Update:
After doing some research, I found that this could be done with Kafka Streams by using "GroupByKey". Is that possible in Go? I can't find a Kafka Streams API in confluent-kafka-go.
Answer 1
Score: 1
I am a Kafka novice, so I may not be the best person to give advice, but my initial reaction would be to force message processing to occur "in order". On the producer side, you would mark the last message. On the consumer side, you would read that marker, and once you reach the last message you would update the Status field (a sketch of such a marker follows below). Keep in mind that forcing message order may have implications for system throughput.
Useful reading is available at https://medium.com/latentview-data-services/how-to-use-apache-kafka-to-guarantee-message-ordering-ac2d00da6c22
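A rough sketch of what that marker could look like, assuming a hypothetical `is_last` field in the JSON payload and messages keyed by RequestID, so the marked row is read after all the others in its partition (`rowMessage` and `handleValue` are illustrative names):

```go
package main

import (
	"encoding/json"
	"log"
)

// rowMessage is a hypothetical per-row payload; the producer sets IsLast only
// on the final row of a request.
type rowMessage struct {
	RequestID string   `json:"request_id"`
	Columns   []string `json:"columns"`
	IsLast    bool     `json:"is_last"`
}

// handleValue decodes one Kafka message value and reports whether the request
// it belongs to is now complete. Because every row of a request shares the
// RequestID as its key (one partition, in order), the IsLast row is read last.
func handleValue(value []byte) (requestID string, done bool, err error) {
	var m rowMessage
	if err := json.Unmarshal(value, &m); err != nil {
		return "", false, err
	}
	// ...process the row here...
	return m.RequestID, m.IsLast, nil
}

func main() {
	value := []byte(`{"request_id":"req-123","columns":["a","b"],"is_last":true}`)
	if id, done, err := handleValue(value); err == nil && done {
		log.Printf("request %s is complete, update its status now", id)
	}
}
```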
Another approach is to use Postgres as a distributed lock and track progress there. For example, say you have a tracking table with the columns RequestId, RecordsProcessed, and RecordsGenerated. Each time you consume a message, you lock the row (or table) and increment the RecordsProcessed column. Once all the records have been processed, you update the Status accordingly (see the sketch below).
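A sketch of that counter in Go, assuming a hypothetical `service_request_progress` table with `request_id`, `records_processed`, and `records_generated` columns; a single `UPDATE ... RETURNING` both takes the row lock and increments the counter atomically:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres driver
)

// markProcessed atomically increments records_processed for one request and
// reports whether every generated record has now been processed. The single
// UPDATE takes the row-level lock, so concurrent consumers stay consistent.
func markProcessed(db *sql.DB, requestID string) (bool, error) {
	var processed, generated int
	err := db.QueryRow(`
		UPDATE service_request_progress
		   SET records_processed = records_processed + 1
		 WHERE request_id = $1
		 RETURNING records_processed, records_generated`, requestID).
		Scan(&processed, &generated)
	if err != nil {
		return false, err
	}
	return processed >= generated, nil
}

func main() {
	db, err := sql.Open("postgres", "postgres://user:pass@localhost/app?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	done, err := markProcessed(db, "req-123")
	if err != nil {
		log.Fatal(err)
	}
	if done {
		// All rows of the request are consumed: update Status and attach the
		// error-list CSV in the Service Request table here.
		log.Println("request req-123 finished")
	}
}
```

When `markProcessed` reports true, the consumer that handled the final record can update the Status and attach the error-list CSV.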