Unable to propagate error from a reactive method
Question
I have the following method where, in one of the steps, I call throwACheckedExceptionMethod().
That method can throw a checked exception. I do not want to handle it here; I want the method that subscribes to it to handle it.
How can I do that? Right now this does not compile because I am not handling the error here.
Adding doOnError() or declaring the exception at the method level doesn't help either. I seem to be missing something about RxJava here.
Could I get some help with this, please? Thank you.
private Observable<Void> myMethod(Observable<KafkaConsumerRecord<String, String>> records) throws Exception /*Doesn't make a diff*/ {
    return client.rxGetConnection()
            .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -> throwACheckedExceptionMethod(connection, record)).toCompletable()) // line in question
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
    // some operation which may return a Single. 
    throw new Exception();
}
The following is a possible option, but as mentioned, I do not want to handle the error here.
private Observable<Void> processTransaction(Observable<KafkaConsumerRecord<String, String>> records) {
    return client.rxGetConnection()
            .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -> {
                        try {
                            return throwACheckedExceptionMethod(connection, record);
                        } catch (Exception throwables) {
                            throwables.printStackTrace();
                        }
                    })
                            .toCompletable())
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}
Answer 1
Score: 1
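The throwACheckedExceptionMethod method should not throw a checked exception. Throwing from the method bypasses the observable chain, and the lambda passed to flatMapSingle is not allowed to throw a checked exception, which is why the code does not compile.
Instead, the method should return a Single that emits the error, created with Single.error, and drop the throws clause from its signature:
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) {
    // some operation which may return a Single<UpdateResult>;
    // on failure, return the error as a Single instead of throwing it
    return Single.error(new Exception());
}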
Now the exception gets propagated to the subscriber's onError method.
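The principle behind this answer (return the failure as a value of the asynchronous type and let the caller decide what to do with it) can be illustrated without any RxJava dependency. The sketch below deliberately swaps RxJava's Single for the JDK's CompletableFuture so that it runs on a plain JDK; the class ErrorPropagationSketch and the methods updateRecord, process, and describe are hypothetical names invented for this illustration, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrorPropagationSketch {

    // Hypothetical stand-in for throwACheckedExceptionMethod: it has no
    // "throws" clause and reports failure through the returned future.
    static CompletableFuture<Integer> updateRecord(String record) {
        if (record.isEmpty()) {
            CompletableFuture<Integer> failed = new CompletableFuture<>();
            failed.completeExceptionally(new Exception("empty record"));
            return failed;
        }
        return CompletableFuture.completedFuture(record.length());
    }

    // The pipeline contains no try/catch; a failure simply flows downstream.
    static CompletableFuture<Integer> process(String record) {
        return CompletableFuture.completedFuture(record)
                .thenCompose(ErrorPropagationSketch::updateRecord);
    }

    // The caller (the "subscriber" in RxJava terms) handles the outcome.
    static String describe(String record) {
        return process(record)
                .handle((rows, err) -> {
                    if (err == null) {
                        return "ok: " + rows;
                    }
                    // Downstream stages may see the failure wrapped in a CompletionException.
                    Throwable cause = (err instanceof CompletionException && err.getCause() != null)
                            ? err.getCause()
                            : err;
                    return "failed: " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(describe("abc"));
        System.out.println(describe(""));
    }
}
```

As with flatMapSingle in the question, the function passed to thenCompose cannot throw a checked exception, so the failing step has to hand back an already-failed value. The handle callback plays roughly the role of the subscriber's onError handler.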