Is there any way to handle a stream error without stopping the stream when awaiting another stream?

Question

I want to listen to one stream through another stream.
When a stream consumes another stream, we need to use await for,
but I can't find a way to catch the inner stream's error and keep receiving data.

Here is my program.

    Future<void> main() async {
      // I want to listen to stream1 via stream2 and handle errors without
      // stopping the stream.
      stream2().listen(print, onError: print);
      /*
       * Result:
       *
       * 0
       * 1
       * 2
       * Exception: Error at 3
       *
       */
      // If we listen to stream1 directly, we can handle the error and keep
      // receiving events.
      // stream1.listen(print, onError: print);
      /*
       * Result: this is the result I want.
       *
       * 0
       * 1
       * 2
       * Exception: Error at 3
       * 4
       * 5
       *
       */
    }

    // This stream cannot be modified.
    final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
      if (i == 3) {
        throw Exception("Error at $i");
      }
      return i;
    }).take(5);

    Stream<int> stream2() async* {
      await for (final item in stream1) {
        // Is there a way to handle the error without stopping stream2?
        try {
          yield item;
        } catch (e) {
          yield* Stream.error(e);
        }
      }
    }

Answer 1

Score: 0

You can use the handleError method on your stream.

    Future<void> main() async {
      stream2().listen(print, onError: print);
    }

    final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
      if (i == 3) {
        throw Exception("Error at $i");
      }
      return i;
    }).take(5);

    Stream<int> stream2() async* {
      // handleError intercepts the error before await for sees it,
      // so the loop keeps running.
      await for (final item in stream1.handleError(print)) {
        yield item;
      }
    }

Output:

    0
    1
    2
    Exception: Error at 3
    4
    5

If you need to use yield or yield* inside the error handling part, then .handleError isn't going to work, because the callback is a separate function and cannot yield on behalf of the enclosing async* function.

I don't think await-for with try-catch will work either. The try needs to be on the outside of the await-for loop in order to catch the error, but then you are no longer in the loop once the error is caught.
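
To make the point above concrete, here is a minimal sketch with the try placed outside the loop. The names source and forwardUntilError are made up for this illustration, and source is only a stand-in for stream1 with a shorter interval.

```dart
import 'dart:async';

// Hypothetical stand-in for stream1: emits 0, 1, 2, an error, then 4 and 5.
Stream<int> source() =>
    Stream<int>.periodic(const Duration(milliseconds: 10), (i) {
      if (i == 3) {
        throw Exception('Error at $i');
      }
      return i;
    }).take(5);

// The try wraps the whole loop, so the first error ends the loop for good.
Stream<int> forwardUntilError() async* {
  try {
    await for (final item in source()) {
      yield item;
    }
  } catch (e) {
    // Reached once. The loop has already exited and the subscription to
    // source() is cancelled, so 4 and 5 are never seen.
    print('caught: $e');
  }
}

Future<void> main() async {
  final seen = await forwardUntilError().toList();
  print(seen); // [0, 1, 2]
}
```

The error is caught, but only after await for has given up on the source, which is why the values after the error never arrive.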

One thing you could do is use a StreamQueue from the async package (package:async), which you will have to add to your pubspec.yaml as a dependency.

    import 'package:async/async.dart';
    ...
    Stream<int> stream2() async* {
      StreamQueue<int> queue = StreamQueue<int>(stream1);
      // hasNext is true for pending data events and error events alike.
      while (await queue.hasNext) {
        try {
          // Throws here if the next event is an error.
          final item = await queue.next;
          yield item;
        } catch (e) {
          print(e);
          // can do a yield or yield* here
        }
      }
    }

Answer 2

Score: 0

An approach without external packages also worked. I'll leave the code here for reference.

    Stream<int> stream2() async* {
      final store = StreamTemporaryStore<int>(stream1);
      while (store.hasNext) {
        final val = await store.next();
        if (val != null) {
          if (val.error == null) {
            yield val.data!;
          } else {
            yield* Stream.error(val.error!);
          }
        }
      }
    }

    // Buffers every data and error event from [stream] so they can be polled.
    class StreamTemporaryStore<T> {
      StreamTemporaryStore(this.stream) {
        stream.listen((T data) {
          list = [(data: data, error: null), ...list];
        }, onError: (Object error) {
          list = [(data: null, error: error), ...list];
        }, onDone: () {
          _done = true;
        });
      }

      final Stream<T> stream;
      List<({T? data, Object? error})> list = [];
      bool _done = false;

      // Stays true until the source is done and the buffer is drained, so
      // events still buffered when the source closes are not dropped.
      bool get hasNext => !_done || list.isNotEmpty;

      Future<({T? data, Object? error})?> next() async {
        // Let the event loop run so pending stream events can be delivered.
        await Future(() {});
        if (list.isEmpty) return null;
        // New events are prepended, so the oldest one is at the end.
        return list.removeLast();
      }
    }
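
For comparison, here is a sketch of another way to stay within dart:async without polling: forward the events through a StreamController. This is not the code from the answer above. The names forwardWithErrors and source are hypothetical, and source is only a stand-in for stream1 with a shorter interval. Since this is not an async* function, the error callback calls controller.add or controller.addError where a generator would use yield.

```dart
import 'dart:async';

// Hypothetical stand-in for stream1: emits 0, 1, 2, an error, then 4 and 5.
Stream<int> source() =>
    Stream<int>.periodic(const Duration(milliseconds: 10), (i) {
      if (i == 3) {
        throw Exception('Error at $i');
      }
      return i;
    }).take(5);

// Forwards data and errors from [input]; an error does not end the stream.
Stream<T> forwardWithErrors<T>(Stream<T> input) {
  final controller = StreamController<T>();
  StreamSubscription<T>? subscription;
  controller.onListen = () {
    subscription = input.listen(
      controller.add,
      // The place to map the error or emit replacement data instead.
      onError: controller.addError,
      onDone: controller.close,
    );
  };
  // Propagate pause, resume and cancel to the source subscription.
  controller.onPause = () => subscription?.pause();
  controller.onResume = () => subscription?.resume();
  controller.onCancel = () => subscription?.cancel();
  return controller.stream;
}

Future<void> main() async {
  final log = <String>[];
  final done = Completer<void>();
  forwardWithErrors(source()).listen(
    (v) => log.add('$v'),
    onError: (Object e) => log.add('$e'),
    onDone: done.complete,
  );
  await done.future;
  print(log); // [0, 1, 2, Exception: Error at 3, 4, 5]
}
```

Because the source is consumed with listen rather than await for, an error event is just another callback and the subscription stays active.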

huangapple
  • Posted on August 9, 2023, 09:07:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76863964.html