MongoDB集群与SpringBoot WebFlux重复记录

huangapple go评论85阅读模式
英文:

MongoDB cluster with SpringBoot WebFlux duplicates records

问题

我使用MongoDB集群,包括3个副本(1个主要副本和2个从属副本)。Mongo版本为5.0.4。同时,我使用SpringBoot WebFlux应用程序,并使用响应式MongoDB驱动程序。

在使用标准的Spring Data Repository保存文档时,我经常在数据库中看到重复数据。例如,我有设备和与之相关的命令。每个命令都有一个状态列表。

连接选项:

spring:
  data:
    mongodb:
      uri: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@${MONGO_CLUSTER}/${DATABASE_NAME}?authSource=admin&readPreference=secondary&replicaSet=rs0&minPoolSize=20

CommandDocument(命令文档):

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "command")
public class CommandDocument {

    @Id
    private String id;
    private String command;

    @DocumentReference
    private DeviceInfoDocument deviceInfo;
}

CommandStatusDocument(命令状态文档):

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "commandStatus")
public class CommandStatusDocument {

    @Id
    private String id;
    private String commandId;
    private Instant timestamp;
    private CommandStatus commandStatus;
}

当我创建命令时,我将状态设置为“RECEIVED”。当我将命令发送到设备时,我需要添加状态“SENT_TO_DEVICE”。通过以下方式获取设备的命令及其状态的主要链路(非常简化):

@Override
public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
            .flatMap(commandService::findByDevice)
            .doOnNext(command ->
                    commandStatusService.createCommandStatusDocument(
                                  command,
                                  CommandStatus.SENT_TO_DEVICE))
                            .subscribe()
            )
            .flatMap(command -> command.getStatuses()
                    .map(statuses -> CommandGetResponse.from(command, statuses))
            );
}

创建新的CommandStatusDocument(命令状态文档):

@Override
public Mono<CommandStatusDocument> createCommandStatusDocument(CommandDocument commandDocument, CommandStatus status) {
    var commandStatus = CommandStatusDocument.builder()
            .commandStatus(status)
            .commandId(commandDocument.getId())
            .timestamp(Instant.now())
            .build();

    return commandStatusRepository.save(commandStatus);
}

Repository(仓库):

public interface CommandStatusRepository extends ReactiveMongoRepository<CommandStatusDocument, String> {}

当设备获取命令列表时,我经常看到一个命令具有不同ID的两个“SENT_TO_DEVICE”状态,时间差很小。就像这样:

db.commandStatus.find({commandId: "64d2b794216a5a4984bbf0ae"})
[
  {
    _id: ObjectId("64d2b794216a5a4984bbf0af"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:45:56.283Z"),
    commandStatus: 'RECEIVED',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  },
  {
    _id: ObjectId("64d2b7b85096f472ff71be83"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:46:32.895Z"),
    commandStatus: 'SENT_TO_DEVICE',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  },
  {
    _id: ObjectId("64d2b7b85096f472ff71be84"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:46:32.899Z"),
    commandStatus: 'SENT_TO_DEVICE',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  }
]

我尝试在保存之前添加对现有状态的检查,如下所示:

.doOnNext(command ->
    commandStatusService.findByStatus(command, CommandStatus.SENT_TO_DEVICE)
        .switchIfEmpty(commandStatusService.createCommandStatusDocument(
                                    command,
                                    CommandStatus.SENT_TO_DEVICE))
    .subscribe()
)

但它没有产生任何效果。这种行为的可能原因是什么?发生这种情况的地方很多,不仅仅是命令和状态。这与复制机制或我的代码有关吗?

英文:

I use mongodb cluster with 3 replicas (1 primary and 2 slaves). Mongo version 5.0.4.
Also SpringBoot WebFlux application with reactive mongodb driver.
I often see duplications in database when I save documents using standard Spring Data Repository.
For example: I have devices and commands for them. Each command has list of statuses.

Connection options:

spring:
  data:
    mongodb:
      uri: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@${MONGO_CLUSTER}/${DATABASE_NAME}?authSource=admin&amp;readPreference=secondary&amp;replicaSet=rs0&amp;minPoolSize=20

CommandDocument:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = &quot;command&quot;)
public class CommandDocument {

    @Id
    private String id;
    private String command;

    @DocumentReference
    private DeviceInfoDocument deviceInfo;
}

CommandStatusDocument:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = &quot;commandStatus&quot;)
public class CommandStatusDocument {

    @Id
    private String id;
    private String commandId;
    private Instant timestamp;
    private CommandStatus commandStatus;

}

When I create command, I set status RECEIVED.
When I send command to device I need to add status SENT_TO_DEVICE.
The main chain of getting commands with statuses by device (very simplified):

@Override
public Flux&lt;CommandGetResponse&gt; getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
            .flatMap(commandService::findByDevice)
            .doOnNext(command -&gt;
                    commandStatusService.createCommandStatusDocument(
                                  command,
                                  CommandStatus.SENT_TO_DEVICE))
                            .subscribe()
            )
            .flatMap(command -&gt; command.getStatuses()
                    .map(statuses -&gt; CommandGetResponse.from(command, statuses))
            );
}

