使用CTE在Apache Flink SQL中

huangapple go评论69阅读模式
英文:

Using CTE in apache flink sql

问题

以下是要翻译的内容:

  1. 我正在尝试编写一个在Flink中使用CTESQL语句。
  2. 我有一个定义的表
  3. CREATE TABLE test_cte
  4. (
  5. pod VARCHAR,
  6. PRIMARY KEY (pod) NOT ENFORCED
  7. ) WITH (
  8. 'connector' = 'upsert-kafka',
  9. 'topic' = 'test_cte',
  10. 'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
  11. 'properties.group.id' = 'test_cte_group_id',
  12. 'value.format' = 'json',
  13. 'key.format' = 'json',
  14. 'properties.allow.auto.create.topics' = 'true',
  15. 'properties.replication.factor' = '3',
  16. 'value.json.timestamp-format.standard' = 'ISO-8601',
  17. 'sink.parallelism' = '3'
  18. );

然后,我插入如下:

  1. WITH q1 AS (SELECT pod FROM source)
  2. FROM q1
  3. INSERT OVERWRITE TABLE test_cte
  4. SELECT pod;

我收到一个错误,显示org.apache.flink.sql.parser.impl.ParseException: 在第2行,第2列附近的关键字 'FROM' 附近的语法不正确。

source表具有列pod

当我只运行类似于这里的选择时

  1. WITH q1 AS (SELECT pod FROM roles_deleted_raw_v1)
  2. SELECT * FROM q1;

它可以看到结果。

英文:

I am trying to write a sql that use CTE in flink.

I have a table defined

  1. CREATE TABLE test_cte
  2. (
  3. pod VARCHAR,
  4. PRIMARY KEY (pod) NOT ENFORCED
  5. ) WITH (
  6. 'connector' = 'upsert-kafka',
  7. 'topic' = 'test_cte',
  8. 'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
  9. 'properties.group.id' = 'test_cte_group_id',
  10. 'value.format' = 'json',
  11. 'key.format' = 'json',
  12. 'properties.allow.auto.create.topics' = 'true',
  13. 'properties.replication.factor' = '3',
  14. 'value.json.timestamp-format.standard' = 'ISO-8601',
  15. 'sink.parallelism' = '3'
  16. );

then I have insert as

  1. WITH q1 AS ( SELECT pod FROM source )
  2. FROM q1
  3. INSERT OVERWRITE TABLE test_cte
  4. SELECT pod;

I get an error saying org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'FROM' at line 2, column 2.

source tables has the column pod.

When I run just the select like here

  1. WITH q1 AS ( SELECT pod FROM roles_deleted_raw_v1)
  2. select * from q1;

its can see the see the result

答案1

得分: 0

CTE仅在使用Hive方言时在Flink中可用:

SET table.sql-dialect = hive;

然而,这个功能仅由HiveCatalog目录支持,因此无法与upsert-kafka一起使用。

有关更多信息,请查看Flink的文档

英文:

CTE is only available in Flink when using Hive dialect:

SET table.sql-dialect = hive;

However, this feature is only supported by the HiveCatalog catalog, so it is not possible to use with upsert-kafka.

For more info on this, you can check out the Flink docs.

huangapple
  • 本文由 发表于 2023年2月10日 10:06:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/75406285.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定