Refactor a ListenableFuture (with onSuccess/onFailure) into a CompletableFuture

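Question

Because of an upgrade of the Kafka library, I need to rewrite the following code, which uses ListenableFuture, so that it uses CompletableFuture instead. The difference is briefly explained in this thread.

The object being handled is a SendResult<String, Object>.

The legacy code is: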

ListenableFuture<SendResult<String, Object>> future = ...;
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {

    @Override
    public void onSuccess(final SendResult<String, Object> result) {

        ProducerRecord<String, Object> record = result.getProducerRecord();
        CaseStatusRequest data = (CaseStatusRequest) record.value();

        logger.info("Producing request succeeded: {}", data);
    }

    @Override
    public void onFailure(final Throwable throwable) {
        logger.error("Producing request failed: {}", request.getReceiptNumber());
    }
});

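My understanding is that the analogue of the onSuccess callback of ListenableFuture is CompletableFuture.whenComplete. That refactoring was straightforward. But the onFailure callback doesn't have a clear equivalent. There is CompletableFuture.exceptionally, but it must return something, which doesn't fit here; the error handling should be a void operation.

The new code is: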

CompletableFuture<SendResult<String, Object>> future = ...;
future.whenComplete(new BiConsumer<SendResult<String, Object>, Throwable>() {

    @Override
    public void accept(SendResult<String, Object> result, Throwable u) {
        ProducerRecord<String, Object> record = result.getProducerRecord();
        CaseStatusRequest data = (CaseStatusRequest) record.value();

        logger.info("Producing request succeeded: {}", data);	
    }	
});
future.exceptionally(new Function<Throwable, SendResult<String, Object>>() {

    @Override
    public SendResult<String, Object> apply(Throwable arg0) {
        logger.error("Producing request failed: {}", request.getReceiptNumber());
        // Something needs to be returned here.
        // Should I return null?
    }
});

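Answer 1

Score: 2

whenComplete is invoked in both the success and the failure case. Its callback receives a Throwable as the second argument: on failure it holds the exception (and the result is null), and on success it is null.

So you may not need the exceptionally method at all.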
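To make the difference concrete, here is a minimal sketch, assuming a plain CompletableFuture<String> in place of the Kafka send future. The class name, the method names, and the "fallback" value are hypothetical and chosen only for illustration. It shows that exceptionally replaces a failure with a value, while whenComplete only observes the outcome and leaves the failure in place, which is why it suits a void error handler.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; String stands in for SendResult<String, Object>.
class ExceptionallyVsWhenComplete {

    // exceptionally() maps the failure to a replacement value,
    // so the returned future completes normally with that value.
    static CompletableFuture<String> recovered(CompletableFuture<String> source) {
        return source.exceptionally(error -> "fallback");
    }

    // whenComplete() only observes the outcome; nothing is returned from the
    // callback and the returned future still carries the original failure.
    static CompletableFuture<String> observed(CompletableFuture<String> source) {
        return source.whenComplete((result, error) -> {
            // a logging call would go here
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("send failed"));

        System.out.println(recovered(failed).join());                    // the substituted value
        System.out.println(observed(failed).isCompletedExceptionally()); // the failure is preserved
    }
}
```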

Something like the following:

future.whenComplete(new BiConsumer<SendResult<String,Object>, Throwable>() {
    @Override
    public void accept(SendResult<String, Object> result, Throwable u) {
        if (u != null) {
            logger.error(....)
        } else {
            ProducerRecord<String, Object> record = result.getProducerRecord();
            CaseStatusRequest data = (CaseStatusRequest) record.value();
            logger.info("Producing request succeeded: {}", data);   
        }
    }
});
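The same pattern can be tried without Kafka on the classpath. The following is a minimal sketch, assuming a CompletableFuture<String> instead of the real SendResult future and an in-memory list instead of a logger; WhenCompleteDemo, attach, and LOG are hypothetical names introduced only for this example. A lambda replaces the anonymous BiConsumer, and the single callback covers both the old onSuccess and onFailure branches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; String stands in for SendResult<String, Object>.
class WhenCompleteDemo {

    // Collects messages instead of calling a real logger, so the sketch has no dependencies.
    static final List<String> LOG = new ArrayList<>();

    // Attaches one callback that handles both outcomes.
    static CompletableFuture<String> attach(CompletableFuture<String> future) {
        return future.whenComplete((result, error) -> {
            if (error != null) {
                // Failure branch: result is null here.
                LOG.add("failed: " + error.getMessage());
            } else {
                // Success branch: error is null here.
                LOG.add("succeeded: " + result);
            }
        });
    }

    public static void main(String[] args) {
        // A future that has already completed successfully.
        attach(CompletableFuture.completedFuture("record-1"));

        // A future that has already failed.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        attach(failed);

        LOG.forEach(System.out::println);
    }
}
```

Because both futures are already complete when the callback is attached, it runs immediately on the calling thread. With a real asynchronous send, the callback would typically run later on whichever thread completes the future.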

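Alternative implementation

You could use Kafka's asynchronous send method and pass a callback to handle the result. This would require some more refactoring of your legacy code.

Something like this: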

producer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // Executes every time a record is successfully sent or an exception is thrown
        if (e == null) {
            // The record was successfully sent
            log.info("Received new metadata. \n" +
                "Topic: " + recordMetadata.topic() + "\n" +
                "Partition: " + recordMetadata.partition() + "\n" +
                "Offset: " + recordMetadata.offset() + "\n" +
                "Timestamp: " + recordMetadata.timestamp());
        } else {
            log.error("Error while producing", e);
        }
    }
});

huangapple
  • Posted on March 1, 2023 at 10:52:36
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75599173.html