英文:
Reactor Flux: how to parse file with header
问题
最近我开始使用 Project Reactor 3.3,但我不知道如何处理包含首行作为列名的行通量(flux),然后使用这些列名来处理/转换所有其他行。目前我是这样做的:
Flux<String> lines = ....;
Mono<String[]> columns = Mono.from(lines.take(1).map(header -> header.split(";"))); // 获取第一行
Flux<SomeDto> objectFlux = lines.skip(1) // 跳过第一行
.flatMapIterable(row -> // 遍历行
columns.map(cols -> convert(cols, row))); // 将行转换为 SomeDto 对象
所以这样做对吗?
英文:
Recently I started using project reactor 3.3 and I don't know what is the best way to handle flux of lines, with first line as column names, then use those column names to process/convert all other lines. Right now I'm doing this way:
Flux<String> lines = ....;
Mono<String[]> columns = Mono.from(lines.take(1).map(header -> header.split(";"))); //getting first line
Flux<SomeDto> objectFlux = lines.skip(1) //skip first line
.flatMapIterable(row -> //iterating over lines
columns.map(cols -> convert(cols, row))); //convert line into SomeDto object
So is it the right way?
答案1
得分: 2
> 这样做对吗?
煮鸡蛋有多种方法,但你提供的代码似乎有两个主要问题,可能不太正常或者不够优化:
- 我猜你想要提取每个记录或者数据传输对象(DTO),所以你使用
flatMapIterable()
而不是flatMap()
有些奇怪。 - 当重新评估那个
Mono
时,你会为每一行重新订阅lines
,这几乎肯定不是你想做的。(缓存这个Mono
会有所帮助,但你至少还是会重新订阅两次。)
相反,你可能想要考虑使用 switchOnFirst()
,它可以让你根据第一个元素(在这种情况下是头部)动态地转换 Flux
。这意味着你可以像这样做:
lines
.switchOnFirst((signal, flux) -> flux.zipWith(Flux.just(signal.get().split(";")).repeat()))
.map(row -> convert(row.getT1(), row.getT2()))
请注意,这只是一个简单的例子,在实际使用中你需要根据文档检查 signal
是否真的有值:
> 请注意,源可能会立即完成或出现错误,而不是发出 onNext 信号,必须相应地进行检查。
英文:
> So is it the right way?
There's always more than one way to cook an egg - but the code you have there seems odd / suboptimal for two main reasons:
- I'd assume it's one line per record / DTO you want to extract, so it's a bit odd you're using
flatMapIterable()
rather thanflatMap()
- You're going to resubscribe to
lines
once for each line, when you re-evaluate thatMono
. That's almost certainly not what you want to do. (Caching theMono
helps, but you'd still resubscribe at least twice.)
Instead you may want to look at using switchOnFirst()
, which will enable you to dynamically transform the Flux
based on the first element (the header in your case.) This means you can do something like so:
lines
.switchOnFirst((signal, flux) -> flux.zipWith(Flux.<String[]>just(signal.get().split(";")).repeat()))
.map(row -> convert(row.getT1(), row.getT2()))
Note this is a bear-bones example, in real-world use you'll need to check whether the signal
actually has a value as per the docs:
> Note that the source might complete or error immediately instead of emitting, in which case the Signal would be onComplete or onError. It is NOT necessarily an onNext Signal, and must be checked accordingly.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论