反应堆流量:如何解析带有标题的文件

huangapple go评论86阅读模式
英文:

Reactor Flux: how to parse file with header

问题

最近我开始使用 Project Reactor 3.3,但我不知道如何处理包含首行作为列名的行通量(flux),然后使用这些列名来处理/转换所有其他行。目前我是这样做的:

Flux<String> lines = ....;
Mono<String[]> columns = Mono.from(lines.take(1).map(header -> header.split(";"))); // 获取第一行
Flux<SomeDto> objectFlux = lines.skip(1) // 跳过第一行
  .flatMapIterable(row -> // 遍历行
      columns.map(cols -> convert(cols, row)));  // 将行转换为 SomeDto 对象

所以这样做对吗?

英文:

Recently I started using project reactor 3.3 and I don't know what is the best way to handle flux of lines, with first line as column names, then use those column names to process/convert all other lines. Right now I'm doing this way:

Flux&lt;String&gt; lines = ....;
Mono&lt;String[]&gt; columns = Mono.from(lines.take(1).map(header -&gt; header.split(&quot;;&quot;))); //getting first line
Flux&lt;SomeDto&gt; objectFlux = lines.skip(1) //skip first line
  .flatMapIterable(row -&gt; //iterating over lines
      columns.map(cols -&gt; convert(cols, row)));  //convert line into SomeDto object

So is it the right way?

答案1

得分: 2

> 这样做对吗?

煮鸡蛋有多种方法,但你提供的代码似乎有两个主要问题,可能不太正常或者不够优化:

  • 我猜你想要提取每个记录或者数据传输对象(DTO),所以你使用 flatMapIterable() 而不是 flatMap() 有些奇怪。
  • 当重新评估那个 Mono 时,你会为每一行重新订阅 lines,这几乎肯定不是你想做的。(缓存这个 Mono 会有所帮助,但你至少还是会重新订阅两次。)

相反,你可能想要考虑使用 switchOnFirst(),它可以让你根据第一个元素(在这种情况下是头部)动态地转换 Flux。这意味着你可以像这样做:

lines
    .switchOnFirst((signal, flux) -> flux.zipWith(Flux.just(signal.get().split(";")).repeat()))
    .map(row -> convert(row.getT1(), row.getT2()))

请注意,这只是一个简单的例子,在实际使用中你需要根据文档检查 signal 是否真的有值:

> 请注意,源可能会立即完成或出现错误,而不是发出 onNext 信号,必须相应地进行检查。

英文:

> So is it the right way?

There's always more than one way to cook an egg - but the code you have there seems odd / suboptimal for two main reasons:

  • I'd assume it's one line per record / DTO you want to extract, so it's a bit odd you're using flatMapIterable() rather than flatMap()
  • You're going to resubscribe to lines once for each line, when you re-evaluate that Mono. That's almost certainly not what you want to do. (Caching the Mono helps, but you'd still resubscribe at least twice.)

Instead you may want to look at using switchOnFirst(), which will enable you to dynamically transform the Flux based on the first element (the header in your case.) This means you can do something like so:

lines
        .switchOnFirst((signal, flux) -&gt; flux.zipWith(Flux.&lt;String[]&gt;just(signal.get().split(&quot;;&quot;)).repeat()))
        .map(row -&gt; convert(row.getT1(), row.getT2()))

Note this is a bear-bones example, in real-world use you'll need to check whether the signal actually has a value as per the docs:

> Note that the source might complete or error immediately instead of emitting, in which case the Signal would be onComplete or onError. It is NOT necessarily an onNext Signal, and must be checked accordingly.

huangapple
  • 本文由 发表于 2020年5月29日 15:34:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/62080902.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定