异常在执行Akka Streams时抛出,不包含对源代码的引用。

huangapple go评论63阅读模式
英文:

Excetption thrown while executing Akka Streams doesn't contain any references to the source code

问题

以下是您提供的代码的翻译部分:

这是一个Akka Streams Scala应用程序它从Kafka主题消费消息将它们转换为Dynamo DB请求并执行它们有时该应用程序会抛出以下异常

异常堆栈仅引导到库方法调用而不是实际应用程序源代码的位置有关查找错误实际原因的想法或如何设置Akka以正确报告异常的任何建议

这是大致制作Dynamo DB请求的代码

```scala
def makeRequest[In <: DynamoDbRequest, Out <: DynamoDbResponse](
    client: DynamoDbAsyncClient,
    retries: DynamoDBRetries,
    request: In
)(implicit
    dbOp: DynamoDbOp[In, Out],
    system: ActorSystem
): Future[Out] = {
  implicit val implicitClient: DynamoDbAsyncClient = client
  val source = RestartSource.onFailuresWithBackoff(
    RestartSettings(retries.minBackoff, retries.maxBackoff, retries.randomFactor)
  ) { () =>
    Source.single(request).via(DynamoDb.flow(1)).map(Success.apply).recover {
      // some error handling
    }
  }
  source.map(_.get).runWith(Sink.head)
}

以下是链接到Dynamo DB调用的代码:

makeRequest(
  client,
  config.retries,
  req
).flatMap { result => makeRequest(
  client,
  config.retries,
  anotherRequest
) }

希望这能帮助您进一步分析和解决问题。

英文:

There is an akka streams scala application, it consumes messages from kafka topics, converts them to Dynamo DB requests, and executes them. Sometimes the app throws exceptions these exceptions:

java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -&gt; 200 OK Default(13512 bytes)
at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:65)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:103)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:184)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:79)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:103)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.completeResponseFuture(MakeAsyncHttpRequestStage.java:240)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$3(MakeAsyncHttpRequestStage.java:163)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -&gt; 200 OK Default(13512 bytes)
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:47)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:223)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:218)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:182)
... 23 common frames omitted
Caused by: java.util.concurrent.TimeoutException: Response entity was not subscribed after 1 second. Make sure to read the response `entity` body or call `entity.discardBytes()` on it -- in case you deal with `HttpResponse`, use the shortcut `response.discardEntityBytes()`. POST / Default(104 bytes) -&gt; 200 OK Default(13512 bytes)
at akka.http.impl.engine.client.pool.SlotState$WaitingForResponseEntitySubscription.onTimeout(SlotState.scala:313)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$onTimeout$1(NewHostConnectionPool.scala:187)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Event$.$anonfun$event0$1(NewHostConnectionPool.scala:189)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.runOneTransition$1(NewHostConnectionPool.scala:277)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.loop$1(NewHostConnectionPool.scala:364)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.updateState(NewHostConnectionPool.scala:373)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.updateState(NewHostConnectionPool.scala:267)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.$anonfun$updateState$2(NewHostConnectionPool.scala:289)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1.$anonfun$safeCallback$1(NewHostConnectionPool.scala:616)
at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1.$anonfun$safeCallback$1$adapted(NewHostConnectionPool.scala:616)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:467)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:517)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java. base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

The stack trace only leads back to the libraries method calls, but not to the actual application source code, any ideas on where to search for the actual cause of the error or how to setup akka to properly report the exception?

Here is roughly the code that makes requests to the dynamo DB:

  def makeRequest[In &lt;: DynamoDbRequest, Out &lt;: DynamoDbResponse](
client: DynamoDbAsyncClient,
retries: DynamoDBRetries,
request: In
)(implicit
dbOp: DynamoDbOp[In, Out],
system: ActorSystem
): Future[Out] = {
implicit val implicitClient: DynamoDbAsyncClient = client
val source = RestartSource.onFailuresWithBackoff(
RestartSettings(retries.minBackoff, retries.maxBackoff, retries.randomFactor)
) { () =&gt;
Source.single(request).via(DynamoDb.flow(1)).map(Success.apply).recover {
// some error handling
}
}
source.map(_.get).runWith(Sink.head)
}

and here is the code that chains the calls to the DDB:

    makeRequest(
client,
config.retries,
req
).flatMap { result =&gt; makeRequest(
client,
config.retries,
anotherRequest
) }

答案1

得分: 1

我不确定你的代码正在做什么,但根据《Akka Streams 集成中请求/响应实体流的含义》的描述,似乎你正在尝试消耗HTTP响应但未处理实体。

这个错误表明HTTP响应已经可用太长时间而没有被消耗。

解决方法可能是:

可以通过增加订阅超时来部分解决,但仍有可能在网络层面遇到超时问题,并且在负载下仍可能超过超时时间。

akka.http.host-connection-pool.response-entity-subscription-timeout = 10.seconds

建议的解决方案是实现对实体的处理。

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

case class ExamplePerson(name: String)

def parse(line: ByteString): Option[ExamplePerson] =
  line.utf8String.split(" ").headOption.map(ExamplePerson.apply)

val requests: Source[HttpRequest, NotUsed] = Source
  .fromIterator(() =>
    Range(0, 10).map(i => HttpRequest(uri = Uri(s"https://localhost/people/$i"))).iterator
  )

val processorFlow: Flow[Option[ExamplePerson], Int, NotUsed] =
  Flow[Option[ExamplePerson]].map(_.map(_.name.length).getOrElse(0))

// 运行并完全消耗单个Akka HTTP请求
def runRequest(req: HttpRequest): Future[Option[ExamplePerson]] =
  Http()
    .singleRequest(req)
    .flatMap { response =>
      response.entity.dataBytes
        .runReduce(_ ++ _)
        .map(parse)
    }

// 运行每个Akka HTTP流到完成,然后继续处理。您需要调整`mapAsync`的`parallelism`参数--较高的值会创建更多的CPU和内存负载,这可能会或可能不会对性能产生积极影响。
requests
  .mapAsync(2)(runRequest)
  .via(processorFlow)
  .runWith(Sink.ignore)

更多信息:

英文:

I'm not sure what is your code doing, but based on Implications of the streaming nature of Request/Response Entities - Integrating with Akka Streams, sounds like you are trying to consume http responses without handling the entity.

> This error indicates that the http response has been available for too long without being consumed.

The workaround could be

> It can be partially worked around by increasing the subscription timeout, but you will still run the risk of running into network level timeouts and could still exceed the timeout under load

akka.http.host-connection-pool.response-entity-subscription-timeout = 10.seconds

The suggested solution is implement the handling of the entity

implicit val system: ActorSystem = ActorSystem()
implicit val dispatcher: ExecutionContext = system.dispatcher

case class ExamplePerson(name: String)

def parse(line: ByteString): Option[ExamplePerson] =
  line.utf8String.split(&quot; &quot;).headOption.map(ExamplePerson.apply)

val requests: Source[HttpRequest, NotUsed] = Source
  .fromIterator(() =&gt;
    Range(0, 10).map(i =&gt; HttpRequest(uri = Uri(s&quot;https://localhost/people/$i&quot;))).iterator
  )

val processorFlow: Flow[Option[ExamplePerson], Int, NotUsed] =
  Flow[Option[ExamplePerson]].map(_.map(_.name.length).getOrElse(0))

// Run and completely consume a single akka http request
def runRequest(req: HttpRequest): Future[Option[ExamplePerson]] =
  Http()
    .singleRequest(req)
    .flatMap { response =&gt;
      response.entity.dataBytes
        .runReduce(_ ++ _)
        .map(parse)
    }

// Run each akka http flow to completion, then continue processing. You&#39;ll want to tune the `parallelism`
// parameter to mapAsync -- higher values will create more cpu and memory load which may or may not positively
// impact performance.
requests
  .mapAsync(2)(runRequest)
  .via(processorFlow)
  .runWith(Sink.ignore)

For more info:

huangapple
  • 本文由 发表于 2023年7月18日 01:59:41
  • 转载请务必保留本文链接:https://go.coder-hub.com/76707027.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定