将Akka流传递到上游服务以填充数据。

huangapple go评论96阅读模式
英文:

passing an Akka stream to an upstream service to populate

问题

我需要调用一个上游服务(Azure Blob Service),将数据推送到一个输出流(OutputStream),然后我需要通过akka将其推送回客户端。如果没有akka(只使用servlet代码),我会获取ServletOutputStream并将其传递给Azure服务的方法。

我能尝试的最接近的方法,显然是错误的,类似于以下内容:

Source<ByteString, OutputStream> source = StreamConverters.asOutputStream().mapMaterializedValue(os -> {
    blobClient.download(os);
    return os;
});

ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

这个想法是我正在调用一个上游服务,以获取通过调用 blobClient.download(os) 填充的输出流。

似乎 Lambda 函数被调用并返回,但之后会失败,因为没有数据或其他原因。就好像我不应该让 Lambda 函数来完成工作,而是可能返回一些执行工作的对象?不确定。

如何正确处理这种情况?

英文:

I need to call an upstream service (Azure Blob Service) to push data to an OutputStream, which then i need to turn around and push it back to the client, thru akka. Without akka (and just servlet code), i'd just get the ServletOutputStream and pass it to the azure service's method.

The closest i can try to stumble upon, and clearly this is wrong, is something like this

        Source&lt;ByteString, OutputStream&gt; source = StreamConverters.asOutputStream().mapMaterializedValue(os -&gt; {
            blobClient.download(os);
            return os;
        });

        ResponseEntity resposeEntity = HttpEntities.create(ContentTypes.APPLICATION_OCTET_STREAM, preAuthData.getFileSize(), source);

        sender().tell(new RequestResult(resposeEntity, StatusCodes.OK), self());

The idea is i'm calling an upstream service to get an outputstream populated by calling
blobClient.download(os);

It seems like the the lambda function gets called and returns, but then afterwards it fails, because there's no data or something. As if i'm not supposed to be have that lambda function do the work, but perhaps return some object that does the work? Not sure.

How does one do this?

答案1

得分: 1

实际问题在于 Azure API 并没有设计用于反压。输出流无法向 Azure 发信号表明尚未准备好接收更多数据。换句话说,如果 Azure 推送数据的速度超过您的消费速度,就可能会在某个地方发生丑陋的缓冲区溢出故障。

接受这个事实后,我们可以做的下一件最好的事情是:

  • 使用 Source.lazySource 仅在下游需求出现时(即源正在运行并且数据正在被请求)才开始下载数据。
  • download 调用放在另一个线程中,以便它在不阻塞源的情况下继续执行。一种方法是使用 Future(我不确定 Java 的最佳实践是什么,但无论哪种方式都应该可以正常工作)。虽然最初可能无关紧要,但您可能需要选择一个与 system.dispatcher 不同的执行上下文 - 这完全取决于 download 是否阻塞。

