使用Redis Stream在Spring Boot应用中通过HTTP长轮询阻塞HTTP响应

huangapple go评论80阅读模式
英文:

Using Redis Stream to Block HTTP response via HTTP long polling in Spring Boot App

问题

以下是已翻译的内容:

我有一个使用Spring Boot开发的Web应用程序,具有更新实体StudioLinking的功能。这个实体描述了两个物联网(IoT)设备之间的临时、可变的、描述性的逻辑链接,我的Web应用程序是它们的云服务。这些设备之间的链接是短暂的,但是StudioLinking实体在数据库中保留以用于报告目的。使用Spring Data/ Hibernate,StudioLinking以传统方式存储到基于SQL的数据存储中。不时地,这个StudioLinking实体将使用Rest API中的新信息进行更新。当更新链接时,设备需要做出响应(改变颜色、音量等)。目前,这是通过每隔5秒轮询来处理的,但是这会导致人类用户输入系统更新和物联网设备实际更新之间存在延迟。这个延迟可以是毫秒,也可以是长达5秒!显然,增加轮询频率是不可持续的,而且绝大多数时间都没有更新!

因此,我正在尝试在同一应用程序中开发另一个使用HTTP长轮询的Rest API,该API将在给定的StudioLinking实体被更新或超时后返回。监听器不支持WebSocket或类似的内容,这让我只能使用长轮询。长轮询可能会产生一种竞争条件,其中您必须考虑这样一种可能性,即在连续的消息中,一个消息可能会在HTTP请求之间“丢失”(在连接关闭和打开时,可能会出现新的“更新”,如果我使用了发布/订阅,可能不会被“注意到”)。

重要的是要注意,这个“订阅更新”的API只应该在实际更新或自上次签入以来有更新发生时返回StudioLinking的最新和当前版本。 “订阅更新”的客户端最初将POST一个API请求来设置新的监听会话,并将其传递给服务器,以便服务器知道他们是谁。因为有可能多个设备都需要监视对同一StudioLinking实体的更新。我相信我可以通过在redis XREAD中使用具有不同名称的消费者来实现这一点。 (将此记住,稍后在问题中)

经过几个小时的研究,我相信实现这一点的方法是使用redis streams。

我找到了关于Spring Data Redis中Redis Streams的这两个链接:

我还阅读了这篇关于长轮询的文章,这两个链接都只在长轮询期间使用了一个睡眠计时器,这只是为了演示目的,但显然我想做一些有用的事情。

这两个链接都非常有帮助。目前,我没有问题可以弄清楚如何将更新发布到Redis Stream中-(这是未经测试的“伪代码”,但我不预计在实现这一点时会遇到任何问题)

// 在我的StudioLinking实体中

@PostUpdate
public void postToRedis() {
    StudioLinking link = this;
    ObjectRecord<String, StudioLinking> record = StreamRecords.newRecord()
            .ofObject(link)
            .withStreamKey(streamKey); //我为每个单独的链接创建了一个流,可能吗?
    this.redisTemplate
            .opsForStream()
            .add(record)
            .subscribe(System.out::println);
    atomicInteger.incrementAndGet();
}

但是当涉及订阅该流时,我遇到了困难:所以基本上我想在这里做的事情-请原谅我搞砸的伪代码,这只是为了表达思想。我很清楚,代码绝不是语言和框架实际行为的指示:)

// 参数studioLinkingID是请求者想要监视的StudioLinking
// updateList是用于在Redis中跟踪个别消费者的唯一标记
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public DeferredResult<ResponseEntity<?>> subscribeToUpdates(@PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
    LOG.info("Received async-deferredresult request");
    DeferredResult<ResponseEntity<?>> output = new DeferredResult<>(5000l);

    deferredResult.onTimeout(() -> 
      deferredResult.setErrorResult(
        ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
          .body("IT WAS NOT UPDATED!")));
    
    ForkJoinPool.commonPool().submit(() -> {
        //----------------------------------------------
        // 虚构的内容...在这里,我想要订阅流并阻塞!
        //----------------------------------------------
        LOG.info("Processing in separate thread");
        try {
            // 订阅Redis Stream,获取在长轮询之间发生的任何更新
            // 然后阻塞,直到/如果有新消息通过流传递
            var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(studioLinkingID, updateList),
                StreamOffset.create(studioLinkingID, ReadOffset.lastConsumed()),
                streamListener);
            listenerContainer.start();
        } catch (InterruptedException e) {
        }
        output.setResult("IT WAS UPDATED!");
    });
    
    LOG.info("servlet thread freed");
    return output;
}

