Is there a good way to add FutureLocal.java to custom Future.java extending CompletableFuture? (example code below)


Question

I have the following code, which is close to what I need, except that calling super.thenCompose returns a plain CompletableFuture instead of my custom Future.java, and that difference is critical. I am trying to copy Twitter's Scala futures, which:

  1. support cancellation chaining, like Twitter's Scala futures

  2. let a request context flow through the thenApply and thenCompose chains to fix MDC in slf4j (much like a ThreadLocal, but re-applied just before each lambda runs, as seen in the code below)

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.function.Function;

    public class Future<T> extends CompletableFuture<T> {

        @Override
        public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
            // Capture the caller's context now; MyFunction re-applies it when fn runs.
            Map<String, Object> state = FutureLocal.fetchState();
            MyFunction f = new MyFunction(state, fn);
            return super.thenApply(f);
        }

        @Override
        public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
            Map<String, Object> state = FutureLocal.fetchState();
            MyFunction f = new MyFunction(state, fn);
            // The stage returned here is a plain CompletableFuture, not a Future.
            return super.thenCompose(f);
        }

        // Raw-typed wrapper that installs the captured context around the wrapped function.
        @SuppressWarnings("hiding")
        private class MyFunction implements Function {
            private Map<String, Object> state;
            private Function fn;

            public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) {
                this.state = state;
                this.fn = fn;
            }

            @Override
            public Object apply(Object t) {
                try {
                    FutureLocal.restoreState(state);
                    return fn.apply(t);
                } finally {
                    // Clear the context so it does not leak to the next task on this thread.
                    FutureLocal.restoreState(null);
                }
            }
        }

        @Override
        public boolean complete(T value) {
            return super.complete(value);
        }

        @Override
        public boolean completeExceptionally(Throwable ex) {
            return super.completeExceptionally(ex);
        }

    }
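FutureLocal.java itself is not shown in the question. As a rough sketch of what the calls above seem to assume, a ThreadLocal-backed holder could look like the following. Only the names put, get, fetchState and restoreState come from the code above; the method bodies are guesses for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the real FutureLocal.java is not shown in the question.
final class FutureLocal {
    // Each thread holds its own context map.
    private static final ThreadLocal<Map<String, Object>> STATE =
            ThreadLocal.withInitial(HashMap::new);

    private FutureLocal() {}

    static void put(String key, Object value) {
        STATE.get().put(key, value);
    }

    static Object get(String key) {
        return STATE.get().get(key);
    }

    // Returns a copy of the calling thread's context, taken when a callback is registered.
    static Map<String, Object> fetchState() {
        return new HashMap<>(STATE.get());
    }

    // Installs a snapshot on the current thread; null clears the context.
    static void restoreState(Map<String, Object> state) {
        if (state == null) {
            STATE.remove();
        } else {
            STATE.set(new HashMap<>(state));
        }
    }
}
```

Copying the map in fetchState and restoreState keeps a callback from mutating the snapshot that other callbacks share; whether the real class does this is unknown.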

Here is the test I use to exercise the Future class. The "test" entry in the map is missing from the third remote call onward, which means the slf4j MDC would break down.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;

    import org.junit.Test; // JUnit 4 assumed; the question does not say which version is used

    public class TestCustomFutures {

        private Executor exec = Executors.newFixedThreadPool(3);

        @Test
        public void testFutureContext() throws InterruptedException, ExecutionException {
            FutureLocal.put("test", 100);
            CompletableFuture<Integer> f = myRemoteCall(4)
                .thenCompose(s -> myRemoteCall(3))
                .thenCompose(s -> myRemoteCall(2));
            f.get();
        }

        // Simulates a remote call that completes on a pool thread after one second.
        private Future<Integer> myRemoteCall(int i) {
            System.out.println("result=" + i + " map=" + FutureLocal.get("test") + " thread=" + Thread.currentThread().getName());
            Future<Integer> f = new Future<Integer>();
            exec.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        f.completeExceptionally(e);
                        return; // already completed exceptionally
                    }
                    f.complete(i);
                }
            });
            return f;
        }
    }

The output is:

    result=4 map=100 thread=main
    result=3 map=100 thread=pool-1-thread-1
    result=2 map=null thread=pool-1-thread-2

Note that the last value is null, which is not what we want.
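The null follows from the type of the stage that thenCompose returns: the first thenCompose goes through the override, but its result is a plain CompletableFuture, so the second thenCompose is never wrapped. A minimal sketch that shows the type loss in isolation (the class names TypeLossDemo and PlainSubclass are made up for this example):

```java
import java.util.concurrent.CompletableFuture;

class TypeLossDemo {
    // A subclass that overrides nothing, standing in for the custom Future.
    static class PlainSubclass<T> extends CompletableFuture<T> {}

    // Returns the runtime class of the stage produced by thenCompose.
    static Class<?> dependentClass() {
        PlainSubclass<Integer> first = new PlainSubclass<>();
        CompletableFuture<Integer> second =
                first.thenCompose(v -> new PlainSubclass<Integer>());
        return second.getClass();
    }

    public static void main(String[] args) {
        // prints "CompletableFuture": the subclass type is lost after one step
        System.out.println(dependentClass().getSimpleName());
    }
}
```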

Answer 1

Score: 0

Ahh, I was missing one simple thing because I was on JDK 8. On JDK 11, however, you can override this:

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new Future<U>();
    }

On JDK 8 this does not compile with @Override, and without the annotation the method is never invoked, because CompletableFuture.newIncompleteFuture() was only added in Java 9. Unfortunately, I didn't want to upgrade to 11 yet, as some usages are still on JDK 8.
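If upgrading is not an option, one possible JDK 8-compatible workaround (not part of the original answer) is to copy the result of each super call into a fresh instance of the subclass, so the chain keeps its type. The sketch below uses hypothetical names: ContextFuture for the custom future and Ctx as a small stand-in for FutureLocal. It does not attempt cancellation chaining.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stand-in for FutureLocal, reduced to what this sketch needs.
final class Ctx {
    private static final ThreadLocal<Map<String, Object>> STATE =
            ThreadLocal.withInitial(HashMap::new);
    static void put(String k, Object v) { STATE.get().put(k, v); }
    static Object get(String k) { return STATE.get().get(k); }
    static Map<String, Object> fetch() { return new HashMap<>(STATE.get()); }
    static void restore(Map<String, Object> s) { STATE.set(new HashMap<>(s)); }
}

class ContextFuture<T> extends CompletableFuture<T> {

    // Wraps fn so the context captured at registration time is installed while
    // fn runs; afterwards the executing thread gets its previous context back.
    private static <A, B> Function<A, B> withContext(Function<? super A, ? extends B> fn) {
        Map<String, Object> captured = Ctx.fetch();
        return a -> {
            Map<String, Object> previous = Ctx.fetch();
            Ctx.restore(captured);
            try {
                return fn.apply(a);
            } finally {
                Ctx.restore(previous);
            }
        };
    }

    // Copies the outcome of a plain stage into a ContextFuture.
    private static <U> ContextFuture<U> adopt(CompletableFuture<U> plain) {
        ContextFuture<U> result = new ContextFuture<>();
        plain.whenComplete((value, error) -> {
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    @Override
    public <U> ContextFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        return adopt(super.thenApply(ContextFuture.<T, U>withContext(fn)));
    }

    @Override
    public <U> ContextFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        return adopt(super.thenCompose(ContextFuture.<T, CompletionStage<U>>withContext(fn)));
    }
}

class Jdk8WorkaroundDemo {
    // Simulates a remote call that completes on another thread.
    static ContextFuture<Integer> remoteCall(int i) {
        ContextFuture<Integer> f = new ContextFuture<>();
        new Thread(() -> f.complete(i)).start();
        return f;
    }

    // Records the "test" value each callback sees.
    static List<Object> run() {
        List<Object> seen = Collections.synchronizedList(new ArrayList<>());
        Ctx.put("test", 100);
        remoteCall(4)
            .thenCompose(s -> { seen.add(Ctx.get("test")); return remoteCall(3); })
            .thenCompose(s -> { seen.add(Ctx.get("test")); return remoteCall(2); })
            .thenApply(s -> seen.add(Ctx.get("test")))
            .join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [100, 100, 100]
    }
}
```

Because the overrides use covariant return types, every step in the chain is a ContextFuture and each callback is wrapped. The trade-off is one extra future per stage, and every other chaining method that should carry the context (thenAccept, handle, and so on) would need the same treatment.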

huangapple
  • Posted on May 31, 2020 at 06:39:11
  • When reposting, please keep the link to this article: https://go.coder-hub.com/62109484.html