Apache Flink 表查询结果作为字符串值

huangapple go评论69阅读模式
英文:

Apache Flink Table query result as string values

问题

我正在使用 Flink 的表 API 编写一个查询来检索一条记录。然后检查是否找到记录,如果找到,则获取该记录的每个列值的字符串。

例如:

用户表: 
| 编号 | 姓名 | 电话    |
|------|------|---------|
| 01   | 山姆 | 23354   |
| 02   | 杰克 | 23352   |
| 03   | 金姆 | 23351   |

问题在于 Flink 仅从查询返回表,因此我无法执行以下操作:1. 检查是否找到记录;2. 获取找到的记录的各个列值。

伪代码:

找到的记录 = 根据电话查找记录
  如果找到的记录 {
    创建 Visitor 的新实例
    Visitor.姓名 = 找到的记录.姓名
    Visitor.编号 = 找到的记录.编号
  } else {
    抛出异常
  }

根据 Flink 文档建议,下面的代码为我提供了一个表,但我不确定如何实现上述伪代码,因为它返回的又是另一个表,而我需要实际的记录值。

Table users = registeredUsers.select("id, name, phone").where("phone === '23354'");

Flink 文档供参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#expression-syntax

英文:

I am writing a query from a flink table api to retrieve a record. Then check if a record was found and if so, get the string value of each of the record's column values.

i.e.

users: 
|id | name | phone |
|---|------|-------|
| 01| sam  | 23354 |
| 02| jake | 23352 |
| 03| kim  | 23351 |

Issue is that flink only returns Table from a query so i am not able to 1: check if a record was found and 2: get the individual values of the found record's values

sudo code:

foundRecord = find record by phone
  if foundRecord {
    create new instance of Visitor
    Visitor.name = foundRecord.name
    Visitor.id = foundRecord.id
  } else {
    throw exception
  }

The code below as recommended by flink docs gives me a table but not sure how to implement the above sudo code since it is returning as another table and i need the actual record values.

Table users = registeredUsers.select("id, name, phone").where("phone === '23354'"));

Flink Docs for ref: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html#expression-syntax

答案1

得分: 0

为了确定找不到匹配的记录输入必须是有界的--因此我们将使用`BatchTableEnvironment`而不是`StreamTableEnvironment`。(对于流式输入可能最终会到达匹配的记录并满足查询只有使用批处理输入我们才能证明没有匹配。)

```scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.util.Collector

class MissingResultException() extends Exception {}

object Phone {
  case class Visitor(name: String, id: String)

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)

    val rawInput = env.fromElements(
      ("01", "sam", "23354"),
      ("02", "jake", "23352"),
      ("03", "kim", "23351"))

    val events = tableEnv.fromDataSet(rawInput, 'id, 'name, 'phone)
    tableEnv.registerTable("events", events)
    val resultTable = tableEnv
      .from("events")
      .select('id, 'name, 'phone)
      .where("phone === 'missing'")

    val results = resultTable.toDataSet[Row]

    results
      .map(row => new Visitor(row.getField(1).toString, row.getField(0).toString))
      .print

    val count: DataSet[Long] = env.fromElements(results.count())

    count
      .flatMap(new FlatMapFunction[Long, Collector[Long]]{
        
        override def flatMap(x: Long, collector: Collector[Collector[Long]]): Unit = {
          if (x == 0L) {
            throw new MissingResultException
          }
        }})
      
      .print()
  }
}

我用来检测结果集为空的方法感觉有点像是一种技巧,但我想不出更好的方法。请注意,最后的print()是必要的,尽管没有要打印的内容,因为任何最终未传递到接收器的计算都将被优化掉,不会执行。


<details>
<summary>英文:</summary>
In order to know that a matching record cannot be found, the input must be bounded -- so we&#39;ll use a `BatchTableEnvironment`, and not a `StreamTableEnvironment`. (With streaming input, a matching record might eventually arrive and satisfy the query. Only with batch input can we prove the absence of a match.)
```scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.util.Collector
class MissingResultException() extends Exception {}
object Phone {
case class Visitor(name: String, id: String)
@throws[Exception]
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
val rawInput = env.fromElements(
(&quot;01&quot;, &quot;sam&quot;, &quot;23354&quot;),
(&quot;02&quot;, &quot;jake&quot;, &quot;23352&quot;),
(&quot;03&quot;, &quot;kim&quot;, &quot;23351&quot;))
val events = tableEnv.fromDataSet(rawInput, &#39;id, &#39;name, &#39;phone)
tableEnv.registerTable(&quot;events&quot;, events)
val resultTable = tableEnv
.from(&quot;events&quot;)
.select(&#39;id, &#39;name, &#39;phone)
.where(&quot;phone === &#39;missing&#39;&quot;)
val results = resultTable.toDataSet[Row]
results
.map(row =&gt; new Visitor(row.getField(1).toString, row.getField(0).toString))
.print
val count: DataSet[Long] = env.fromElements(results.count())
count
.flatMap(new FlatMapFunction[Long, Collector[Long]]{
override def flatMap(x: Long, collector: Collector[Collector[Long]]): Unit = {
if (x == 0L) {
throw new MissingResultException
}
}})
.print()
}
}

The approach I used to detect that the result set is empty feels like something of a hack, but I couldn't think of anything better. Note that the print() at the very end is necessary, though there's nothing to print, because any computation that isn't ultimately fed to a sink will be optimized away, and not executed.

huangapple
  • 本文由 发表于 2020年5月30日 04:25:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/62094093.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定