MongoDB cluster with SpringBoot WebFlux duplicates records
Question
I run a MongoDB cluster with three replica-set members (one primary and two secondaries) on MongoDB 5.0.4.
The application is a Spring Boot WebFlux service using the reactive MongoDB driver.
I often see duplicate documents in the database when I save through a standard Spring Data repository.
For example, I have devices and commands for them, and each command has a list of statuses.
Connection options:
spring:
  data:
    mongodb:
      uri: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@${MONGO_CLUSTER}/${DATABASE_NAME}?authSource=admin&readPreference=secondary&replicaSet=rs0&minPoolSize=20
CommandDocument:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "command")
public class CommandDocument {
    @Id
    private String id;
    private String command;
    @DocumentReference
    private DeviceInfoDocument deviceInfo;
}
CommandStatusDocument:
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Document(collection = "commandStatus")
public class CommandStatusDocument {
    @Id
    private String id;
    private String commandId;
    private Instant timestamp;
    private CommandStatus commandStatus;
}
When I create a command, I set its status to RECEIVED.
When I send the command to a device, I need to add the status SENT_TO_DEVICE.
The main chain for fetching a device's commands with their statuses (heavily simplified):
@Override
public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
        .flatMap(commandService::findByDevice)
        .doOnNext(command ->
            commandStatusService.createCommandStatusDocument(
                    command,
                    CommandStatus.SENT_TO_DEVICE)
                .subscribe()
        )
        .flatMap(command -> command.getStatuses()
            .map(statuses -> CommandGetResponse.from(command, statuses))
        );
}
Creating a new CommandStatusDocument:
@Override
public Mono<CommandStatusDocument> createCommandStatusDocument(CommandDocument commandDocument, CommandStatus status) {
    var commandStatus = CommandStatusDocument.builder()
        .commandStatus(status)
        .commandId(commandDocument.getId())
        .timestamp(Instant.now())
        .build();
    return commandStatusRepository.save(commandStatus);
}
Repository:
public interface CommandStatusRepository extends ReactiveMongoRepository<CommandStatusDocument, String> {}
When a device fetches its list of commands, I often see two SENT_TO_DEVICE statuses (with different IDs) for one command, only a few milliseconds apart. Like this:
db.commandStatus.find({commandId: "64d2b794216a5a4984bbf0ae"})
[
  {
    _id: ObjectId("64d2b794216a5a4984bbf0af"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:45:56.283Z"),
    commandStatus: 'RECEIVED',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  },
  {
    _id: ObjectId("64d2b7b85096f472ff71be83"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:46:32.895Z"),
    commandStatus: 'SENT_TO_DEVICE',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  },
  {
    _id: ObjectId("64d2b7b85096f472ff71be84"),
    commandId: '64d2b794216a5a4984bbf0ae',
    timestamp: ISODate("2023-08-08T21:46:32.899Z"),
    commandStatus: 'SENT_TO_DEVICE',
    _class: 'com.example.commandstatus.CommandStatusDocument'
  }
]
I tried adding a check for an existing status before saving, like this:
.doOnNext(command ->
    commandStatusService.findByStatus(command, CommandStatus.SENT_TO_DEVICE)
        .switchIfEmpty(commandStatusService.createCommandStatusDocument(
            command,
            CommandStatus.SENT_TO_DEVICE))
        .subscribe()
)
But it has no effect. What could cause this behaviour? It happens in many places, not only for commands and statuses.
Is it related to replication, or to my code?
Answer 1
Score: 1
public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
        .flatMap(commandService::findByDevice)
        // Chain the save into the pipeline instead of subscribing to it separately.
        .flatMap(command ->
            commandStatusService.createCommandStatusDocument(
                    command,
                    CommandStatus.SENT_TO_DEVICE)
                // Carry the command along with the saved status document.
                .map(doc -> Tuples.of(doc, command))
        )
        .flatMap(tuple -> {
            CommandDocument command = tuple.getT2();
            return command.getStatuses()
                .map(statuses -> CommandGetResponse.from(command, statuses));
        });
}
Alternatively, you can probably move everything into a single flatMap, like this:
public Flux<CommandGetResponse> getCommands(CommandGetRequest request) {
    return deviceInfoService.findByDeviceId(request.getDeviceId())
        .flatMap(commandService::findByDevice)
        .flatMap(command -> {
            Mono<CommandStatusDocument> status = commandStatusService.createCommandStatusDocument(
                command,
                CommandStatus.SENT_TO_DEVICE);
            Flux<CommandGetResponse> resp = command.getStatuses()
                .map(statuses -> CommandGetResponse.from(command, statuses));
            // Read the statuses only after the save has completed.
            return status.thenMany(resp);
        });
}
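As for why the check-before-save attempt in the question may not help: one plausible explanation, which the thread does not confirm, is that two overlapping calls both run the lookup before either has saved. The sketch below simulates that interleaving with plain Java collections and no MongoDB or Reactor. The class name, the key format, and both method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DuplicateStatusSketch {

    // Hypothetical key standing in for (commandId, commandStatus).
    private static final String KEY = "cmd-1|SENT_TO_DEVICE";

    // Check-then-insert: both callers look the row up before either one saves,
    // so both see "not found" and both insert.
    public static int checkThenInsert() {
        List<String> collection = new ArrayList<>();

        boolean firstCallerFound = collection.contains(KEY);   // nothing saved yet
        boolean secondCallerFound = collection.contains(KEY);  // still nothing saved

        if (!firstCallerFound) {
            collection.add(KEY);
        }
        if (!secondCallerFound) {
            collection.add(KEY);
        }
        return collection.size();
    }

    // Uniqueness enforced by the store itself: the second insert for the same
    // key is rejected, regardless of what either caller observed earlier.
    public static int insertIfAbsent() {
        Map<String, String> collection = new HashMap<>();
        collection.putIfAbsent(KEY, "inserted by first caller");
        collection.putIfAbsent(KEY, "inserted by second caller");
        return collection.size();
    }

    public static void main(String[] args) {
        System.out.println("check-then-insert rows: " + checkThenInsert());
        System.out.println("insert-if-absent rows: " + insertIfAbsent());
    }
}
```

With this interleaving the first method ends up with two rows and the second with one. In MongoDB a comparable guarantee could come from a unique compound index on commandId and commandStatus, assuming each command should carry a given status at most once, which the question does not state. Note also that the connection string uses readPreference=secondary, and reads from a secondary can lag behind the primary, which may make a lookup miss a row that was just written.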