所以,我如何解决这个问题呢?我认为答案在于https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html,但我还不够了解Spring,无法真正理解Java Docs中的术语(Spring文档确实很好,但JavaDocs是用密集

英文:

I have a spring boot web application with the functionality to update an entity called StudioLinking. This entity describes a temporary, mutable, descriptive logical link between two IoT devices for which my web app is their cloud service. The Links between these devices are ephemeral in nature, but the StudioLinking Entity persists on the database for reporting purposes. StudioLinking is stored to the SQL based datastore in the conventional way using Spring Data/ Hibernate. From time to time this StudioLinking entity will be updated with new information from a Rest API. When that link is updated the devices need to respond (change colors, volume, etc). Right now this is handled with polling every 5 seconds but this creates lag from when a human user enters an update into the system and when the IoT devices actually update. It could be as little as a millisecond or up to 5 seconds! Clearly increasing the frequency of the polling is unsustainable and the vast majority of the time there are no updates at all!

So, I am trying to develop another Rest API on this same application with HTTP Long Polling which will return when a given StudioLinking entity is updated or after a timeout. The listeners do not support WebSocket or similar leaving me with Long Polling. Long polling can leave a race condition where you have to account for the possibility that with consecutive messages one message may be "lost" as it comes in between HTTP requests (while the connection is closing and opening, a new "update" might come in and not be "noticed" if I used a Pub/Sub).

It is important to note that this "subscribe to updates" API should only ever return the LATEST and CURRENT version of the StudioLinking, but should only do so when there is an actual update or if an update happened since the last checkin. The "subscribe to updates" client will initially POST an API request to setup a new listening session and pass that along so the server knows who they are. Because it is possible that multiple devices will need to monitor updates to the same StudioLinking entity. I believe I can acomplish this by using separately named consumers in the redis XREAD. (keep this in mind for later in the question)

After hours of research I believe the way to acomplish this is using redis streams.

I have found these two links regarding Redis Streams in Spring Data Redis:

https://www.vinsguru.com/redis-reactive-stream-real-time-producing-consuming-streams-with-spring-boot/
https://medium.com/@amitptl.in/redis-stream-in-action-using-java-and-spring-data-redis-a73257f9a281

I also have read this link about long polling, both of these links just have a sleep timer during the long polling which is for demonstration purposes but obviously I want to do something useful.

https://www.baeldung.com/spring-deferred-result

And both these links were very helpful. Right now I have no problem figuring out how to publish the updates to the Redis Stream - (this is untested "pseudo-code" but I don't anticipate having any issues implementing this)

// In my StudioLinking Entity

@PostUpdate
public void postToRedis() {
    StudioLinking link = this;
    ObjectRecord&lt;String, StudioLinking&gt; record = StreamRecords.newRecord()
            .ofObject(link)
            .withStreamKey(streamKey); //I am creating a stream for each individual linking probably?
    this.redisTemplate
            .opsForStream()
            .add(record)
            .subscribe(System.out::println);
    atomicInteger.incrementAndGet();
}

But I fall flat when it comes to subscribing to said stream: So basically what I want to do here - please excuse the butchered pseudocode, it is for idea purposes only. I am well aware that the code is in no way indicative of how the language and framework actually behaves 使用Redis Stream在Spring Boot应用中通过HTTP长轮询阻塞HTTP响应

// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
// updateList is a unique token to track individual consumers in Redis
@GetMapping(&quot;/subscribe-to-updates/{linkId}/{updatesId}&quot;)
public DeferredResult&lt;ResponseEntity&lt;?&gt;&gt; subscribeToUpdates(@PathVariable(&quot;linkId&quot;) Integer linkId, @PathVariable(&quot;updatesId&quot;) Integer updatesId) {
    LOG.info(&quot;Received async-deferredresult request&quot;);
    DeferredResult&lt;ResponseEntity&lt;?&gt;&gt; output = new DeferredResult&lt;&gt;(5000l);

    deferredResult.onTimeout(() -&gt; 
      deferredResult.setErrorResult(
        ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
          .body(&quot;IT WAS NOT UPDATED!&quot;)));
    
    ForkJoinPool.commonPool().submit(() -&gt; {
        //----------------------------------------------
        // Made up stuff... here is where I want to subscribe to a stream and block!
        //----------------------------------------------
        LOG.info(&quot;Processing in separate thread&quot;);
        try {
            // Subscribe to Redis Stream, get any updates that happened between long-polls
            // then block until/if a new message comes over the stream
            var subscription = listenerContainer.receiveAutoAck(
                Consumer.from(studioLinkingID, updateList),
                StreamOffset.create(studioLinkingID, ReadOffset.lastConsumed()),
                streamListener);
            listenerContainer.start();
        } catch (InterruptedException e) {
        }
        output.setResult(&quot;IT WAS UPDATED!&quot;);
    });
    
    LOG.info(&quot;servlet thread freed&quot;);
    return output;
}

So is there a good explanation to how I would go about this? I think the answer lies within https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveRedisTemplate.html but I am not a big enough Spring power user to really understand the terminology within Java Docs (the Spring documentation is really good, but the JavaDocs is written in the dense technical language which I appreciate but don't quite understand yet).

There are two more hurdles to my implementation:

  1. My exact understanding of spring is not at 100% yet. I haven't yet reached that a-ha moment where I really fully understand why all these beans are floating around. I think this is the key to why I am not getting things here... The configuration for the Redis is floating around in the Spring ether and I am not grasping how to just call it. I really need to keep investigating this (it is a huge hurdle to spring for me).
  2. These StudioLinking are short lived, so I need to do some cleanup too. I will implement this later once I get the whole thing up off the ground, I do know it will be needed.

答案1

得分: 1

为什么不使用阻塞轮询机制没有必要使用spring-data-redis的复杂功能只需使用简单的阻塞读取等待5秒因此此调用可能需要大约6秒左右您可以增加或减少阻塞超时时间

class LinkStatus {
    private final boolean updated;

    LinkStatus(boolean updated) {
        this.updated = updated;
    }
}

// 参数studioLinkingID指的是请求者想要监视的StudioLinking
// updateList是用于在Redis中跟踪单个消费者的唯一标记
@GetMapping("/subscribe-to-updates/{linkId}/{updatesId}")
public LinkStatus subscribeToUpdates(
    @PathVariable("linkId") Integer linkId, @PathVariable("updatesId") Integer updatesId) {
    StreamOperations<String, String, String> op = redisTemplate.opsForStream();
    
    Consumer consumer = Consumer.from("test-group", "test-consumer");
    // 使用自动确认的阻塞流读取,读取大小为1,超时时间为5秒
    StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
    List<MapRecord<String, String, String>> records =
        op.read(consumer, readOptions, StreamOffset.latest("test-stream"));
    return new LinkStatus(!CollectionUtils.isEmpty(records));
}
英文:

Why don't you use a blocking polling mechanism? No need to use fancy stuff of spring-data-redis. Just use simple blocking read of 5 seconds, so this call might take around 6 seconds or so. You can decrease or increase the blocking timeout.

class LinkStatus {
    private final boolean updated;

    LinkStatus(boolean updated) {
      this.updated = updated;
    }
  }



// Parameter studioLinkingID refers to the StudioLinking that the requester wants to monitor
  // updateList is a unique token to track individual consumers in Redis
  @GetMapping(&quot;/subscribe-to-updates/{linkId}/{updatesId}&quot;)
  public LinkStatus subscribeToUpdates(
      @PathVariable(&quot;linkId&quot;) Integer linkId, @PathVariable(&quot;updatesId&quot;) Integer updatesId) {
    StreamOperations&lt;String, String, String&gt; op = redisTemplate.opsForStream();
    
    Consumer consumer = Consumer.from(&quot;test-group&quot;, &quot;test-consumer&quot;);
    // auto ack block stream read with size 1 with timeout of 5 seconds
    StreamReadOptions readOptions = StreamReadOptions.empty().block(Duration.ofSeconds(5)).count(1);
    List&lt;MapRecord&lt;String, String, String&gt;&gt; records =
        op.read(consumer, readOptions, StreamOffset.latest(&quot;test-stream&quot;));
    return new LinkStatus(!CollectionUtils.isEmpty(records));
  }

huangapple
  • 本文由 发表于 2020年8月24日 13:37:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/63555265.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定