RxGo. How can I stop an observable stream?
Question
I'm a beginner learning Go.
I'm familiar with RxJava, RxJS, etc.
While studying rxgo, I ran into something I do not understand.
Why is 3 printed even though an error has occurred?
Why didn't the StopOnError option take effect?
import (
	"context"
	"errors"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func exam() {
	// The producer emits two values, an error, and then one more value.
	observable := rxgo.Create([]rxgo.Producer{
		func(ctx context.Context, next chan<- rxgo.Item) {
			next <- rxgo.Of(1)
			next <- rxgo.Of(2)
			next <- rxgo.Error(errors.New("unknown"))
			next <- rxgo.Of(3)
		},
	}, rxgo.WithErrorStrategy(rxgo.StopOnError))

	for v := range observable.Observe() {
		fmt.Printf("CHECK %+v\n", v)
	}
}
Output:
CHECK {V:1 E:<nil>}
CHECK {V:2 E:<nil>}
CHECK {V:<nil> E:unknown}
CHECK {V:3 E:<nil>}
Answer 1
Score: 1
It looks like the library expects the client to apply some transformation to the Observable before the strategy is triggered. I tried Map, but judging from the source code, the same handling is embedded in other methods as well.
This is not very intuitive, and I could not find whether the behavior is documented.
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	// Case 1: observe the source directly, with no operator in between.
	errs := rxgo.Just(
		rxgo.Of(1),
		errors.New("foo"),
		rxgo.Of(2),
		rxgo.Of(3),
	)()
	for v := range errs.Observe(rxgo.WithErrorStrategy(rxgo.StopOnError)) {
		fmt.Printf("CHECK %+v %v\n", v, v.Error())
	}

	// Case 2: the same source, passed through an identity Map first.
	errs = rxgo.Just(
		rxgo.Of(1),
		errors.New("foo"),
		rxgo.Of(2),
		rxgo.Of(3),
	)().Map(func(ctx context.Context, i interface{}) (interface{}, error) {
		return i, nil
	})
	for v := range errs.Observe(rxgo.WithErrorStrategy(rxgo.StopOnError)) {
		fmt.Printf("CHECK MAP %+v %v\n", v, v.Error())
	}
}
https://go.dev/play/p/n2Cr-p6pUC5
This may actually be a bug in the library. rxgo.StopOnError is the default strategy, and there is an example of overriding it:
https://github.com/ReactiveX/RxGo/blob/6d27cca9572a67c6c423d4d4d41afd8edf5b1d72/doc/errors.md
The problem with that example is that changing rxgo.Continue to rxgo.StopOnError does not change anything:
errs := rxgo.Just(
	errors.New("foo"),
	errors.New("bar"),
	errors.New("baz"),
)().Errors(rxgo.WithErrorStrategy(rxgo.StopOnError))
fmt.Println(errs)
This prints:
[foo bar baz]
https://go.dev/play/p/swFwTo-q7c2