英文:
How to Reactively Emit Events with Dynamic Delay using Mutiny in Java?
问题
如何使用 Mutiny 定期轮询数据资源以获取新事件,并延迟每个事件的发射时间一段动态时间,使事件的日期属性恰好为一秒前?
数据资源中的每个项目都包含一个代表录入时间的日期属性,目标是在录入时间后一秒发射每个事件。我考虑使用 Uni<List
private LocalDateTime timeOfLastQuery;
public Multi<Event> getNewEvents() {
final var eventStream = Multi.createFrom().iterable(getNewEvents(timeOfLastQuery))
//todo delay item by millisToOneSecondOld
.onItem().transform(mapper::toEvent);
timeOfLastQuery = LocalDateTime.now();
return eventStream;
}
private Long millisToOneSecondOld(LocalDateTime timeOfEntry) {
final var aSecondAgo = LocalDateTime.now().minusSeconds(1);
final var duration = Duration.between(aSecondAgo, timeOfEntry);
return coerceAtLeast(duration.toMillis(),0);
}
public static long coerceAtLeast(long x, long minimum) {
return Math.max(x, minimum);
}
英文:
How can I use Mutiny to periodically poll for new events from a data resource, while delaying the emission of each event by a dynamic amount of time such that the event's date property is exactly one second ago?
Each item in the data resource contains a date property that represents the time of entry, and the goal is to emit each event one second after its entry time. I am considering using a Uni<List<Events>> (database queries are "oneshot" operations), but I would prefer Multi<Event> to combine streams in the parent class.
private LocalDateTime timeOfLastQuery;
public Multi<Event> getNewEvents() {
final var eventStream = Multi.createFrom().iterable(getNewEvents(timeOfLastQuery))
//todo delay item by millisToOneSecondOld
.onItem().transform(mapper::toEvent);
timeOfLastQuery = LocalDateTime.now();
return eventStream;
}
private Long millisToOneSecondOld(LocalDateTime timeOfEntry) {
final var aSecondAgo = LocalDateTime.now().minusSeconds(1);
final var duration = Duration.between(aSecondAgo, timeOfEntry);
return coerceAtLeast(duration.toMillis(),0);
}
public static long coerceAtLeast(long x, long minimum) {
return Math.max(x, minimum);
}
答案1
得分: 1
你可以按以下方式实现延迟:
.call(ignored -> Uni.createFrom().nullItem().onItem().delayIt().by(getDuration()))
当你获得该项时,它会订阅我创建的 Uni,并在传播原始项之前“等待”该项被发出。诀窍是延迟发出“null”项。
英文:
You can implement your delay as follows:
.call(ignored -> Uni.createFrom().nullItem().onItem().delayIt().by(getDuration()))
When you get the item, it subscribes to the Uni I created and waits for the item to be emitted before propagating the original item downstream. The trick is to delay the emission of the null
item.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论