Refactor a ListenableFuture (with onSuccess/onFailure) into a CompletableFuture
Question
Because of an upgrade in the Kafka library, I need to rewrite the following code, which uses ListenableFuture, into code that uses CompletableFuture. The difference is briefly explained in this thread.
The object being handled is a SendResult<String, Object>.
The legacy code is:
ListenableFuture<SendResult<String, Object>> future = ...;
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
    @Override
    public void onSuccess(final SendResult<String, Object> result) {
        ProducerRecord<String, Object> record = result.getProducerRecord();
        CaseStatusRequest data = (CaseStatusRequest) record.value();
        logger.info("Producing request succeeded: {}", data);
    }

    @Override
    public void onFailure(final Throwable throwable) {
        logger.error("Producing request failed: {}", request.getReceiptNumber());
    }
});
My understanding is that the analogue of ListenableFuture.onSuccess is CompletableFuture.whenComplete. That refactoring was straightforward. But ListenableFuture.onFailure doesn't have a clear equivalent. There is CompletableFuture.exceptionally, but it must return a value, which seems wrong here; the error handling should be a void operation.
The new code is:
CompletableFuture<SendResult<String, Object>> future = ...;
future.whenComplete(new BiConsumer<SendResult<String, Object>, Throwable>() {
    @Override
    public void accept(SendResult<String, Object> result, Throwable u) {
        ProducerRecord<String, Object> record = result.getProducerRecord();
        CaseStatusRequest data = (CaseStatusRequest) record.value();
        logger.info("Producing request succeeded: {}", data);
    }
});
future.exceptionally(new Function<Throwable, SendResult<String, Object>>() {
    @Override
    public SendResult<String, Object> apply(Throwable arg0) {
        logger.error("Producing request failed: {}", request.getReceiptNumber());
        // Something needs to be returned here.
        // Should I return NULL?
    }
});
Answer 1
Score: 2
The whenComplete function is invoked in both the success and the failure case. Its callback takes a Throwable as the second argument: on failure it is called with the exception and a null result, and on success the Throwable is null.
So you may not need the exceptionally method at all.
Something like:
future.whenComplete(new BiConsumer<SendResult<String, Object>, Throwable>() {
    @Override
    public void accept(SendResult<String, Object> result, Throwable u) {
        if (u != null) {
            logger.error(....)
        } else {
            ProducerRecord<String, Object> record = result.getProducerRecord();
            CaseStatusRequest data = (CaseStatusRequest) record.value();
            logger.info("Producing request succeeded: {}", data);
        }
    }
});
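To see the two branches in isolation, here is a minimal, self-contained sketch that runs without Kafka or Spring. It is only an illustration: a plain String stands in for SendResult<String, Object>, and the class name WhenCompleteDemo, the method observe, and the in-memory list used in place of a logger are all made up for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class WhenCompleteDemo {

    // Attaches a single callback that handles both outcomes and
    // returns the list of messages it recorded (a stand-in for a logger).
    static List<String> observe(CompletableFuture<String> future) {
        List<String> log = new CopyOnWriteArrayList<>();
        future.whenComplete((result, error) -> {
            if (error != null) {
                // Failure branch: result is null, error carries the exception.
                log.add("failed: " + error.getMessage());
            } else {
                // Success branch: error is null, result carries the value.
                log.add("succeeded: " + result);
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // A future that has already completed normally.
        System.out.println(observe(CompletableFuture.completedFuture("payload")));

        // A future that has already completed with an exception.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        System.out.println(observe(failed));
    }
}
```

Because the callback passed to whenComplete is a BiConsumer, it returns nothing, so the "what should I return?" problem from exceptionally does not arise. The future returned by whenComplete completes with the same result or exception as the original one.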
Alternative implementation
You could use the Kafka producer's asynchronous send method and pass a callback to handle the result. This would require some more refactoring of your older code base.
Something like this:
producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // Executes every time a record is successfully sent or an exception is thrown
        if (e == null) {
            // The record was successfully sent
            log.info("Received new metadata. \n" +
                    "Topic: " + recordMetadata.topic() + "\n" +
                    "Partition: " + recordMetadata.partition() + "\n" +
                    "Offset: " + recordMetadata.offset() + "\n" +
                    "Timestamp: " + recordMetadata.timestamp());
        } else {
            log.error("Error while producing", e);
        }
    }
});