RxGo. How can I stop an observable stream?

huangapple go评论61阅读模式
英文:

RxGo. How can I stop an observable stream?

问题

我只会返回翻译好的部分,以下是翻译的结果:

我只是学习GO语言的初学者。

我熟悉RxJava、RxJs等。

在学习rxgo时,有一部分我不理解。

为什么即使发生错误,仍然打印出3呢?
为什么StopOnError选项没有生效?

import (
	"context"
	"errors"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func exam() { 
	observable := rxgo.Create([]rxgo.Producer{
		func(ctx context.Context, next chan<- rxgo.Item) {
			next <- rxgo.Of(1)
			next <- rxgo.Of(2)
			next <- rxgo.Error(errors.New("unknown"))
			next <- rxgo.Of(3)
		},
	}, rxgo.WithErrorStrategy(rxgo.StopOnError))

	for v := range observable.Observe() {
		fmt.Printf("CHECK %+v\n", v)
	}
}

输出结果

CHECK {V:1 E:<nil>}
CHECK {V:2 E:<nil>}
CHECK {V:<nil> E:unknown}
CHECK {V:3 E:<nil>}
英文:

I'm just a beginner in learning the GO language.

I'm familiar with RxJava, RxJs, etc.

While studying rxgo, there is a part that I do not understand.

Why is 3 being printed even though an error has occurred?
Why didn't the StopOnError option apply?

import (
	&quot;context&quot;
	&quot;errors&quot;
	&quot;fmt&quot;

	&quot;github.com/reactivex/rxgo/v2&quot;
)

func exam() { 
	observable := rxgo.Create([]rxgo.Producer{
		func(ctx context.Context, next chan&lt;- rxgo.Item) {
			next &lt;- rxgo.Of(1)
			next &lt;- rxgo.Of(2)
			next &lt;- rxgo.Error(errors.New(&quot;unknown&quot;))
			next &lt;- rxgo.Of(3)
		},
	}, rxgo.WithErrorStrategy(rxgo.StopOnError))

	for v := range observable.Observe() {
		fmt.Printf(&quot;CHECK %+v\n&quot;, v)
	}
}

OUTPUT

CHECK {V:1 E:&lt;nil&gt;}
CHECK {V:2 E:&lt;nil&gt;}
CHECK {V:&lt;nil&gt; E:unknown}
CHECK {V:3 E:&lt;nil&gt;}

答案1

得分: 1

看起来库希望客户端对Observable执行一些转换来触发策略。我尝试了Map,但根据源代码,它嵌入在其他方法中。

这不是很直观,我也没有找到这种行为是否有文档记录。

import (
	"context"
	"errors"
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	errs := rxgo.Just(
		rxgo.Of(1),
		errors.New("foo"),
		rxgo.Of(2),
		rxgo.Of(3),
	)()
	for v := range errs.Observe(rxgo.WithErrorStrategy(rxgo.StopOnError)) {
		fmt.Printf("CHECK %+v %v\n", v, v.Error())
	}

	errs = rxgo.Just(
		rxgo.Of(1),
		errors.New("foo"),
		rxgo.Of(2),
		rxgo.Of(3),
	)().Map(func(ctx context.Context, i interface{}) (interface{}, error) {
		return i, nil
	})
	for v := range errs.Observe(rxgo.WithErrorStrategy(rxgo.StopOnError)) {
		fmt.Printf("CHECK MAP %+v %v\n", v, v.Error())
	}
}

实际上这可能是库中的一个bug。rxgo.StopOnError是默认策略,有一个示例可以覆盖它:

https://github.com/ReactiveX/RxGo/blob/6d27cca9572a67c6c423d4d4d41afd8edf5b1d72/doc/errors.md

该示例的问题是将rxgo.Continue更改为rxgo.StopOnError并没有改变任何内容:

errs := rxgo.Just(
	errors.New("foo"),
	errors.New("bar"),
	errors.New("baz"),
)().Errors(rxgo.WithErrorStrategy(rxgo.StopOnError))
fmt.Println(errs)

输出为:

[foo bar baz]

https://go.dev/play/p/swFwTo-q7c2

英文:

Looks like library expects client to perform some transformation on Observable to trigger strategy. I tried Map but based on source code, it is embedded in other methods.

It is not very intuitive, and I did not find if that behavior is documented.

import (
	&quot;context&quot;
	&quot;errors&quot;
	&quot;fmt&quot;

	&quot;github.com/reactivex/rxgo/v2&quot;
)

func main() {
	errs := rxgo.Just(
		rxgo.Of(1),
		errors.New(&quot;foo&quot;),
		rxgo.Of(2),
		rxgo.Of(3),
	)()
	for v := range errs.Observe(rxgo.WithErrorStrategy(rxgo.StopOnError)) {
		fmt.Printf(&quot;CHECK %+v %v\n&quot;, v, v.Error())
	}

	errs = rxgo.Just(
		rxgo.Of(1),
		errors.New(&quot;foo&quot;),
		rxgo.Of(2),
		rxgo.Of(3),
	)().Map(func(ctx context.Context, i interface{}) (interface{}, error) {
		return i, nil
	})
	for v := range errs.Observe(rxgo.WithErrorStrategy(rxgo.StopOnError)) {
		fmt.Printf(&quot;CHECK MAP %+v %v\n&quot;, v, v.Error())
	}
}

https://go.dev/play/p/n2Cr-p6pUC5

It is actually can be a bug in library. rxgo.StopOnError is a default strategy and there is an example for overriding it:

https://github.com/ReactiveX/RxGo/blob/6d27cca9572a67c6c423d4d4d41afd8edf5b1d72/doc/errors.md

Problem with that example that changing rxgo.Continue to rxgo.StopOnError does not change anything:


	errs := rxgo.Just(
		errors.New(&quot;foo&quot;),
		errors.New(&quot;bar&quot;),
		errors.New(&quot;baz&quot;),
	)().Errors(rxgo.WithErrorStrategy(rxgo.StopOnError))
	fmt.Println(errs)

prints

[foo bar baz]

https://go.dev/play/p/swFwTo-q7c2

huangapple
  • 本文由 发表于 2022年6月19日 06:15:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/72673118.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定