Refactor a ListenableFuture (with onSuccess/onFailure) into a CompletableFuture

Question

Because of an upgrade of the Kafka library, I need to rewrite the following code, which uses ListenableFuture, so that it uses CompletableFuture instead. The difference is briefly explained in this thread.

The object being handled is a SendResult<String, Object>.

The legacy code is:

    ListenableFuture<SendResult<String, Object>> future = ...;
    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onSuccess(final SendResult<String, Object> result) {
            ProducerRecord<String, Object> record = result.getProducerRecord();
            CaseStatusRequest data = (CaseStatusRequest) record.value();
            logger.info("Producing request succeeded: {}", data);
        }

        @Override
        public void onFailure(final Throwable throwable) {
            logger.error("Producing request failed: {}", request.getReceiptNumber());
        }
    });

My understanding is that the analogue of ListenableFuture's onSuccess callback is CompletableFuture.whenComplete, and that part of the refactoring was straightforward. The onFailure callback, however, has no clear equivalent. There is CompletableFuture.exceptionally, but its function must return a value, which seems wrong here: the error handling should be a void operation.

The new code so far is:

    CompletableFuture<SendResult<String, Object>> future = ...;
    future.whenComplete(new BiConsumer<SendResult<String, Object>, Throwable>() {
        @Override
        public void accept(SendResult<String, Object> result, Throwable u) {
            ProducerRecord<String, Object> record = result.getProducerRecord();
            CaseStatusRequest data = (CaseStatusRequest) record.value();
            logger.info("Producing request succeeded: {}", data);
        }
    });
    future.exceptionally(new Function<Throwable, SendResult<String, Object>>() {
        @Override
        public SendResult<String, Object> apply(Throwable arg0) {
            logger.error("Producing request failed: {}", request.getReceiptNumber());
            // Something needs to be returned here.
            // Should I return NULL?
        }
    });

Answer 1

Score: 2

The whenComplete callback is invoked in both the success and the failure case. Its BiConsumer receives the result and a Throwable: on success the Throwable is null, and on failure the Throwable holds the exception and the result is null. This also means the whenComplete callback in the question would dereference a null result whenever the send fails.

So you may not need the exceptionally method at all.
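On the literal question of what exceptionally should return: returning null compiles and runs, but the value only becomes the result of the new future that exceptionally returns; the original future stays failed. Below is a minimal sketch of that behavior, assuming a plain CompletableFuture<String> in place of SendResult. The class name ExceptionallyDemo, the logFailure helper, and the sample message are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class ExceptionallyDemo {

    // Registers a failure-only hook and returns the derived future
    // produced by exceptionally (hypothetical helper for this sketch).
    public static CompletableFuture<String> logFailure(CompletableFuture<String> future,
                                                       AtomicReference<String> log) {
        return future.exceptionally(error -> {
            log.set("failed: " + error.getMessage());
            return null; // becomes the value of the derived future only
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> send = new CompletableFuture<>();
        AtomicReference<String> log = new AtomicReference<>();
        CompletableFuture<String> derived = logFailure(send, log);

        // Simulate a failed send.
        send.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(log.get());                           // failed: broker unavailable
        System.out.println(send.isCompletedExceptionally());     // true
        System.out.println(derived.isCompletedExceptionally());  // false
        System.out.println(derived.join());                      // null
    }
}
```

So returning null is harmless as long as nothing downstream consumes the derived future and mistakes null for a real result. A single callback that sees both outcomes avoids the question entirely.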

A single whenComplete callback can cover both cases, something like:

    future.whenComplete(new BiConsumer<SendResult<String, Object>, Throwable>() {
        @Override
        public void accept(SendResult<String, Object> result, Throwable u) {
            if (u != null) {
                // Failure: result is null here, so only log the error.
                logger.error(....);
            } else {
                // Success: u is null and result is safe to use.
                ProducerRecord<String, Object> record = result.getProducerRecord();
                CaseStatusRequest data = (CaseStatusRequest) record.value();
                logger.info("Producing request succeeded: {}", data);
            }
        }
    });
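To watch both branches of the callback above fire without a Kafka broker, here is a minimal runnable sketch that uses a plain CompletableFuture<String> in place of SendResult and a lambda in place of the anonymous BiConsumer. The class name WhenCompleteDemo, the observe helper, and the sample values are assumptions made for illustration, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteDemo {

    // Attaches one callback that handles both outcomes and records
    // what it saw (hypothetical helper for this sketch).
    public static List<String> observe(CompletableFuture<String> future) {
        List<String> events = new ArrayList<>();
        future.whenComplete((result, error) -> {
            if (error != null) {
                events.add("failed: " + error.getMessage());
            } else {
                events.add("succeeded: " + result);
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // Success path: the callback receives the value and a null error.
        CompletableFuture<String> ok = new CompletableFuture<>();
        List<String> okEvents = observe(ok);
        ok.complete("record-1");

        // Failure path: the callback receives a null value and the exception.
        CompletableFuture<String> bad = new CompletableFuture<>();
        List<String> badEvents = observe(bad);
        bad.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(okEvents);  // [succeeded: record-1]
        System.out.println(badEvents); // [failed: broker unavailable]
    }
}
```

One caveat: when the callback is attached to a stage derived from another stage (for example after thenApply or supplyAsync), the Throwable is typically a CompletionException wrapping the original cause, so error.getCause() may be needed to reach it.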

Alternative implementation

You could also use the Kafka producer's asynchronous send method and pass it a callback that handles the result. This would require some more refactoring of your older code base.

Something like this:

    producer.send(producerRecord, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // executes every time a record is successfully sent or an exception is thrown
            if (e == null) {
                // the record was successfully sent
                log.info("Received new metadata. \n" +
                        "Topic:" + recordMetadata.topic() + "\n" +
                        "Partition: " + recordMetadata.partition() + "\n" +
                        "Offset: " + recordMetadata.offset() + "\n" +
                        "Timestamp: " + recordMetadata.timestamp());
            } else {
                log.error("Error while producing", e);
            }
        }
    });
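If the rest of the code base still expects a future, a callback of this shape can be adapted back into a CompletableFuture. The sketch below shows the idea against a stand-in interface rather than the real Kafka producer: AsyncSender, sendAsFuture, and the sample payloads are all hypothetical names, and the callback is modeled as a BiConsumer of (metadata, exception) to mirror the onCompletion signature above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class CallbackBridgeDemo {

    // Hypothetical stand-in for a callback-style async API: the callback
    // receives either metadata or an exception, as in onCompletion above.
    public interface AsyncSender {
        void send(String payload, BiConsumer<String, Exception> onCompletion);
    }

    // Adapts the callback style to a CompletableFuture.
    public static CompletableFuture<String> sendAsFuture(AsyncSender sender, String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        sender.send(payload, (metadata, e) -> {
            if (e == null) {
                future.complete(metadata);
            } else {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // Two fake senders that invoke the callback immediately.
        AsyncSender working = (payload, cb) -> cb.accept("offset-42 for " + payload, null);
        AsyncSender broken = (payload, cb) -> cb.accept(null, new IllegalStateException("timeout"));

        System.out.println(sendAsFuture(working, "case-1").join());                    // offset-42 for case-1
        System.out.println(sendAsFuture(broken, "case-1").isCompletedExceptionally()); // true
    }
}
```

The returned future can then be given a whenComplete callback in the same way as in the first part of the answer.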

huangapple
  • Published on March 1, 2023 at 10:52:36
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75599173.html