有没有办法在等待另一个流时处理流错误而不停止流?

huangapple go评论128阅读模式
英文:

Is there any way to handle stream error without stop stream, when we await for another stream

问题

我想通过另一个流来监听某个流。
当我们使用依赖于另一个流的流时,我们需要使用await for,然而我无法找到捕获另一个流错误并继续流数据的方法。

以下是我的程序。

Future<void> main() async {
  // 我想通过stream2来监听stream1,并希望在不停止流的情况下处理错误。
  stream2().listen(print, onError: print);
  /* 
   * 结果:
   * 
   * 0
   * 1
   * 2
   * Exception: Error at 3
   * 
   */
  
  
  // 如果直接监听stream1,我们可以处理错误并继续获取流。
  // stream1.listen(print, onError: print);
  /* 
   * 结果: 这是我想要的结果。
   * 
   * 0
   * 1
   * 2
   * Exception: Error at 3
   * 4
   * 5
   * 
   */
}

// 此流不可修改。
final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
  if (i == 3) {
    throw Exception("Error at $i");
  }
  return i;
}).take(5);


Stream<int> stream2() async* {
  await for (final item in stream1) {
    // 是否有一种方法可以在不停止stream2的情况下处理错误?
    try {
      yield item;
    } catch (e) {
      yield* Stream.error(e);
    }
  }
}

注意:上面的代码是您提供的代码的翻译,未做任何修改。如果您有任何进一步的问题或需要进一步的帮助,请随时提问。

英文:

I want to listen some stream via another stream.
When we use stream which uses another stream, we need to use await for,
however I can't find out the way to catch another stream's error and continue stream data.

Here is my program.

Future&lt;void&gt; main() async {
  // I want to listen stream1 via stream2, and want to handle error without stop
  // stream.
  stream2().listen(print, onError:print);
  /* 
   * Result:
   * 
   * 0
   * 1
   * 2
   * Exception: Error at 3
   * 
   */
  
  
  // If we directly listen stream1, we can handling error and continue
  // to get stream.
  // stream1.listen(print, onError:print);
  /* 
   * Result: this is the result i want.
   * 
   * 0
   * 1
   * 2
   * Exception: Error at 3
   * 4
   * 5
   * 
   */
}

// This Stream is not modifiable.
final stream1 = Stream&lt;int&gt;.periodic(Duration(seconds: 1), (i) {
  if (i == 3) {
    throw Exception(&quot;Error at $i&quot;);
  }
  return i;
}).take(5);


Stream&lt;int&gt; stream2() async* {
  await for (final item in stream1) {
    // Is there a way we handle error without stop stream2?
    try {
      yield item;
    } catch (e) {
      yield* Stream.error(e);
    }
  }
}

答案1

得分: 0

你可以在你的流上使用handleError方法。

Future<void> main() async {
  stream2().listen(print, onError: print);
}

final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
  if (i == 3) {
    throw Exception("Error at $i");
  }
  return i;
}).take(5);

Stream<int> stream2() async* {
  await for (final item in stream1.handleError(print)) {
    yield item;
  }
}

输出:

0
1
2
Exception: Error at 3
4
5

如果你需要在错误处理部分内部使用yieldyield*,那么.handleError是行不通的,因为你不能在另一个函数内部从外部函数中使用yield

我认为await-fortry-catch也不会起作用。try需要位于await-for循环的外部才能捕获错误,但一旦捕获到错误,你就不再在循环内部了。

你可以使用async库中的StreamQueue来解决这个问题,你需要将它作为依赖项添加到你的pubspec.yaml中。

import 'package:async/async.dart';

...

Stream<int> stream2() async* {
  StreamQueue<int> queue = StreamQueue<int>(stream1);
  while (await queue.hasNext) {
    try {
      final item = await queue.next;
      yield item;
    } catch (e) {
      print(e);
      // 可以在这里使用yield或yield*
    }
  }
}
英文:

You can use the handleError method on your stream.

Future&lt;void&gt; main() async {
  stream2().listen(print, onError: print);
}

final stream1 = Stream&lt;int&gt;.periodic(Duration(seconds: 1), (i) {
  if (i == 3) {
    throw Exception(&quot;Error at $i&quot;);
  }
  return i;
}).take(5);

Stream&lt;int&gt; stream2() async* {
  await for (final item in stream1.handleError(print)) {
    yield item;
  }
}

Output:

0
1
2
Exception: Error at 3
4
5

If you need to use yield or yield* inside the error handling part, then .handleError isn't going to work, since you can't yield from the outer function from within another function.

I don't think await-for with try-catch will work either. The try needs to be on the outside of the await-for loop in order to catch the error, but then you are no longer in the loop once the error is caught.

One thing you could do is use a StreamQueue from async library, which you will have to add to your pubspec.yaml as a dependency.

import &#39;package:async/async.dart&#39;;

...

Stream&lt;int&gt; stream2() async* {
  StreamQueue&lt;int&gt; queue = StreamQueue&lt;int&gt;(stream1);
  while (await queue.hasNext) {
    try {
      final item = await queue.next;
      yield item;
    } catch (e) {
      print(e);
      // can do a yield or yield* here
    }
  }
}

答案2

得分: 0

以下是翻译好的部分:

"Stream without external packages also worked. I'll leave the code for reference.

Stream<int> stream2() async* {
  final store = StreamTemporaryStore<int>(stream1);
  while (store.hasNext) {
    final val = await store.next();
    if (val != null) {
      if (val.error == null) {
        yield val.data!;
      } else {
        yield* Stream.error(val.error!);
      }
    }
  }
}

class StreamTemporaryStore<T> {
  StreamTemporaryStore(this.stream) {
    stream.listen((T data) {
      list = [(data: data, error: null), ...list];
    }, onError: (Object error) {
      list = [(data: null, error: error), ...list];
    }, onDone: () {
      hasNext = false;
    });
  }
  final Stream<T> stream;
  List<({T? data, Object? error})> list = [];
  bool hasNext = true;

  Future<({T? data, Object? error})?> next() async {
    await Future(() {});
    if (list.isEmpty) return null;
    final head = list[list.length - 1];
    list.removeAt(list.length - 1);
    return head;
  }
}

"

英文:

The method without external packages also worked. I'll leave the code for reference.

Stream&lt;int&gt; stream2() async* {
  final store = StreamTemporaryStore&lt;int&gt;(stream1);
  while (store.hasNext) {
    final val = await store.next();
    if (val != null) {
      if (val.error == null) {
        yield val.data!;
      } else {
        yield* Stream.error(val.error!);
      }
    }
  }
}

class StreamTemporaryStore&lt;T&gt; {
  StreamTemporaryStore(this.stream) {
    stream.listen((T data) {
      list = [(data: data, error: null), ...list];
    }, onError: (Object error) {
      list = [(data: null, error: error), ...list];
    }, onDone: () {
      hasNext = false;
    });
  }
  final Stream&lt;T&gt; stream;
  List&lt;({T? data, Object? error})&gt; list = [];
  bool hasNext = true;

  Future&lt;({T? data, Object? error})?&gt; next() async {
    await Future(() {});
    if (list.isEmpty) return null;
    final head = list[list.length - 1];
    list.removeAt(list.length - 1);
    return head;
  }
}

huangapple
  • 本文由 发表于 2023年8月9日 09:07:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/76863964-2.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定