Is there a good way to add FutureLocal.java to custom Future.java extending CompletableFuture? (example code below)
Question
The following code is close to what I need, except that super.thenCompose returns a plain CompletableFuture instead of my custom Future.java, and that is critical. I am trying to replicate two features of Twitter's Scala futures:
- cancellation chaining, as in Twitter's Scala futures
- a request context that flows through the thenApply and thenCompose chains to fix MDC in slf4j (much like a ThreadLocal, but re-applied just before each lambda runs, as seen in the code below)
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class Future<T> extends CompletableFuture<T> {

    @Override
    public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        // Capture the caller's context now; MyFunction re-applies it just before fn runs.
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);
        return super.thenApply(f);
    }

    @Override
    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        Map<String, Object> state = FutureLocal.fetchState();
        MyFunction f = new MyFunction(state, fn);
        // Returns a CompletableFuture, not a Future, so later stages are no longer wrapped.
        return super.thenCompose(f);
    }

    // Wraps a function so the captured context is installed while it runs.
    @SuppressWarnings("hiding")
    private class MyFunction implements Function {
        private Map<String, Object> state;
        private Function fn;

        public MyFunction(Map<String, Object> state, @SuppressWarnings("rawtypes") Function fn) {
            this.state = state;
            this.fn = fn;
        }

        @Override
        public Object apply(Object t) {
            try {
                FutureLocal.restoreState(state);
                return fn.apply(t);
            } finally {
                // Clear the context so it does not leak into unrelated work on this thread.
                FutureLocal.restoreState(null);
            }
        }
    }

    @Override
    public boolean complete(T value) {
        return super.complete(value);
    }

    @Override
    public boolean completeExceptionally(Throwable ex) {
        return super.completeExceptionally(ex);
    }
}
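The class above depends on FutureLocal, which is not shown in the question. The following is only a guess at its shape, inferred from the four calls that appear in the post (put, get, fetchState, restoreState); the ThreadLocal storage and the copy-on-write behaviour of put are assumptions, not the asker's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the FutureLocal helper; only the method names come from the post.
final class FutureLocal {
    // Assumption: the context lives in a per-thread map.
    private static final ThreadLocal<Map<String, Object>> STATE = new ThreadLocal<>();

    private FutureLocal() {}

    static void put(String key, Object value) {
        Map<String, Object> current = STATE.get();
        // Copy on write, so a snapshot captured earlier by fetchState() is not mutated.
        Map<String, Object> copy = (current == null) ? new HashMap<>() : new HashMap<>(current);
        copy.put(key, value);
        STATE.set(copy);
    }

    static Object get(String key) {
        Map<String, Object> current = STATE.get();
        return (current == null) ? null : current.get(key);
    }

    // Returns the current thread's context (possibly null) so it can be re-applied elsewhere.
    static Map<String, Object> fetchState() {
        return STATE.get();
    }

    // Installs a previously captured context on the current thread; null clears it.
    static void restoreState(Map<String, Object> state) {
        if (state == null) {
            STATE.remove();
        } else {
            STATE.set(state);
        }
    }
}
```

With a helper like this, a lambda only sees the context if something calls restoreState on the executing thread first, which is why every stage in the chain has to be wrapped.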
Here is the code I use to exercise it. Logging the "test" entry from the map starts failing on the third remote call, which means the slf4j MDC would break down.
public class TestCustomFutures {

    private Executor exec = Executors.newFixedThreadPool(3);

    @Test
    public void testFutureContext() throws InterruptedException, ExecutionException {
        Set<Integer> hashSet = new HashSet<Integer>();
        FutureLocal.put("test", 100);
        CompletableFuture<Integer> f = myRemoteCall(4)
            .thenCompose(s -> myRemoteCall(3))
            .thenCompose(s -> myRemoteCall(2));
        f.get();
    }

    private Future<Integer> myRemoteCall(int i) {
        System.out.println("result="+i+" map="+FutureLocal.get("test")+" thread="+Thread.currentThread().getName());
        Future<Integer> f = new Future<Integer>();
        exec.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    f.completeExceptionally(e);
                }
                f.complete(i);
            }
        });
        return f;
    }
}
The output is:
result=4 map=100 thread=main
result=3 map=100 thread=pool-1-thread-1
result=2 map=null thread=pool-1-thread-2
Note the null in the last line; we do not want that value to be null.
Answer 1
Score: 0
Ahh, I was missing one simple thing because I was on JDK 8. On JDK 11, however, you can override newIncompleteFuture():
@Override
public <U> CompletableFuture<U> newIncompleteFuture() {
return new Future<U>();
}
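To see what the override changes, here is a small self-contained sketch for JDK 9 or later. The class names PlainFuture, ChainedFuture and NewIncompleteFutureDemo are made up for illustration; the only real API used is CompletableFuture itself, whose dependent stages are created through newIncompleteFuture().

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical subclass WITHOUT the override: dependent stages are plain CompletableFutures.
class PlainFuture<T> extends CompletableFuture<T> {
}

// Hypothetical subclass WITH the override: dependent stages keep the custom type.
class ChainedFuture<T> extends CompletableFuture<T> {
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new ChainedFuture<>();
    }
}

class NewIncompleteFutureDemo {
    // Does a stage derived from a PlainFuture still have the subclass type?
    static boolean plainKeepsType() {
        CompletableFuture<Integer> next = new PlainFuture<Integer>().thenApply(x -> x + 1);
        return next instanceof PlainFuture;
    }

    // Does the type survive a thenCompose followed by a thenApply?
    static boolean chainedKeepsType() {
        CompletableFuture<Integer> start = new ChainedFuture<>();
        CompletableFuture<Integer> next = start
            .thenCompose(x -> CompletableFuture.completedFuture(x + 1))
            .thenApply(x -> x * 2);
        return next instanceof ChainedFuture;
    }

    public static void main(String[] args) {
        System.out.println("plain keeps type:   " + plainKeepsType());
        System.out.println("chained keeps type: " + chainedKeepsType());
    }
}
```

Because every stage in the second chain is again a ChainedFuture, overridden thenApply and thenCompose methods on such a class would run for every link, which is what the context propagation in the question needs.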
On JDK 8 this does not compile, and the method is never invoked, because newIncompleteFuture() was only added to CompletableFuture in Java 9 :(. Unfortunately, I did not want to upgrade to 11 yet, as some usages are still on JDK 8.
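For code that has to stay on JDK 8, one possible workaround (not something the answer proposes) is to copy the result of each super call into a fresh instance of the custom class, so the chain never falls back to a plain CompletableFuture. The sketch below uses a hypothetical class name, BridgedFuture, and leaves out the context capture; in the question's class, fn would be wrapped in MyFunction before being passed to super.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical class; a sketch of keeping the custom type on JDK 8 without newIncompleteFuture().
class BridgedFuture<T> extends CompletableFuture<T> {

    // Copies the outcome (value or exception) of a plain future into a new BridgedFuture.
    private static <U> BridgedFuture<U> bridge(CompletableFuture<U> source) {
        BridgedFuture<U> target = new BridgedFuture<>();
        source.whenComplete((value, error) -> {
            if (error != null) {
                target.completeExceptionally(error);
            } else {
                target.complete(value);
            }
        });
        return target;
    }

    @Override
    public <U> BridgedFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        // Assumption: a context-capturing wrapper around fn would be applied here.
        CompletableFuture<U> plain = super.thenApply(fn);
        return bridge(plain);
    }

    @Override
    public <U> BridgedFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) {
        CompletableFuture<U> plain = super.thenCompose(fn);
        return bridge(plain);
    }
}
```

The cost is one extra future per stage, and cancelling the returned BridgedFuture does not reach the underlying stage, so cancellation chaining would still need separate handling.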