英文:
Is there any way to handle stream error without stop stream, when we await for another stream
问题
我想通过另一个流来监听某个流。
当我们使用依赖于另一个流的流时,我们需要使用await for
,然而我无法找到捕获另一个流错误并继续流数据的方法。
以下是我的程序。
Future<void> main() async {
// 我想通过stream2来监听stream1,并希望在不停止流的情况下处理错误。
stream2().listen(print, onError: print);
/*
* 结果:
*
* 0
* 1
* 2
* Exception: Error at 3
*
*/
// 如果直接监听stream1,我们可以处理错误并继续获取流。
// stream1.listen(print, onError: print);
/*
* 结果: 这是我想要的结果。
*
* 0
* 1
* 2
* Exception: Error at 3
* 4
* 5
*
*/
}
// 此流不可修改。
final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
if (i == 3) {
throw Exception("Error at $i");
}
return i;
}).take(5);
Stream<int> stream2() async* {
await for (final item in stream1) {
// 是否有一种方法可以在不停止stream2的情况下处理错误?
try {
yield item;
} catch (e) {
yield* Stream.error(e);
}
}
}
注意:上面的代码是您提供的代码的翻译,未做任何修改。如果您有任何进一步的问题或需要进一步的帮助,请随时提问。
英文:
I want to listen some stream via another stream.
When we use stream which uses another stream, we need to use await for,
however I can't find out the way to catch another stream's error and continue stream data.
Here is my program.
Future<void> main() async {
// I want to listen stream1 via stream2, and want to handle error without stop
// stream.
stream2().listen(print, onError:print);
/*
* Result:
*
* 0
* 1
* 2
* Exception: Error at 3
*
*/
// If we directly listen stream1, we can handling error and continue
// to get stream.
// stream1.listen(print, onError:print);
/*
* Result: this is the result i want.
*
* 0
* 1
* 2
* Exception: Error at 3
* 4
* 5
*
*/
}
// This Stream is not modifiable.
final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
if (i == 3) {
throw Exception("Error at $i");
}
return i;
}).take(5);
Stream<int> stream2() async* {
await for (final item in stream1) {
// Is there a way we handle error without stop stream2?
try {
yield item;
} catch (e) {
yield* Stream.error(e);
}
}
}
答案1
得分: 0
你可以在你的流上使用handleError方法。
Future<void> main() async {
stream2().listen(print, onError: print);
}
final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
if (i == 3) {
throw Exception("Error at $i");
}
return i;
}).take(5);
Stream<int> stream2() async* {
await for (final item in stream1.handleError(print)) {
yield item;
}
}
输出:
0
1
2
Exception: Error at 3
4
5
如果你需要在错误处理部分内部使用yield
或yield*
,那么.handleError
是行不通的,因为你不能在另一个函数内部从外部函数中使用yield
。
我认为await-for
与try-catch
也不会起作用。try
需要位于await-for
循环的外部才能捕获错误,但一旦捕获到错误,你就不再在循环内部了。
你可以使用async库中的StreamQueue来解决这个问题,你需要将它作为依赖项添加到你的pubspec.yaml
中。
import 'package:async/async.dart';
...
Stream<int> stream2() async* {
StreamQueue<int> queue = StreamQueue<int>(stream1);
while (await queue.hasNext) {
try {
final item = await queue.next;
yield item;
} catch (e) {
print(e);
// 可以在这里使用yield或yield*
}
}
}
英文:
You can use the handleError method on your stream.
Future<void> main() async {
stream2().listen(print, onError: print);
}
final stream1 = Stream<int>.periodic(Duration(seconds: 1), (i) {
if (i == 3) {
throw Exception("Error at $i");
}
return i;
}).take(5);
Stream<int> stream2() async* {
await for (final item in stream1.handleError(print)) {
yield item;
}
}
Output:
0
1
2
Exception: Error at 3
4
5
If you need to use yield
or yield*
inside the error handling part, then .handleError
isn't going to work, since you can't yield from the outer function from within another function.
I don't think await-for
with try-catch
will work either. The try
needs to be on the outside of the await-for
loop in order to catch the error, but then you are no longer in the loop once the error is caught.
One thing you could do is use a StreamQueue from async library, which you will have to add to your pubspec.yaml
as a dependency.
import 'package:async/async.dart';
...
Stream<int> stream2() async* {
StreamQueue<int> queue = StreamQueue<int>(stream1);
while (await queue.hasNext) {
try {
final item = await queue.next;
yield item;
} catch (e) {
print(e);
// can do a yield or yield* here
}
}
}
答案2
得分: 0
以下是翻译好的部分:
"Stream without external packages also worked. I'll leave the code for reference.
Stream<int> stream2() async* {
final store = StreamTemporaryStore<int>(stream1);
while (store.hasNext) {
final val = await store.next();
if (val != null) {
if (val.error == null) {
yield val.data!;
} else {
yield* Stream.error(val.error!);
}
}
}
}
class StreamTemporaryStore<T> {
StreamTemporaryStore(this.stream) {
stream.listen((T data) {
list = [(data: data, error: null), ...list];
}, onError: (Object error) {
list = [(data: null, error: error), ...list];
}, onDone: () {
hasNext = false;
});
}
final Stream<T> stream;
List<({T? data, Object? error})> list = [];
bool hasNext = true;
Future<({T? data, Object? error})?> next() async {
await Future(() {});
if (list.isEmpty) return null;
final head = list[list.length - 1];
list.removeAt(list.length - 1);
return head;
}
}
"
英文:
The method without external packages also worked. I'll leave the code for reference.
Stream<int> stream2() async* {
final store = StreamTemporaryStore<int>(stream1);
while (store.hasNext) {
final val = await store.next();
if (val != null) {
if (val.error == null) {
yield val.data!;
} else {
yield* Stream.error(val.error!);
}
}
}
}
class StreamTemporaryStore<T> {
StreamTemporaryStore(this.stream) {
stream.listen((T data) {
list = [(data: data, error: null), ...list];
}, onError: (Object error) {
list = [(data: null, error: error), ...list];
}, onDone: () {
hasNext = false;
});
}
final Stream<T> stream;
List<({T? data, Object? error})> list = [];
bool hasNext = true;
Future<({T? data, Object? error})?> next() async {
await Future(() {});
if (list.isEmpty) return null;
final head = list[list.length - 1];
list.removeAt(list.length - 1);
return head;
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论