MongoDB cluster with SpringBoot WebFlux duplicates records

Question

I run a MongoDB replica set with three members (one primary and two secondaries) on MongoDB 5.0.4, together with a Spring Boot WebFlux application that uses the reactive MongoDB driver.
I often see duplicate documents in the database when I save documents through a standard Spring Data repository.
For example, I have devices and commands for them, and each command has a list of statuses.

Connection options:

spring:
  data:
    mongodb:
      uri: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@${MONGO_CLUSTER}/${DATABASE_NAME}?authSource=admin&readPreference=secondary&replicaSet=rs0&minPoolSize=20

CommandDocument:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "command")
public class CommandDocument {

    @Id
    private String id;
    private String command;

    @DocumentReference
    private DeviceInfoDocument deviceInfo;
}

CommandStatusDocument:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "commandStatus")
public class CommandStatusDocument {

    @Id
    private String id;
    private String commandId;
    private Instant timestamp;
    private CommandStatus commandStatus;

}

When I create a command, I set its status to RECEIVED.
When I send the command to a device, I need to add the status SENT_TO_DEVICE.
The main chain for fetching a device's commands together with their statuses (very simplified):

@Override
public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
            .flatMap(commandService::findByDevice)
            .doOnNext(command ->
                    commandStatusService.createCommandStatusDocument(
                                  command,
                                  CommandStatus.SENT_TO_DEVICE)
                            .subscribe()
            )
            .flatMap(command -> command.getStatuses()
                    .map(statuses -> CommandGetResponse.from(command, statuses))
            );
}

Creating a new CommandStatusDocument:

@Override
public Mono<CommandStatusDocument> createCommandStatusDocument(CommandDocument commandDocument, CommandStatus status) {
    var commandStatus = CommandStatusDocument.builder()
            .commandStatus(status)
            .commandId(commandDocument.getId())
            .timestamp(Instant.now())
            .build();

    return commandStatusRepository.save(commandStatus);
}

Repository:

public interface CommandStatusRepository extends ReactiveMongoRepository<CommandStatusDocument, String> {}

When a device fetches its list of commands, I often see two SENT_TO_DEVICE statuses (with different IDs) for one command, created only a few milliseconds apart. For example:

db.commandStatus.find({commandId: "64d2b794216a5a4984bbf0ae"})
[
  {
    _id: ObjectId("64d2b794216a5a4984bbf0af"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:45:56.283Z"),
    commandStatus: 'RECEIVED',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  },
  {
    _id: ObjectId("64d2b7b85096f472ff71be83"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:46:32.895Z"),
    commandStatus: 'SENT_TO_DEVICE',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  },
  {
    _id: ObjectId("64d2b7b85096f472ff71be84"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:46:32.899Z"),
    commandStatus: 'SENT_TO_DEVICE',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  }
]

I tried adding a check for an existing status before saving, like this:

.doOnNext(command ->
    commandStatusService.findByStatus(command, CommandStatus.SENT_TO_DEVICE)
            .switchIfEmpty(commandStatusService.createCommandStatusDocument(
                    command,
                    CommandStatus.SENT_TO_DEVICE))
            .subscribe()
)

But it has no effect. What could be causing this behaviour? It happens in many places, not only for commands and statuses.
Is it related to replication, or to my code?
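
One possible explanation for why the existence check does not help, offered as an assumption rather than a diagnosis: if two requests for the same device run almost at the same time, both can perform the check before either has inserted, and with readPreference=secondary the check may also read data that lags behind the primary. The plain-Java sketch below simulates that interleaving with a hypothetical in-memory StatusStore (not a MongoDB API) and contrasts it with an insert guarded by a uniqueness rule, similar in spirit to a unique index on commandId and commandStatus.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CheckThenInsertRace {

    // Hypothetical in-memory stand-in for the commandStatus collection.
    static final class StatusStore {
        private final List<String> rows = new ArrayList<>();
        private final Set<String> uniqueKeys = new HashSet<>();

        boolean exists(String commandId, String status) {
            return rows.contains(commandId + ":" + status);
        }

        // Plain insert: nothing prevents a second row with the same content.
        void insert(String commandId, String status) {
            rows.add(commandId + ":" + status);
        }

        // Insert guarded by a uniqueness rule; returns false for a duplicate.
        boolean insertUnique(String commandId, String status) {
            if (!uniqueKeys.add(commandId + ":" + status)) {
                return false;
            }
            rows.add(commandId + ":" + status);
            return true;
        }

        int count() {
            return rows.size();
        }
    }

    // Two requests interleave: both run the check before either one inserts.
    static int racyCheckThenInsert() {
        StatusStore store = new StatusStore();
        boolean seenByA = store.exists("cmd-1", "SENT_TO_DEVICE");
        boolean seenByB = store.exists("cmd-1", "SENT_TO_DEVICE");
        if (!seenByA) store.insert("cmd-1", "SENT_TO_DEVICE");
        if (!seenByB) store.insert("cmd-1", "SENT_TO_DEVICE");
        return store.count();
    }

    // The same two requests, but the store itself rejects the second row.
    static int guardedInsert() {
        StatusStore store = new StatusStore();
        store.insertUnique("cmd-1", "SENT_TO_DEVICE");
        store.insertUnique("cmd-1", "SENT_TO_DEVICE");
        return store.count();
    }

    public static void main(String[] args) {
        System.out.println("check-then-insert rows: " + racyCheckThenInsert());
        System.out.println("guarded insert rows:    " + guardedInsert());
    }
}
```

In the simulated interleaving the check-then-insert variant ends up with two rows and the guarded variant with one. Whether this is what actually happens in the application depends on how often getCommands is invoked or subscribed, which the question does not show.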

Answer 1

Score: 1

public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
        .flatMap(commandService::findByDevice)
        // Save the status as part of the chain instead of calling subscribe() inside doOnNext
        .flatMap(command ->
                commandStatusService.createCommandStatusDocument(
                              command,
                              CommandStatus.SENT_TO_DEVICE)
                  // Tuples is reactor.util.function.Tuples
                  .map(doc -> Tuples.of(doc, command))
        )
        .flatMap(tuple -> {
              CommandDocument command = tuple.getT2();
              return command.getStatuses()
                .map(statuses -> CommandGetResponse.from(command, statuses));
        });
}

Or you can probably move everything inside a single flatMap, like this:

public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
        .flatMap(commandService::findByDevice)
        .flatMap(command -> {
                Mono<CommandStatusDocument> status = commandStatusService.createCommandStatusDocument(
                              command,
                              CommandStatus.SENT_TO_DEVICE);
                Flux<CommandGetResponse> resp = command.getStatuses()
                    .map(statuses -> CommandGetResponse.from(command, statuses));
                // Emit the responses only after the status has been saved
                return status.thenMany(resp);
        });
}
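
The difference between the question's detached subscribe() and the chained versions above can be illustrated without Reactor. The sketch below is an analogy using the JDK's CompletableFuture, not the Reactor API, and the event labels are made up for the illustration: in the detached variant the response does not depend on the save, while in the chained variant the response step runs only once the save has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DetachedVsChained {

    // Detached: the save is started, but the response does not depend on it,
    // loosely like calling subscribe() inside doOnNext.
    static List<String> detached() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> save = new CompletableFuture<>();
        save.thenRun(() -> events.add("status saved"));

        // The response is already available, so its callback runs immediately.
        CompletableFuture<String> response =
                CompletableFuture.completedFuture("response built");
        response.thenAccept(events::add);

        save.complete(null); // the save finishes only afterwards
        return events;
    }

    // Chained: the response step is a continuation of the save,
    // loosely like returning the save Mono from flatMap.
    static List<String> chained() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> save = new CompletableFuture<>();

        CompletableFuture<String> response = save
                .thenRun(() -> events.add("status saved"))
                .thenApply(ignored -> "response built");
        response.thenAccept(events::add);

        save.complete(null); // completing the save triggers the rest of the chain
        return events;
    }

    public static void main(String[] args) {
        System.out.println("detached: " + detached());
        System.out.println("chained:  " + chained());
    }
}
```

In the detached variant "response built" is recorded before "status saved"; in the chained variant the order is reversed. A chained pipeline also propagates errors and cancellation from the save to the caller, which a detached subscribe() does not.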

huangapple
  • Posted on August 9, 2023, 16:55:48
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76866097.html