如何使用Java中的Mutiny以动态延迟响应性地触发事件?

huangapple go评论58阅读模式
英文:

How to Reactively Emit Events with Dynamic Delay using Mutiny in Java?

问题

如何使用 Mutiny 定期轮询数据资源以获取新事件,并延迟每个事件的发射时间一段动态时间,使事件的日期属性恰好为一秒前?

数据资源中的每个项目都包含一个代表录入时间的日期属性,目标是在录入时间后一秒发射每个事件。我考虑使用 Uni<List>(数据库查询是“oneshot”操作),但我更喜欢 Multi 来将流合并到父类中。

private LocalDateTime timeOfLastQuery;

public Multi<Event> getNewEvents() {
        final var eventStream = Multi.createFrom().iterable(getNewEvents(timeOfLastQuery))
                //todo delay item by millisToOneSecondOld
                .onItem().transform(mapper::toEvent);

        timeOfLastQuery = LocalDateTime.now();

        return eventStream;
    }

    private Long millisToOneSecondOld(LocalDateTime timeOfEntry) {
        final var aSecondAgo = LocalDateTime.now().minusSeconds(1);
        final var duration = Duration.between(aSecondAgo, timeOfEntry);
        return coerceAtLeast(duration.toMillis(),0);
    }

    public static long coerceAtLeast(long x, long minimum) {
        return Math.max(x, minimum);
    }
英文:

How can I use Mutiny to periodically poll for new events from a data resource, while delaying the emission of each event by a dynamic amount of time such that the event's date property is exactly one second ago?

Each item in the data resource contains a date property that represents the time of entry, and the goal is to emit each event one second after its entry time. I am considering using a Uni<List<Events>> (database queries are "oneshot" operations), but I would prefer Multi<Event> to combine streams in the parent class.

private LocalDateTime timeOfLastQuery;

public Multi&lt;Event&gt; getNewEvents() {
        final var eventStream = Multi.createFrom().iterable(getNewEvents(timeOfLastQuery))
                //todo delay item by millisToOneSecondOld
                .onItem().transform(mapper::toEvent);

        timeOfLastQuery = LocalDateTime.now();

        return eventStream;
    }

    private Long millisToOneSecondOld(LocalDateTime timeOfEntry) {
        final var aSecondAgo = LocalDateTime.now().minusSeconds(1);
        final var duration = Duration.between(aSecondAgo, timeOfEntry);
        return coerceAtLeast(duration.toMillis(),0);
    }

    public static long coerceAtLeast(long x, long minimum) {
        return Math.max(x, minimum);
    }

答案1

得分: 1

你可以按以下方式实现延迟:

.call(ignored -&gt; Uni.createFrom().nullItem().onItem().delayIt().by(getDuration()))

当你获得该项时,它会订阅我创建的 Uni,并在传播原始项之前“等待”该项被发出。诀窍是延迟发出“null”项。

英文:

You can implement your delay as follows:

.call(ignored -&gt; Uni.createFrom().nullItem().onItem().delayIt().by(getDuration()))

When you get the item, it subscribes to the Uni I created and waits for the item to be emitted before propagating the original item downstream. The trick is to delay the emission of the null item.

huangapple
  • 本文由 发表于 2023年4月13日 15:59:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/76003014.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定