如果这段 Java 代码存在问题,我提前道歉 - 我使用 Scala 的 Akka,所以这些都是根据查阅 Akka Java API 和 Java 语法参考得出的。

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // 等待下游需求以初始化源...
  Source.lazySource(() -> {
    // 在源开始运行之前,预先实例化输出流
    Pair<OutputStream, Source<ByteString, NotUsed>> pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // 在单独的线程中开始向下载流写入
    Futures.future(() -> { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // 返回源 - 由于 `lazySource` 表示有需求,它应该开始运行
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());
英文:

The real issue here is that the Azure API is not designed for back-pressuring. There is no way for the output stream to signal back to Azure that it is not ready for more data. To put it another way: if Azure pushes data faster than you are able to consume it, there will have to be some ugly buffer overflow failure somewhere.

Accepting this fact, the next best thing we can do is:

  • Use Source.lazySource to only start downloading data when there is downstream demand (aka. the source is being run and data is being requested).
  • Put the download call in some other thread so that it continues executing without blocking the source from being returned. Once way to do this is with a Future (I'm not sure what Java best practices are, but should work fine either way). Although it won't matter initially, you may need to choose an execution context other than system.dispatcher - it all depends on whether download is blocking or not.

I apologize in advance if this Java code is malformed - I use Akka with Scala, so this is all from looking at the Akka Java API and Java syntax reference.

ResponseEntity responseEntity = HttpEntities.create(
  ContentTypes.APPLICATION_OCTET_STREAM,
  preAuthData.getFileSize(),

  // Wait until there is downstream demand to intialize the source...
  Source.lazySource(() -&gt; {
    // Pre-materialize the outputstream before the source starts running
    Pair&lt;OutputStream, Source&lt;ByteString, NotUsed&gt;&gt; pair =
      StreamConverters.asOutputStream().preMaterialize(system);

    // Start writing into the download stream in a separate thread
    Futures.future(() -&gt; { blobClient.download(pair.first()); return pair.first(); }, system.getDispatcher());

    // Return the source - it should start running since `lazySource` indicated demand
    return pair.second();
  })
);

sender().tell(new RequestResult(responseEntity, StatusCodes.OK), self());

</details>



# 答案2
**得分**: 0

```java
在这种情况下,`OutputStream``Source`物化值”,只有在流被运行物化为正在运行的流时才会被创建运行它是不在你的控制范围内的因为你将 `Source` 交给了 Akka HTTP而后者将在以后实际运行你的源

`.mapMaterializedValue(matval -&gt; ...)` 通常用于转换物化值但由于它作为物化的一部分被调用你可以在其中执行诸如将 matval 发送到消息中的副作用就像你已经找出的那样即使看起来有些奇怪这并没有必然的问题重要的是要理解在 lambda 完成之前流不会完成其物化并开始运行这意味着如果 `download()` 方法是阻塞的而不是在不同的线程上启动一些工作并立即返回则会出现问题

然而还有另一种解决方案:`Source.preMaterialize()`,它将源物化并为你提供一个物化值的 `Pair`,以及一个可以用来消耗已经启动的源的新 `Source`:

```java
Pair&lt;OutputStream, Source&lt;ByteString, NotUsed&gt;&gt; pair = 
  StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source&lt;ByteString, NotUsed&gt; source = pair.second();

请注意,在你的代码中还有一些其他要考虑的事项,最重要的是,如果 blobClient.download(os) 调用会阻塞直到完成,并且你从 actor 中调用它,在这种情况下,你必须确保你的 actor 不会饿死分发器,并阻止应用程序中的其他 actor 执行(请参阅 Akka 文档:https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management)。


<details>
<summary>英文:</summary>
The `OutputStream` in this case is the &quot;materialized value&quot; of the `Source` and it will only be created once the stream is run (or &quot;materialized&quot; into a running stream). Running it is out of your control since you hand the `Source` to Akka HTTP and that will later actually run your source.
`.mapMaterializedValue(matval -&gt; ...)` is usually used to transform the materialized value but since it is invoked as a part of materialization you can use that to do side effects such as sending the matval in a message, just like you have figured out, there isn&#39;t necessarily anything wrong with that even if it looks funky. It is important to understand that the stream will not complete its materialization and become running until that lambda completes. This means problems if `download()` is blocking rather than forking off some work on a different thread and immediately returning.
There is however another solution: `Source.preMaterialize()`, it materializes the source and gives you a `Pair` of the materialized value and a new `Source` that can be used to consume the already started source:
```java
Pair&lt;OutputStream, Source&lt;ByteString, NotUsed&gt;&gt; pair = 
StreamConverters.asOutputStream().preMaterialize(system);
OutputStream os = pair.first();
Source&lt;ByteString, NotUsed&gt; source = pair.second();

Note that there are a few additional things to think of in your code, most importantly if the blobClient.download(os) call blocks until it is done and you call that from the actor, in that case you must make sure that your actor does not starve the dispatcher and stop other actors in your application from executing (see Akka docs: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking-needs-careful-management ).

huangapple
  • 本文由 发表于 2020年4月5日 10:56:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/61037458.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定