Flink TableAPI: PartitionedBy columns missing in Parquet files
Question
I’m using the filesystem connector to sink data into S3 in Parquet format via the Table API. I observed that the PARTITIONED BY columns are missing from the Parquet files. Here are the queries I’m using:
CREATE TABLE data_to_sink (
    record_id STRING NOT NULL,
    request_id STRING NOT NULL,
    source_name STRING NOT NULL,
    event_type STRING NOT NULL,
    event_name STRING NOT NULL,
    `date` STRING,
    results_count BIGINT
) PARTITIONED BY (record_id, source_name, `date`) WITH (
    'connector' = 'filesystem',
    'path' = '<S3 path>',
    'format' = 'parquet'
);

INSERT INTO data_to_sink
SELECT record_id, request_id, source_name, event_type, event_name,
    DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date,
    COUNT(*) AS results_count
FROM data_from_source
GROUP BY record_id, request_id, source_name, event_type, event_name,
    TUMBLE(proc_time, INTERVAL '2' MINUTE);
I can see the Parquet files being created, but when I verify the schema with the parquet-cli tool, it doesn’t show the record_id, source_name and date fields. I checked Flink's documentation as well, but didn’t find any setting for this. Is there a known issue around this?
Answer 1
Score: 1
I fixed this by cloning the record_id and source_name columns and then partitioning by those cloned columns.
CREATE TABLE data_to_sink (
    record_id STRING NOT NULL,
    request_id STRING NOT NULL,
    source_name STRING NOT NULL,
    event_type STRING NOT NULL,
    event_name STRING NOT NULL,
    `date` STRING,
    results_count BIGINT,
    recordId STRING,
    sourceName STRING
) PARTITIONED BY (recordId, sourceName, `date`) WITH (
    'connector' = 'filesystem',
    'path' = '<S3 path>',
    'format' = 'parquet'
);

INSERT INTO data_to_sink
SELECT record_id, request_id, source_name, event_type, event_name,
    DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date,
    COUNT(*) AS results_count,
    record_id AS recordId, source_name AS sourceName
FROM data_from_source
GROUP BY record_id, request_id, source_name, event_type, event_name,
    TUMBLE(proc_time, INTERVAL '2' MINUTE);
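For context, this behaviour is expected for the filesystem sink: with Hive-style partitioning, the partition column values are encoded in the directory path (record_id=.../source_name=.../date=...) and are not written into the Parquet files themselves. If the columns only need to be visible when the data is read back, an alternative to duplicating them is to declare the same partition columns on a filesystem source table over the sink's output, so their values should be reconstructed from the directory names. A minimal sketch, assuming the same path and schema as the original sink (the table name data_from_sink and the filter value are purely illustrative):

-- Hypothetical source table over the files written by data_to_sink.
-- record_id, source_name and `date` are declared as partition columns, so their
-- values come from the record_id=/source_name=/date= directories, not the files.
CREATE TABLE data_from_sink (
    record_id STRING,
    request_id STRING,
    source_name STRING,
    event_type STRING,
    event_name STRING,
    `date` STRING,
    results_count BIGINT
) PARTITIONED BY (record_id, source_name, `date`) WITH (
    'connector' = 'filesystem',
    'path' = '<S3 path>',
    'format' = 'parquet'
);

-- Filtering on a partition column only needs to scan the matching directories.
SELECT record_id, source_name, `date`, results_count
FROM data_from_sink
WHERE `date` = '2024-01-01';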