Is there a good way to add FutureLocal.java to custom Future.java extending CompletableFuture? (example code below)

huangapple go评论85阅读模式
英文:

Is there a good way to add FutureLocal.java to custom Future.java extending CompletableFuture? (example code below)

问题

我有以下代码,它在调用super.thenCompose时返回一个CompletableFuture,而不是我自定义的Custom Future.java,这在关键时刻有点接近。我试图复制Twitter的Scala Futures,它们具有以下功能:

  1. 能够添加类似Twitter Scala Futures的取消链接。
  2. 可以在thenApply和thenCompose链中传递请求上下文,以修复slf4j中的MDC(类似于ThreadLocal,但在每次lambda运行之前重新应用,如下面的代码所示)。
public class Future<T> extends CompletableFuture<T> {

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);		
        
        return super.thenApply(f);
    }

    @Override
    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);
        
        return super.thenCompose(f);
    }

    // ... 其他方法 ...

    private class MyFunction implements Function {

        private Map<String, Object> state;
        private Function fn;

        public MyFunction(Map<String, Object> state, Function fn) {
            this.state = state;
            this.fn = fn;
        }

        @Override
        public Object apply(Object t) {
            try {
                FutureLocal.restoreState(state);
                return fn.apply(t);
            } finally {
                FutureLocal.restoreState(null);
            }
        }
    }
}

这是我用来运行上述代码的一些代码,但在映射中记录“test”开始在第3次远程调用时失败,这意味着slf4j MDC将会失效。

public class TestCustomFutures {

    private Executor exec = Executors.newFixedThreadPool(3);

    @Test
    public void testFutureContext() throws InterruptedException, ExecutionException {
        Set<Integer> hashSet = new HashSet<Integer>();
        FutureLocal.put("test", 100);

        CompletableFuture<Integer> f = myRemoteCall(4)
            .thenCompose(s -> myRemoteCall(3))
            .thenCompose(s -> myRemoteCall(2));

        f.get();
    }

    private Future<Integer> myRemoteCall(int i) {
        System.out.println("result=" + i + " map=" + FutureLocal.get("test") + " thread=" + Thread.currentThread().getName());
        
        Future<Integer> f = new Future<Integer>();
        exec.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    f.completeExceptionally(e);
                }
                f.complete(i);
            }
        });
        return f;
    }
}

输出结果如下:

result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2

请注意,我们不希望最后一个值为null。

英文:

I have the following code which was kind of close except when I call super.thenCompose and it returns a CompletableFuture instead of my Custom Future.java which is kind of critical. I am trying to copy twitter's scala futures that

  1. Be able to add cancellation chaining like twitter scala's futures

  2. can have a request context flow through the thenApply and thenCompose chains to fix MDC in slf4j (much like a ThreadLocal but it is re-applied just before each lambda is run as seen in the code below)

    public class Future<T> extends CompletableFuture<T> {

    @Override
    public &lt;U&gt; CompletableFuture&lt;U&gt; thenApply(Function&lt;? super T, ? extends U&gt; fn) {
    Map&lt;String, Object&gt; state = FutureLocal.fetchState();
    MyFunction f = new MyFunction(state, fn);		
    return super.thenApply(f);
    }
    @Override
    public &lt;U&gt; CompletableFuture&lt;U&gt; thenCompose(Function&lt;? super T, ? extends CompletionStage&lt;U&gt;&gt; fn) {
    Map&lt;String, Object&gt; state = FutureLocal.fetchState();
    MyFunction f = new MyFunction(state, fn);
    return super.thenCompose(f);
    }
    @SuppressWarnings(&quot;hiding&quot;)
    private class MyFunction implements Function {
    private Map&lt;String, Object&gt; state;
    private Function fn;
    public MyFunction(Map&lt;String, Object&gt; state, @SuppressWarnings(&quot;rawtypes&quot;) Function fn) {
    this.state = state;
    this.fn = fn;
    }
    @Override
    public Object apply(Object t) {
    try {
    FutureLocal.restoreState(state);
    return fn.apply(t);
    } finally {
    FutureLocal.restoreState(null);
    }
    }
    }
    @Override
    public boolean complete(T value) {
    return super.complete(value);
    }
    @Override
    public boolean completeExceptionally(Throwable ex) {
    return super.completeExceptionally(ex);
    }
    

    }

Here is some code I use to run that code but logging the "test" in the map starts failing on the 3rd remote call meaning slf4j MDC will break down.

public class TestCustomFutures {
private Executor exec = Executors.newFixedThreadPool(3);
@Test
public void testFutureContext() throws InterruptedException, ExecutionException {
Set&lt;Integer&gt; hashSet = new HashSet&lt;Integer&gt;();
FutureLocal.put(&quot;test&quot;, 100);
CompletableFuture&lt;Integer&gt; f = myRemoteCall(4)
.thenCompose(s -&gt; myRemoteCall(3))
.thenCompose(s -&gt; myRemoteCall(2));
f.get();
}
private Future&lt;Integer&gt; myRemoteCall(int i) {
System.out.println(&quot;result=&quot;+i+&quot; map=&quot;+FutureLocal.get(&quot;test&quot;)+&quot; thread=&quot;+Thread.currentThread().getName());
Future&lt;Integer&gt; f = new Future&lt;Integer&gt;();
exec.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
f.completeExceptionally(e);
}
f.complete(i);
}
});
return f;
}
}

The output then is this

result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2

notice that last value we do not want to be null

答案1

得分: 0

ahhh,我错过了一个简单的东西,因为我在jdk8中。然而在jdk11中,您可以覆盖它...

@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
    return new Future<U>();
}

在jdk8中,由于某种原因,这段代码无法编译,并且不会调用这个方法 :(. 糟糕,我还不想升级到11,因为还有一些用法仍然在jdk8上 :(.

英文:

ahhh, I was missing one simple thing BECAUSE I was in jdk8. In jdk11 however, you can override this...

@Override
public &lt;U&gt; CompletableFuture&lt;U&gt; newIncompleteFuture() {
return new Future&lt;U&gt;();
}

In jdk8, for some reason, this would not compile and it would not invoke this :(. Crap, I didn't want to upgrade to 11 yet as some usages are stilll on jdk8 :(.

huangapple
  • 本文由 发表于 2020年5月31日 06:39:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/62109484.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定