Unable to propagate error from a reactive method
Question
I have the following method, and in one of its steps I call throwACheckedExceptionMethod().
That method may throw a checked exception. I do not want to handle it here; I want the method that subscribes to the result to handle it.
How can I do that? Right now the code does not compile because I am not handling the exception here.
Adding doOnError() or declaring the exception on the method signature does not help either. I seem to be missing something about RxJava.
Could I get some help with this, please? Thank you.
private Observable<Void> myMethod(Observable<KafkaConsumerRecord<String, String>> records) throws Exception /* doesn't make a difference */ {
    return client.rxGetConnection()
        .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
            .toCompletable()
            .andThen(records.flatMapSingle(record -> throwACheckedExceptionMethod(connection, record)).toCompletable()) // line in question
            .andThen(connection.rxCommit().toCompletable())
            .andThen(connection.rxSetAutoCommit(true).toCompletable())
        )
        .toObservable();
}

private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
    // some operation which may return a Single.
    throw new Exception();
}
The following is a possible option, but as mentioned, I do not want to handle the error here.
private Observable<Void> processTransaction(Observable<KafkaConsumerRecord<String, String>> records) {
    return client.rxGetConnection()
        .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
            .toCompletable()
            .andThen(records.flatMapSingle(record -> {
                try {
                    return throwACheckedExceptionMethod(connection, record);
                } catch (Exception throwables) {
                    throwables.printStackTrace();
                    // Note: nothing is returned on this path, so the lambda would still need a return value here to compile.
                }
            })
            .toCompletable())
            .andThen(connection.rxCommit().toCompletable())
            .andThen(connection.rxSetAutoCommit(true).toCompletable())
        )
        .toObservable();
}
Answer 1
Score: 1
The throwACheckedExceptionMethod method should not throw an exception, because throwing breaks the observable chain (and, as the question notes, the lambda that calls it does not compile while the method declares a checked exception).
Instead, it should return a Single that signals the error, using Single.error:
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) {
    // some operation which may return an UpdateResult
    return Single.error(new Exception());
}
Note that the throws clause is removed from the signature. Now the exception is propagated to the subscriber's onError method.
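If the throwing method cannot be changed (for example, it lives in another library), the same effect can probably be had by catching the exception at the call site and turning it into an error signal instead of swallowing it. The sketch below illustrates only the idea and runs on the plain JDK: MiniSingle is a hypothetical, heavily simplified stand-in for RxJava's Single, and the String record and the process and describe helpers are assumptions made for the illustration, not part of the original code. With the real library, the catch branch would return Single.error(e).

```java
import java.util.function.Consumer;

class ErrorPropagationSketch {

    /** Hypothetical stand-in for rx.Single: holds either a value or an error. */
    static final class MiniSingle<T> {
        private final T value;
        private final Throwable error;

        private MiniSingle(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> MiniSingle<T> just(T value) {
            return new MiniSingle<>(value, null);
        }

        static <T> MiniSingle<T> error(Throwable error) {
            return new MiniSingle<>(null, error);
        }

        /** Delivers the value to onSuccess, or the error to onError. */
        void subscribe(Consumer<T> onSuccess, Consumer<Throwable> onError) {
            if (error != null) {
                onError.accept(error);
            } else {
                onSuccess.accept(value);
            }
        }
    }

    /** Stand-in for the method from the question: it declares a checked exception. */
    static MiniSingle<String> throwACheckedExceptionMethod(String record) throws Exception {
        if (record.isEmpty()) {
            throw new Exception("empty record");
        }
        return MiniSingle.just("processed " + record);
    }

    /** Adapter: converts a thrown checked exception into an error signal. */
    static MiniSingle<String> process(String record) {
        try {
            return throwACheckedExceptionMethod(record);
        } catch (Exception e) {
            // Nothing is handled here; the error is passed on to the subscriber.
            return MiniSingle.error(e);
        }
    }

    /** Subscribes and reports which callback the subscriber received. */
    static String describe(String record) {
        StringBuilder seen = new StringBuilder();
        process(record).subscribe(
            value -> seen.append("onSuccess: ").append(value),
            error -> seen.append("onError: ").append(error.getMessage()));
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("r1")); // prints "onSuccess: processed r1"
        System.out.println(describe(""));   // prints "onError: empty record"
    }
}
```

The point of the adapter is that the lambda passed to flatMapSingle no longer sees a checked exception, while the decision about what to do with the failure still rests with whoever subscribes.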