Create a table from a topic using a WHERE clause in ksqlDB
Question
I'm using the latest version of ksqlDB, 0.29.2 I believe. I'm trying to create a table that reads from a specific topic which receives lots of events, but I'm only interested in certain ones. The JSON event has a property named "eventType", so I want to continually filter the events and build a table that stores the client data (phone number, email, etc.) and keeps the client info up to date.
I created a stream called orders_inputs only for testing purposes, and then I tried to create this table, but I got the following error.
create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
line 1:120: mismatched input 'as' expecting ';'
Statement: create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
Caused by: line 1:120: mismatched input 'as' expecting ';'
Caused by: org.antlr.v4.runtime.InputMismatchException
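For reference, the orders_inputs stream itself was not shown; a declaration along these lines is assumed, with the column names taken from the failing statement above:

CREATE STREAM orders_inputs (
    orderid VARCHAR,
    itemid VARCHAR,
    type VARCHAR
) WITH (
    KAFKA_TOPIC='orders_inputs',  -- assumed topic name
    VALUE_FORMAT='JSON',
    PARTITIONS=1                  -- creates the topic if it does not already exist
);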
Answer 1
Score: 0
If you want to create a table that contains the results of a SELECT query over a stream, you can use CREATE TABLE AS SELECT.
e.g.
CREATE TABLE orders AS
SELECT orderid, itemid FROM orders_inputs
WHERE type='t1';
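Note that the original parse error ("mismatched input 'as'") comes from combining an explicit column list with AS SELECT; the CTAS form only takes a WITH clause. As a sketch, and assuming your ksqlDB version accepts this non-aggregated CTAS the same way as the example above, the topic properties from the original attempt can be kept like this:

CREATE TABLE orders
WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1)  -- properties carried over from the original statement
AS SELECT orderid, itemid
FROM orders_inputs
WHERE type='t1';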
You can specify the primary key when creating the stream orders_inputs: https://docs.confluent.io/5.4.4/ksql/docs/developer-guide/syntax-reference.html#message-keys
Otherwise, you can specify the primary key when creating a table from a topic:
https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-table-with-selected-columns
e.g.
CREATE TABLE orders
(orderid VARCHAR PRIMARY KEY,
itemid VARCHAR)
WITH (KAFKA_TOPIC = 'orders',
VALUE_FORMAT='JSON');
However, you would then have to query the table and filter on type='t1'.
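For example, assuming the type field is also declared in the table's value schema (it is not in the sketch above), a push query filtering on it might look like:

SELECT orderid, itemid
FROM orders
WHERE type='t1'
EMIT CHANGES;  -- push query: emits matching rows as the table is updated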