无法从响应式方法传播错误。

huangapple go评论69阅读模式
英文:

Unable to propagate error from a reactive method

问题

我有以下方法,在其中的一个步骤中,我调用了throwACheckedExceptionMethod()方法。

那个方法有可能抛出一个已检查异常。现在我不想在这里处理它。我希望订阅它的方法来处理它。

我该如何做呢?因为现在这个代码无法编译,因为我没有在这里处理错误。

添加doOnError()或在方法级别抛出错误也没有帮助。我似乎在RX Java方面漏了点什么。

请帮我解决这个问题,谢谢。

以下是一个可能的选项,但我不想在这里处理错误,如前所述。

private Observable<Void> processTransaction(Observable<KafkaConsumerRecord<String, String>> records) {

    return client.rxGetConnection()
            .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -> {
                        try {
                            return throwACheckedExceptionMethod(connection, record);
                        } catch (Exception throwables) {
                            throwables.printStackTrace();
                        }
                    })
                            .toCompletable())
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}
英文:

I have the following method where in one of the steps, I am calling the throwACheckedExceptionMethod() method.

That method has a chance to throw a checked exception. Now I do not want to handle it here. I want the method which subscribes to it
to handle it.

How can I do that? Cos right now this does not compile cos I am not handling the error here.
Adding doOnError() or throwing the error at the method level doesn't help either. I seem to be missing something about RX Java here.

Could I get some help with this please? Thank you.

private Observable&lt;Void&gt; myMethod(Observable&lt;KafkaConsumerRecord&lt;String, String&gt;&gt; records) throws Exception /*Doesn&#39;t make a diff*/ {
    return client.rxGetConnection()
            .flatMapCompletable(connection -&gt; connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -&gt; throwACheckedExceptionMethod(connection, record)).toCompletable()) // line in question
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}

private Single&lt;UpdateResult&gt; throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord&lt;String, String&gt; record) throws Exception {
    // some operation which may return a Single. 
    throw new Exception();
}

The following is a possible option but I do not want to handle the error here as mentioned.

private Observable&lt;Void&gt; processTransaction(Observable&lt;KafkaConsumerRecord&lt;String, String&gt;&gt; records) {

    return client.rxGetConnection()
            .flatMapCompletable(connection -&gt; connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -&gt; {
                        try {
                            return throwACheckedExceptionMethod(connection, record);
                        } catch (Exception throwables) {
                            throwables.printStackTrace();
                        }
                    })
                            .toCompletable())
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}

答案1

得分: 1

The throwACheckedExceptionMethod method should not throw an exception, because throwing exceptions breaks the observable chain.

Instead, it should return an observable that emits the error, Single.error:

private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
    // some operation which may return an UpdateResult 
    return Single.error(new Exception());
}

Now the exception gets propagated to the subscriber's onError method.

英文:

The throwACheckedExceptionMethod method should not throw an exception, because throwing exceptions breaks the observable chain.

Instead, it should return an observable that emits the error, Single.error:

private Single&lt;UpdateResult&gt; throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord&lt;String, String&gt; record) throws Exception {
    // some operation which may return a UpdateResult 
    return Single.error(new Exception());
}

Now the exception gets propagated to the subscriber's onError method.

huangapple
  • 本文由 发表于 2020年8月12日 23:06:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/63379411.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定