英文:
Get resume token with MongoDB Java driver before first document received in ChangeStream?
问题
这个问题类似于 https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change,但适用于Java驱动程序。据我所知,这对于确保所有文档至少被处理一次非常关键。
例如,假设我有一个变更流(C),它订阅文档并根据每个文档的内容发送电子邮件。但如果电子邮件发送失败或服务器在电子邮件发送之前崩溃,那么恢复令牌(R)将不会被持久化。当应用程序再次启动时,它将“监视”而没有恢复令牌,因此文档将被遗漏,没有电子邮件被发送。
是否有一种支持的方法可以在收到第一个更改文档之前获取ChangeStream的恢复令牌,以减轻上述问题的描述?
从我所了解的MongoDB规范中,驱动程序必须公开一种机制,以检索与自动恢复使用的相同的恢复令牌。
但是,我似乎找不到使用Java API来执行此操作的方法。这是否可能,或者是否有推荐的解决方法?
请注意,我非常希望不要使用基于时间戳的startAtOperationTime
,因为时间是脆弱的,服务器和客户端的时钟可能会发生变化。
英文:
This question is similar to https://stackoverflow.com/questions/48665409/how-do-i-resume-a-mongodb-changestream-at-the-first-document-and-not-just-change but for the Java driver. This is crucial, afaik, if one needs to make sure that all documents are processed at least once.
For example, let's say that I have a change stream (C) that subscribes to documents and sends an email based on the contents of each document. But if the email sending fails or the server crashes before the email could be sent then the resume token (R) will not have been persisted. When the application is started up again it'll "watch" without a resume token and thus the document will be missed and no email sent.
Is there a supported way to get the resume token of a ChangeStream before the first change document has been received to mitigate the issue described above?
From what I can tell from the MongoDB specification this must be supported by drivers:
> Drivers MUST expose a mechanism to retrieve the same resume token that would be used to automatically resume.
But I cannot seem to find a way to do this using the Java API. Is this possible or is there a recommended workaround?
Note that I would very much prefer not to use startAtOperationTime
which is based on a timestamp since time is fragile and clocks may be changed both on the server and client.
答案1
得分: 1
在一个兼容4.2版本的驱动程序中,实施规范中的“必须公开恢复令牌”的规定,每当更改流执行getMore操作时,会发生以下两种情况之一:
- 要么至少返回一个文档,每个文档都包含一个在该文档中的恢复令牌,或者
- 不返回任何文档,此时4.0.7+服务器仍然提供postBatchResumeToken。
据我回忆,Java中的更改流具有tryNext方法,您需要调用该方法以检索postBatchResumeToken,而不会阻止应用程序。检索当前恢复令牌(与文档关联的一个或postBatchResumeToken)的机制是特定于驱动程序的。
https://mongodb.github.io/mongo-java-driver/4.0/apidocs/mongodb-driver-sync/com/mongodb/client/MongoChangeStreamCursor.html 是我能找到的最接近的文档,不过我认为您应该使用tryNext而不是next,如果tryNext不返回任何文档,仍然需要读取当前的恢复令牌来推进更改流中的位置。
https://docs.mongodb.com/ruby-driver/master/tutorials/ruby-driver-change-streams/#resuming-a-change-stream 可能对恢复令牌跟踪有所帮助,尽管这不包括try_next(Ruby驱动程序也实现了该方法),但可能需要。
这将允许您在接收任何文档之前正确恢复更改流。您将在处理文档之后存储恢复令牌,因此您需要足够快地取得进展,以避免从操作日志中掉下来,但postBatchResumeToken处理了长时间没有更改的情况。
如果您没有任何恢复令牌,仍然需要在一开始的时间戳处启动更改流 - https://mongodb.github.io/mongo-java-driver/4.0/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html 提供了startAtOperationTime作为我认为您会使用的方法。如果您的驱动程序公开了当前由驱动程序跟踪的集群时间,则可以提供该时间。
英文:
In a 4.2-compatible driver implementing the "must expose resume token" provision of the specification, each time the change stream executes a getMore, one of two things happens:
- Either at least one document is returned, with each document containing a resume token at that document, or
- No documents are returned, in which case postBatchResumeToken is still provided by 4.0.7+ servers.
As I recall in Java change streams have a tryNext method, you need to call that to retrieve postBatchResumeToken without blocking the application. The mechanism for retrieving the current resume token (either one associated with a document or postBatchResumeToken) is driver-specific.
https://mongodb.github.io/mongo-java-driver/4.0/apidocs/mongodb-driver-sync/com/mongodb/client/MongoChangeStreamCursor.html is the closest documentation I can find, except I believe you would use tryNext instead of next, and if tryNext doesn't return any documents you would still read the current resume token to advance your position in the change stream.
https://docs.mongodb.com/ruby-driver/master/tutorials/ruby-driver-change-streams/#resuming-a-change-stream may be helpful as far as resume token tracking in general although this doesn't include try_next (which Ruby driver also implements) as would be needed.
This would allow you to correctly resume change stream before it received any documents. You would store the resume token after processing documents, so you need to make progress quickly enough that you don't fall off the oplog, but postBatchResumeToken handles the case of there not being any changes for a long time without falling off the oplog.
You still need to start the change stream at a timestamp in the very beginning, if you do not have any resume tokens - https://mongodb.github.io/mongo-java-driver/4.0/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html gives startAtOperationTime as the method I'd expect you would use. You could potentially provide the current clusterTime as tracked by the driver, if your driver exposes that.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论