使用ksql从一个主题创建一张带有where子句的表格。

huangapple go评论49阅读模式
英文:

Create a table from a topic using a where clause using ksql

问题

I'm using the latest version of Kafka SQL Server 0.29.2, I guess. I'm trying to create a reading table that reads from a specific topic which receives lots of events, but I'm interested in specific events. The JSON event has a property named "evenType", so I want to continually filter the events and create a specific table to store the client data, like phone number, email, etc., to update the client info.

我正在使用Kafka SQL Server的最新版本0.29.2,我猜。我尝试创建一个读取特定主题的读取表,该主题接收大量事件,但我只对特定事件感兴趣。JSON事件具有名为"eventType"的属性,因此我希望不断筛选事件并创建一个特定的表来存储客户数据,如电话号码、电子邮件等,以更新客户信息。

I created a stream called orders_inputs only for testing purposes, and then I tried to create this table, but I got that error.

我仅为测试目的创建了一个名为"orders_inputs"的流,然后尝试创建这个表,但出现了错误。

 create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
line 1:120: mismatched input 'as' expecting ';'
Statement: create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
Caused by: line 1:120: mismatched input 'as' expecting ';'
Caused by: org.antlr.v4.runtime.InputMismatchException
创建表orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
第1行:120: 预期输入'as'而不是';'
语句: 创建表orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
导致原因: 第1行:120: 预期输入'as'而不是';'
导致原因: org.antlr.v4.runtime.InputMismatchException
英文:

I'm using the latest version of Kafka sql server 0.29.2, I guess. I'm trying to create a reading table that reads from a specific topic which receives lots of events, but I'm interested in specific events. The JSON event has a property named "evenType", so I want to continually filter the events and create a specific table to store the client data, like phone number, email etc., to update the client info.

I created a stream called orders_inputs only for testing purposes, and then I tried to create this table, but I got that error.

 create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
line 1:120: mismatched input 'as' expecting ';'
Statement: create table orders(orderid varchar PRIMARY KEY, itemid varchar) WITH (KAFKA_TOPIC='ORDERS', PARTITIONS=1, REPLICAS=1) as select orderid, itemid from orders_inputs where type='t1';
Caused by: line 1:120: mismatched input 'as' expecting ';'
Caused by: org.antlr.v4.runtime.InputMismatchException

答案1

得分: 0

如果您想要创建一个包含流查询结果的表,您可以使用 CREATE TABLE AS SELECT

https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-ksql-table-with-streaming-query-results

例如:

CREATE TABLE orders AS 
SELECT orderid, itemid FROM orders_inputs 
WHERE type='t1';

在创建流order_inputs时,您可以指定主键:https://docs.confluent.io/5.4.4/ksql/docs/developer-guide/syntax-reference.html#message-keys

否则,您可以在从主题创建表时指定主键:
https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-table-with-selected-columns

例如:

CREATE TABLE orders
  (orderid VARCHAR PRIMARY KEY,
   itemid VARCHAR)
WITH (KAFKA_TOPIC = 'orders',
      VALUE_FORMAT='JSON');

但是,然后您将需要查询表并过滤type=t1

英文:

If you are wanting to create a table that contains the results of a select query from a stream you can use CREATE TABLE AS SELECT

https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-ksql-table-with-streaming-query-results

e.g.

CREATE TABLE orders AS 
SELECT orderid, itemid FROM orders_inputs 
WHERE type='t1';

You can specify the primary key when creating the stream order_inputs: https://docs.confluent.io/5.4.4/ksql/docs/developer-guide/syntax-reference.html#message-keys

Otherwise, you can specify the primary key when creating a table from a topic:
https://docs.confluent.io/5.2.1/ksql/docs/developer-guide/create-a-table.html#create-a-table-with-selected-columns

e.g.

CREATE TABLE orders
  (orderid VARCHAR PRIMARY KEY,
   itemid VARCHAR)
WITH (KAFKA_TOPIC = 'orders',
      VALUE_FORMAT='JSON');

However, you would then have to query the table and filter where type=t1

huangapple
  • 本文由 发表于 2023年2月8日 12:49:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/75381484.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定