MongoDB cluster with SpringBoot WebFlux duplicates records


I run a MongoDB replica set with three members (one primary and two secondaries), MongoDB version 5.0.4.
The application is Spring Boot WebFlux with the reactive MongoDB driver.
I often see duplicate documents in the database when I save through a standard Spring Data repository.
For example, I have devices and commands for them, and each command has a list of statuses.

Connection options:

    spring:
      data:
        mongodb:
          uri: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@${MONGO_CLUSTER}/${DATABASE_NAME}?authSource=admin&readPreference=secondary&replicaSet=rs0&minPoolSize=20


    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    @Document(collection = "command")
    public class CommandDocument {
        @Id
        private String id;
        private String command;
        @DocumentReference
        private DeviceInfoDocument deviceInfo;
    }


    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    @Document(collection = "commandStatus")
    public class CommandStatusDocument {
        @Id
        private String id;
        private String commandId;
        private Instant timestamp;
        private CommandStatus commandStatus;
    }


When I create a command, I set its status to RECEIVED.
When I send the command to a device, I need to add the status SENT_TO_DEVICE.
The main chain for getting commands with their statuses by device (heavily simplified):

    @Override
    public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
        return deviceInfoService.findByDeviceId(request.getDeviceId())
            .flatMap(commandService::findByDevice)
            .doOnNext(command ->
                commandStatusService.createCommandStatusDocument(
                        command,
                        CommandStatus.SENT_TO_DEVICE)
                    .subscribe()
            )
            .flatMap(command -> command.getStatuses()
                .map(statuses -> CommandGetResponse.from(command, statuses))
            );
    }

Create a new commandStatusDocument:

    @Override
    public Mono<CommandStatusDocument> createCommandStatusDocument(CommandDocument commandDocument, CommandStatus status) {
        var commandStatus = CommandStatusDocument.builder()
            .commandStatus(status)
            .commandId(commandDocument.getId())
            .timestamp(
            .build();
        return;
    }


    public interface CommandStatusRepository extends ReactiveMongoRepository<CommandStatusDocument, String> {}

When a device fetches its list of commands, I often see two SENT_TO_DEVICE statuses (with different IDs) for one command, created a few milliseconds apart. Like this:

    db.commandStatus.find({commandId: "64d2b794216a5a4984bbf0ae"})
    [
      {
        _id: ObjectId("64d2b794216a5a4984bbf0af"),
        commandId: '64d2b794216a5a4984bbf0ae',
        timestamp: ISODate("2023-08-08T21:45:56.283Z"),
        commandStatus: 'RECEIVED',
        _class: 'com.example.commandstatus.CommandStatusDocument'
      },
      {
        _id: ObjectId("64d2b7b85096f472ff71be83"),
        commandId: '64d2b794216a5a4984bbf0ae',
        timestamp: ISODate("2023-08-08T21:46:32.895Z"),
        commandStatus: 'SENT_TO_DEVICE',
        _class: 'com.example.commandstatus.CommandStatusDocument'
      },
      {
        _id: ObjectId("64d2b7b85096f472ff71be84"),
        commandId: '64d2b794216a5a4984bbf0ae',
        timestamp: ISODate("2023-08-08T21:46:32.899Z"),
        commandStatus: 'SENT_TO_DEVICE',
        _class: 'com.example.commandstatus.CommandStatusDocument'
      }
    ]

I tried adding a check for an existing status before saving, like this:

    .doOnNext(command ->
        commandStatusService.findByStatus(command, CommandStatus.SENT_TO_DEVICE)
            .switchIfEmpty(commandStatusService.createCommandStatusDocument(
                command,
                CommandStatus.SENT_TO_DEVICE))
            .subscribe()
    )

But it has no effect. What could be causing this behaviour? It happens in many places, not only for commands and statuses.
Is it related to replication, or is it my code?
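
A possible explanation for why the find-then-save check changes nothing: if two invocations overlap, both can run the lookup before either save has landed, so both see "no status yet" and both insert. With `readPreference=secondary` in the connection URI, the lookup may also be served by a secondary that has not yet replicated a write made on the primary. The sketch below is only a plain-Java model of that interleaving, not MongoDB or Reactor code: the in-memory list stands in for the `commandStatus` collection, the `Set` stands in for a unique index on (`commandId`, `commandStatus`), and the class and method names are hypothetical. It also assumes a status should appear at most once per command, which the question implies but does not state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java model of a check-then-insert race; names are hypothetical. */
final class StatusRaceSketch {

    /** Builds the key a unique index on (commandId, commandStatus) would enforce. */
    static String key(String commandId, String status) {
        return commandId + "|" + status;
    }

    /** Two overlapping requests both run the existence check before either one saves. */
    static List<String> checkThenInsert(String commandId, String status) {
        List<String> collection = new ArrayList<>();
        String candidate = key(commandId, status);

        boolean firstSeesExisting = collection.contains(candidate);  // request 1: lookup finds nothing
        boolean secondSeesExisting = collection.contains(candidate); // request 2: lookup finds nothing

        if (!firstSeesExisting) {
            collection.add(candidate); // request 1 saves
        }
        if (!secondSeesExisting) {
            collection.add(candidate); // request 2 saves too: duplicate
        }
        return collection;
    }

    /** Same two requests, but the store itself rejects a repeated key atomically. */
    static List<String> insertWithUniqueKey(String commandId, String status) {
        List<String> collection = new ArrayList<>();
        Set<String> uniqueKeys = new HashSet<>();
        String candidate = key(commandId, status);

        for (int request = 1; request <= 2; request++) {
            // Set.add returns false when the key is already present,
            // so the second request cannot insert a duplicate.
            if (uniqueKeys.add(candidate)) {
                collection.add(candidate);
            }
        }
        return collection;
    }

    public static void main(String[] args) {
        System.out.println(checkThenInsert("cmd-1", "SENT_TO_DEVICE").size());     // prints 2
        System.out.println(insertWithUniqueKey("cmd-1", "SENT_TO_DEVICE").size()); // prints 1
    }
}
```

The point of the second method is that the uniqueness decision is made by the store at write time rather than by an earlier read, so the order in which the two requests interleave no longer matters.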


Score: 1

    public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
        return deviceInfoService.findByDeviceId(request.getDeviceId())
            .flatMap(commandService::findByDevice)
            // Save the status as part of the chain and carry the command along with it.
            .flatMap(command ->
                commandStatusService.createCommandStatusDocument(
                        command,
                        CommandStatus.SENT_TO_DEVICE)
                    .map(doc -> Tuples.of(doc, command))
            )
            .flatMap(tuple -> {
                CommandDocument command = tuple.getT2();
                return command.getStatuses()
                    .map(statuses -> CommandGetResponse.from(command, statuses));
            });
    }

Or you can probably move everything inside a single flatMap like:

    public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
        return deviceInfoService.findByDeviceId(request.getDeviceId())
            .flatMap(commandService::findByDevice)
            .flatMap(command -> {
                Mono<CommandStatusDocument> status = commandStatusService.createCommandStatusDocument(
                    command,
                    CommandStatus.SENT_TO_DEVICE);
                Flux<CommandGetResponse> resp = command.getStatuses()
                    .map(statuses -> CommandGetResponse.from(command, statuses));
                // Run the save first, then emit the responses.
                return status.thenMany(resp);
            });
    }
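
To illustrate the ordering that both variants above rely on, here is a small sketch using the JDK's `CompletableFuture` instead of Reactor (a deliberate substitution so the example runs without extra dependencies). `thenCompose` plays the role of `flatMap`/`thenMany`: the read only starts once the save has completed, and there is exactly one save per call. The class, its in-memory list, and the method names are hypothetical stand-ins for the repository. One difference to keep in mind: a `CompletableFuture` starts running when it is created, while a `Mono` does nothing until it is subscribed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** CompletableFuture analogue of "save, then read" in one chain; names are hypothetical. */
final class ChainedSaveSketch {

    // In-memory stand-in for the statuses of a single command.
    private final List<String> statuses = new CopyOnWriteArrayList<>(List.of("RECEIVED"));

    /** Asynchronous save: appends the status and completes with it. */
    CompletableFuture<String> saveStatus(String status) {
        return CompletableFuture.supplyAsync(() -> {
            statuses.add(status);
            return status;
        });
    }

    /** Asynchronous read: completes with a snapshot of the current statuses. */
    CompletableFuture<List<String>> readStatuses() {
        return CompletableFuture.supplyAsync(() -> List.copyOf(statuses));
    }

    /** The read is composed onto the save, so it cannot run before the save finishes. */
    CompletableFuture<List<String>> saveThenRead(String status) {
        return saveStatus(status).thenCompose(saved -> readStatuses());
    }

    public static void main(String[] args) {
        ChainedSaveSketch sketch = new ChainedSaveSketch();
        // join() blocks until the whole chain has completed.
        System.out.println(sketch.saveThenRead("SENT_TO_DEVICE").join());
        // prints [RECEIVED, SENT_TO_DEVICE]
    }
}
```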

  • Posted on August 9, 2023, 16:55:48