可以安全地使用Java 8+的CompletableFuture异步地与StreamObserver交互吗?

huangapple go评论61阅读模式
英文:

Is it safe to interact with StreamObserver asynchronously, i.e. with Java 8+ CompletableFutures?

问题

我正在查看来自grpc.io基础教程的Simple RPC示例

    @Override
    public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
      responseObserver.onNext(checkFeature(request));
      responseObserver.onCompleted();
    }
    
    ...
    
    private Feature checkFeature(Point location) {
      for (Feature feature : features) {
        if (feature.getLocation().getLatitude() == location.getLatitude()
            && feature.getLocation().getLongitude() == location.getLongitude()) {
          return feature;
        }
      }
    
      // 未找到功能,返回一个未命名的功能。
      return Feature.newBuilder().setName("").setLocation(location).build();
    }

从其他线程与StreamObserver交互是否有任何注意事项? 例如,假设checkFeature() 异步地调用另一个服务,返回一个 CompletableFuture:

    @Override
    public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
      checkFeature(request).
          thenAccept(feature -> responseObserver.onNext(feature));
      responseObserver.onCompleted();
    }

当然,上述代码不起作用,因为第一个线程会在返回功能之前执行 onCompleted()。所以让我们修复一下:

    @Override
    public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
      checkFeature(request).
          thenAccept(feature -> {
            responseObserver.onNext(feature);
            responseObserver.onCompleted();
          });
    }

我认为这应该可以工作,但我对Java还不熟悉,所以想知道有什么影响。例如:

  • Context.current()是否一致?
  • 除了一元调用的onNext()onError(),是否有任何因素会导致StreamObserver过早销毁或关闭?
  • 有没有更好的做法?

如果有人可以向我解释一下他们是如何推理的,那将会很好。我尝试查找实际的StreamObserver实现,但不确定要寻找什么。

英文:

I'm looking at the Simple RPC example from grpc.io's basic tutorial:

@Override
public void getFeature(Point request, StreamObserver&lt;Feature&gt; responseObserver) {
  responseObserver.onNext(checkFeature(request));
  responseObserver.onCompleted();
}

...

private Feature checkFeature(Point location) {
  for (Feature feature : features) {
    if (feature.getLocation().getLatitude() == location.getLatitude()
        &amp;&amp; feature.getLocation().getLongitude() == location.getLongitude()) {
      return feature;
    }
  }

  // No feature was found, return an unnamed feature.
  return Feature.newBuilder().setName(&quot;&quot;).setLocation(location).build();
}

Are there any caveats to interacting with the StreamObserver from other threads? For example, say checkFeature() asynchronously hits another service, returning a CompletableFuture:

@Override
public void getFeature(Point request, StreamObserver&lt;Feature&gt; responseObserver) {
  checkFeature(request).
      thenAccept(feature -&gt; responseObserver.onNext(feature));
  responseObserver.onCompleted();
}

Of course the above wouldn't work because the first thread would execute onCompleted() before the feature is returned. So let's fix that:

@Override
public void getFeature(Point request, StreamObserver&lt;Feature&gt; responseObserver) {
  checkFeature(request).
      thenAccept(feature -&gt; {
        responseObserver.onNext(feature);
        responseObserver.onCompleted();
      });
}

I think this should work, but I'm new to Java so I wonder what ramifications there are. For example,

  • Will Context.current() be consistent?
  • Will anything cause the StreamObserver to destruct or close prematurely besides onNext() for a unary calls and onError()?
  • Is there a better practice?

It would be great if someone could also step me through how they reasoned. I tried looking up actual implementations of StreamObserver but I wasn't sure what to look for.

答案1

得分: 2

Sure, here's the translated content:

Will Context.current() be consistent?

Context.current() 使用了 ThreadLocal。如果你在不同的线程上访问它,它将不会保持一致。你可以在线程之间传播上下文。你可能会发现这篇帖子有用。

除了一元调用的 onNext() 和 onError() 之外,还有什么会导致 StreamObserver 提前销毁或关闭?

是的,StreamObserver 的正常流程在 onError 或 onCompleted 时结束。

正如 StreamObserverjavadoc所述,"由于单个 StreamObserver 不具备线程安全性,如果多个线程将同时向 StreamObserver 写入,则应用程序必须同步调用"。如果你在并发地调用 StreamObserver,你需要同步这些调用。换句话说,如果你确定即使在使用多个线程的情况下它不会被并发调用,那应该是没问题的。

如果在多个线程上访问同一个 StreamObserver,我建议除非性能关键,否则尽量进行同步,因为这容易引发错误。至少,值得加上详细的注释。

英文:

> Will Context.current() be consistent?

Context.current() is using ThreadLocal. if you are accessing it on a different thread, it won't be consistent. You can propagate context between threads. You may find this post useful.

> Will anything cause the StreamObserver to destruct or close prematurely besides onNext() for a unary calls and onError()?

Yes, Normal flow of StreamObserver ends with onError or onCompleted.

As StreamObserver javadoc states, "Since individual StreamObservers are not thread-safe, if multiple threads will be writing to a StreamObserver concurrently, the application must synchronize calls". If you are calling StreamObserver concurrently, you need to synchronize the calls. In other words, if you know for sure it won't be called concurrently even if you are using multiple threads, it should be fine.

If accessing the same StreamObserver on multiple threads, I would try to synchronize it unless the performance is critical since it is error prone. At least, it deserves a nice comment.

答案2

得分: 2

使用 thenAccept() 调用 onNext()onCompleted() 是可以的,因为观察者不会从多个线程同时调用。

那个"错误"的示例单独调用了 onCompleted(),也是错误的,因为它可能在没有任何形式的同步的情况下从多个线程调用观察者。StreamObservers 不能同时从多个线程调用。

然而,使用 thenAccept() 并不完全正确,因为它不能处理未来失败的情况。所以你需要同时接收 Throwable,可以使用 whenComplete() 来实现:

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
  checkFeature(request).
      whenComplete((feature, t) -> {
        if (t != null) {
          responseObserver.onError(t);
        } else {
          responseObserver.onNext(feature);
          responseObserver.onCompleted();
        }
      });
}

在处理该 lambda 时,上下文可能会出现问题。通常,我们会寻找"架构"解决方案,以确保上下文得以传播,比如在创建所有应用程序线程池时都将其包装在 Context.currentContextExecutor() 中,这样各个调用站点就不需要关心传播问题。我对 CompletableFuture 不够熟悉,无法为其提供策略。

英文:

Using thenAccept() to call onNext() and onCompleted() is fine, because the observer is not called concurrently from multiple threads.

The "broken" example that called onCompleted() separately was broken also because it could have called the observer from multiple threads without any form of synchronization. StreamObservers may not be called from multiple threads simultaneously.

Using thenAccept() isn't quite right though, as it doesn't handle the case where the future fails. So you need to receive the Throwable as well, which can be done with whenComplete():

@Override
public void getFeature(Point request, StreamObserver&lt;Feature&gt; responseObserver) {
  checkFeature(request).
      whenComplete((feature, t) -&gt; {
        if (t != null) {
          responseObserver.onError(t);
        } else {
          responseObserver.onNext(feature);
          responseObserver.onCompleted();
        }
      });
}

The Context could easily be "wrong" when processing that lambda. Typically we'd look for "architectural" solutions to make sure the context is propagated, like wrapping all application thread pools in Context.currentContextExecutor() when creating them, so individual call sites don't need to be concerned with propagation. I'm not familiar enough with CompletableFuture to provide strategy for it.

huangapple
  • 本文由 发表于 2020年5月5日 15:17:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/61607744.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定