Using CTE in Apache Flink SQL

Question
I am trying to write a SQL statement that uses a CTE in Flink.
I have a table defined as follows:
CREATE TABLE test_cte
(
pod VARCHAR,
PRIMARY KEY (pod) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'test_cte',
'properties.bootstrap.servers' = 'localhost:29092,localhost:39092',
'properties.group.id' = 'test_cte_group_id',
'value.format' = 'json',
'key.format' = 'json',
'properties.allow.auto.create.topics' = 'true',
'properties.replication.factor' = '3',
'value.json.timestamp-format.standard' = 'ISO-8601',
'sink.parallelism' = '3'
);
Then I run an insert like this:
WITH q1 AS ( SELECT pod FROM source )
FROM q1
INSERT OVERWRITE TABLE test_cte
SELECT pod;
I get an error saying:
org.apache.flink.sql.parser.impl.ParseException: Incorrect syntax near the keyword 'FROM' at line 2, column 2.
The source table has the column pod.
When I run just the SELECT, like here:
WITH q1 AS ( SELECT pod FROM roles_deleted_raw_v1 )
SELECT * FROM q1;
it works and I can see the result.
Answer 1 (Score: 0)
CTEs in INSERT statements like this are only available in Flink when using the Hive dialect:
SET table.sql-dialect = hive;
However, this feature is only supported by the HiveCatalog, so it is not possible to use it with upsert-kafka.
For more info on this, you can check out the Flink docs.
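For reference, a Hive-dialect version of the statement would look roughly like the sketch below. This is illustrative only: `hive_sink` is a hypothetical Hive-managed table (as noted above, this dialect only works against a HiveCatalog, so the upsert-kafka table from the question cannot be the target).

```sql
-- Assumption: the session uses a HiveCatalog, and 'hive_sink' is a
-- hypothetical Hive table created for illustration.
SET table.sql-dialect = hive;

-- In the Hive dialect, a CTE can feed a FROM ... INSERT statement directly,
-- which is the shape the original query attempted.
WITH q1 AS (SELECT pod FROM source)
FROM q1
INSERT OVERWRITE TABLE hive_sink
SELECT pod;
```

The key difference from the default Flink dialect is that Hive's grammar permits the `FROM <source> INSERT ... SELECT ...` ordering, which is what the parser rejected in the original attempt.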