Create a new commandStatusDocument:

@Override
public Mono&lt;CommandStatusDocument&gt; createCommandStatusDocument(CommandDocument commandDocument, CommandStatus status) {
    var commandStatus = CommandStatusDocument.builder()
            .commandStatus(status)
            .commandId(commandDocument.getId())
            .timestamp(Instant.now())
            .build();

    return commandStatusRepository.save(commandStatus);
}

Repository:

public interface CommandStatusRepository extends ReactiveMongoRepository&lt;CommandStatusDocument, String&gt; {}

When device gets list of commands I often see two statuses SENT_TO_DEVICE (with different IDs) for one command with small time difference. Like this:

db.commandStatus.find({commandId: &quot;64d2b794216a5a4984bbf0ae&quot;})
[
  {
    _id: ObjectId(&quot;64d2b794216a5a4984bbf0af&quot;),
    commandId: &#39;64d2b794216a5a4984bbf0ae&#39;,
    timestamp: ISODate(&quot;2023-08-08T21:45:56.283Z&quot;),
    commandStatus: &#39;RECEIVED&#39;,
    _class: &#39;com.example.commandstatus.CommandStatusDocument&#39;
  },
  {
    _id: ObjectId(&quot;64d2b7b85096f472ff71be83&quot;),
    commandId: &#39;64d2b794216a5a4984bbf0ae&#39;,
    timestamp: ISODate(&quot;2023-08-08T21:46:32.895Z&quot;),
    commandStatus: &#39;SENT_TO_DEVICE&#39;,
    _class: &#39;com.example.commandstatus.CommandStatusDocument&#39;
  },
  {
    _id: ObjectId(&quot;64d2b7b85096f472ff71be84&quot;),
    commandId: &#39;64d2b794216a5a4984bbf0ae&#39;,
    timestamp: ISODate(&quot;2023-08-08T21:46:32.899Z&quot;),
    commandStatus: &#39;SENT_TO_DEVICE&#39;,
    _class: &#39;com.example.commandstatus.CommandStatusDocument&#39;
  }
]

I tried to add check on existing status before save like this:

.doOnNext(command -&gt;
    commandStatusService.findByStatus(command, CommandStatus.SENT_TO_DEVICE)                            .switchIfEmpty(commandStatusService.createCommandStatusDocument(
                                    command,
                                    CommandStatus.SENT_TO_DEVICE))
    .subscribe()
)

But it doesn't take any effect. What is possible reason of this behaviour? There are many places when it happens, not only for commands and statuses.
Is it related to sort of replication things or my code?

答案1

得分: 1

public Flux getCommands(CommandGetRequest request) {
return deviceInfoService.findByDeviceId(request.getDeviceId())
.flatMap(commandService::findByDevice)
.flatMap(command ->
commandStatusService.createCommandStatusDocument(
command,
CommandStatus.SENT_TO_DEVICE)
.map(doc -> Tuples.of(doc, command))
)
.flatMap(tuple -> {
CommandDocument command = tuple.getT2();
return command.getStatuses()
.map(statuses -> CommandGetResponse.from(command, statuses));
});
}

Or you can probably move everything inside a single flatMap like:

public Flux getCommands(CommandGetRequest request) {
return deviceInfoService.findByDeviceId(request.getDeviceId())
.flatMap(commandService::findByDevice)
.flatMap(command -> {
Mono status = commandStatusService.createCommandStatusDocument(
command,
CommandStatus.SENT_TO_DEVICE);
Flux resp = command.getStatuses()
.map(statuses -> CommandGetResponse.from(command, statuses));
return status.thenMany(resp);
});
}

英文:
public Flux&lt;CommandGetResponse&gt; getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
        .flatMap(commandService::findByDevice)
        .flatMap(command -&gt; 
                commandStatusService.createCommandStatusDocument(
                              command,
                              CommandStatus.SENT_TO_DEVICE))
                  .map(doc -&gt; Tuples.of(doc, command))
        )
        .flatMap(tuple -&gt; {
              CommandDocument command = tuple.getT2();                   
              command.getStatuses()
                .map(statuses -&gt; CommandGetResponse.from(command, statuses))
        });
}

Or you can probably move everything inside a single flatMap like:

public Flux&lt;CommandGetResponse&gt; getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
        .flatMap(commandService::findByDevice)
        .flatMap(command -&gt; {
                Mono&lt;CommandStatusDocument&gt; status = commandStatusService.createCommandStatusDocument(
                              command,
                              CommandStatus.SENT_TO_DEVICE));
                Flux&lt;CommandGetResponse&gt; resp = command.getStatuses()
                    .map(statuses -&gt; CommandGetResponse.from(command, statuses));
               return status.thenMany(resp);
        });
}

huangapple
  • 本文由 发表于 2023年8月9日 16:55:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/76866097-2.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定