使用CTE在Apache Flink SQL中

huangapple go评论37阅读模式
英文:

Using CTE in apache flink sql

问题

以下是要翻译的内容:

我正在尝试编写一个在Flink中使用CTE的SQL语句。

我有一个定义的表
CREATE TABLE test_cte
    (
        pod                     VARCHAR,
        PRIMARY KEY (pod) NOT ENFORCED
    ) WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'test_cte',
          'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
          'properties.group.id' = 'test_cte_group_id',
          'value.format' = 'json',
          'key.format' = 'json',
          'properties.allow.auto.create.topics' = 'true',
          'properties.replication.factor' = '3',
          'value.json.timestamp-format.standard' = 'ISO-8601',
          'sink.parallelism' = '3'
          );

然后,我插入如下:

WITH q1 AS (SELECT pod FROM source) 
FROM q1
INSERT OVERWRITE TABLE test_cte
SELECT pod;

我收到一个错误,显示org.apache.flink.sql.parser.impl.ParseException: 在第2行,第2列附近的关键字 'FROM' 附近的语法不正确。

source表具有列pod

当我只运行类似于这里的选择时

WITH q1 AS (SELECT pod FROM roles_deleted_raw_v1)
SELECT * FROM q1;

它可以看到结果。

英文:

I am trying to write a sql that use CTE in flink.

I have a table defined

CREATE TABLE test_cte
    (
        pod                     VARCHAR,
        PRIMARY KEY (pod) NOT ENFORCED
    ) WITH (
          'connector' = 'upsert-kafka',
          'topic' = 'test_cte',
          'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
          'properties.group.id' = 'test_cte_group_id',
          'value.format' = 'json',
          'key.format' = 'json',
          'properties.allow.auto.create.topics' = 'true',
          'properties.replication.factor' = '3',
          'value.json.timestamp-format.standard' = 'ISO-8601',
          'sink.parallelism' = '3'
          );

then I have insert as

WITH q1 AS ( SELECT pod FROM source ) 
 FROM q1
INSERT OVERWRITE TABLE test_cte
SELECT pod;

I get an error saying org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'FROM' at line 2, column 2.

source tables has the column pod.

When I run just the select like here

WITH q1 AS ( SELECT pod FROM roles_deleted_raw_v1)
select * from q1;

its can see the see the result

答案1

得分: 0

CTE仅在使用Hive方言时在Flink中可用:

SET table.sql-dialect = hive;

然而,这个功能仅由HiveCatalog目录支持,因此无法与upsert-kafka一起使用。

有关更多信息,请查看Flink的文档

英文:

CTE is only available in Flink when using Hive dialect:

SET table.sql-dialect = hive;

However, this feature is only supported by the HiveCatalog catalog, so it is not possible to use with upsert-kafka.

For more info on this, you can check out the Flink docs.

huangapple
  • 本文由 发表于 2023年2月10日 10:06:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/75406285.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定