有没有一种方法可以通过spring-boot访问通过KSQL(kafka)创建的表格?

huangapple go评论96阅读模式
英文:

Is there a way to access a table created via KSQL (kafka) through spring-boot?

问题

我是Kafka宇宙的新手,我在这里真的陷入困境。所以,任何帮助将非常感激。

我已经使用以下KSQL语句从Kafka流创建了一个表:

CREATE TABLE calc AS 
SELECT id, datetime, count(*) 
FROM streamA 
GROUP BY id, datetime 
HAVING count(*) = total;

其中 "streamA" 是由 "topicA" 创建的流

我目前在使用:

  • Java 8,
  • Spring Boot v2.2.9

我的 pom.xml 如下所示:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns="http://maven.apache.org/POM/4.0.0"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Packaging -->
	<modelVersion>4.0.0</modelVersion>
	<packaging>jar</packaging>

	<properties>
		<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
	</properties>
	<!-- Versioning -->
	<groupId>some.name</groupId>
	<artifactId>kafka.project</artifactId>
	<version>2020.2.0-SNAPSHOT</version>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.9.RELEASE</version>
		<relativePath />
	</parent>

	<!-- Meta-data -->
	<name>[${project.artifactId}]</name>
	<description>Kafka Project</description>

	<!-- Dependencies -->
	<dependencies>
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-test-support</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<!-- Build settings -->
	<build>
		<!-- Plugins -->
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

所以,有两个问题:

  1. 是否有任何方式可以通过 Kafka Streams API 访问该表格?
  2. 我是否可以通过应用程序执行类似的操作(例如创建该表格),而不是使用 KSQL?

提前感谢您的回答。

更新
感谢您的建议 Shrey Jakhmola(https://stackoverflow.com/questions/54102221/whats-the-way-of-running-ksql-from-spring-boot-app),但我有一个需要定期访问的大型数据集。我认为这个解决方案可能不理想。

@Joshua Oliphant,是的,这个表格是由一个从主题创建的流生成的。

英文:

I am new at Kafka's universe and I am really stucked here. So, any help would be very much appreciated.

I have created a table out of a kafka stream, using the below KSQL statement:

CREATE TABLE calc AS 
SELECT id, datetime, count(*) 
FROM streamA 
GROUP BY id, datetime 
HAVING count(*) = total;

where "streamA" is a stream created by "topicA"

I am currently using:

  • Java 8,
  • Spring Boot v2.2.9

My pom.xml looks like:

&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;
&lt;project xmlns:xsi=&quot;http://www.w3.org/2001/XMLSchema-instance&quot;
xmlns=&quot;http://maven.apache.org/POM/4.0.0&quot;
xsi:schemaLocation=&quot;http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd&quot;&gt;
&lt;!-- Packaging --&gt;
&lt;modelVersion&gt;4.0.0&lt;/modelVersion&gt;
&lt;packaging&gt;jar&lt;/packaging&gt;
&lt;properties&gt;
&lt;spring-cloud.version&gt;Hoxton.SR8&lt;/spring-cloud.version&gt;
&lt;/properties&gt;
&lt;!-- Versioning --&gt;
&lt;groupId&gt;some.name&lt;/groupId&gt;
&lt;artifactId&gt;kafka.project&lt;/artifactId&gt;
&lt;version&gt;2020.2.0-SNAPSHOT&lt;/version&gt;
&lt;parent&gt;
&lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
&lt;artifactId&gt;spring-boot-starter-parent&lt;/artifactId&gt;
&lt;version&gt;2.2.9.RELEASE&lt;/version&gt;
&lt;relativePath /&gt;
&lt;/parent&gt;
&lt;!-- Meta-data --&gt;
&lt;name&gt;[${project.artifactId}]&lt;/name&gt;
&lt;description&gt;Kafka Project&lt;/description&gt;
&lt;!-- Dependencies --&gt;
&lt;dependencies&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.apache.kafka&lt;/groupId&gt;
&lt;artifactId&gt;kafka-streams&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-stream&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-stream-binder-kafka&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-stream-binder-kafka-streams&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
&lt;artifactId&gt;spring-kafka&lt;/artifactId&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
&lt;artifactId&gt;spring-boot-starter-test&lt;/artifactId&gt;
&lt;scope&gt;test&lt;/scope&gt;
&lt;exclusions&gt;
&lt;exclusion&gt;
&lt;groupId&gt;org.junit.vintage&lt;/groupId&gt;
&lt;artifactId&gt;junit-vintage-engine&lt;/artifactId&gt;
&lt;/exclusion&gt;
&lt;/exclusions&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-stream-test-support&lt;/artifactId&gt;
&lt;scope&gt;test&lt;/scope&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.kafka&lt;/groupId&gt;
&lt;artifactId&gt;spring-kafka-test&lt;/artifactId&gt;
&lt;scope&gt;test&lt;/scope&gt;
&lt;/dependency&gt;
&lt;/dependencies&gt;
&lt;dependencyManagement&gt;
&lt;dependencies&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework.cloud&lt;/groupId&gt;
&lt;artifactId&gt;spring-cloud-dependencies&lt;/artifactId&gt;
&lt;version&gt;${spring-cloud.version}&lt;/version&gt;
&lt;type&gt;pom&lt;/type&gt;
&lt;scope&gt;import&lt;/scope&gt;
&lt;/dependency&gt;
&lt;/dependencies&gt;
&lt;/dependencyManagement&gt;
&lt;!-- Build settings --&gt;
&lt;build&gt;
&lt;!-- Plugins --&gt;
&lt;plugins&gt;
&lt;plugin&gt;
&lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
&lt;artifactId&gt;spring-boot-maven-plugin&lt;/artifactId&gt;
&lt;/plugin&gt;
&lt;/plugins&gt;
&lt;/build&gt;
&lt;/project&gt;

So, have two questions:

  1. Is there any way to access that table via Kafka Streams API?
  2. Could I do something similar (e.g. creating that table) through my application instead of KSQL?

Thank you in advance

UPDATE
Thank you for your suggestion Shrey Jakhmola (https://stackoverflow.com/questions/54102221/whats-the-way-of-running-ksql-from-spring-boot-app), but I have a big dataset which needs to be accessed in a regular basis. I don't think this solution would be ideal.

@Joshua Oliphant, yes this table is generated by a stream which is created from a topic.

答案1

得分: 1

  1. 通过Kafka Streams API有没有访问该表的方式?

calc将由名为CALC的更改日志主题支持。如果需要,您可以在应用程序中消费此主题,无论是使用标准消费者还是Kafka Streams。

但是,如果您只想查询表的当前状态,您可以使用ksqlDB的拉取查询来实现。这些查询允许您从由ksqlDB构建的表中拉取行。这个功能相对基础,因为它不是ksqlDB提供的核心“流”SQL的一部分,但它满足了一些用例。

如果您需要更多功能,那么您还有其他选择:

  1. 您可以将结果发送到您选择的更传统的SQL系统,例如postgres,并在其中查询。(您可以使用ksql的CREATE SINK CONNECTOR将数据导出到postgres)。
  2. 您可以使用标准Kafka客户端在您自己的应用程序中消费数据。(但是,只有在您的每个应用程序实例可以容纳表中所有数据的情况下,这种方法才能正常工作)。
  3. 您可以在应用程序中使用Kafka Streams来消费表格。这样做的好处是,您的应用程序的多个实例可以聚集在一起,这样每个实例只消费表格数据的一部分。然后,您可能希望利用Kafka Streams交互式查询来访问表的当前状态。
  1. 我能否通过我的应用程序执行类似的操作(例如创建那个表),而不是使用KSQL?

如果您想排除ksqlDB,那么是的,ksqlDB在内部使用Kafka Streams,因此您可以直接使用Kafka Streams执行任何可以通过ksqlDB执行的操作。

类似于以下SQL:

CREATE TABLE calc AS 
   SELECT id, datetime, count(*) 
   FROM streamA 
   GROUP BY id, datetime 
   HAVING count(*) = total;

会映射为类似于以下的代码(粗略代码):

StreamsBuilder builder = new StreamsBuilder();

builder
   .stream("streamA", Consumed.with(<适当的序列化配置>))
   .groupBy(<返回id和datetime作为新键的映射器>)
   .count()
   .filter(<过滤器>)
   .toStream()
   .to("CALC");

new KafkaStreams(builder.build(), props, clients).start();
英文:

> 1. Is there any way to access that table via Kafka Streams API?

Table calc will be backed by a changelog topic called CALC. You are free to consume this topic in your application if you need. Either using the standard consumer or Kafka Streams.

However, if all you're wanting to do is query the current state of the table, then you can do so using ksqlDB's pull queries. These allow you to pull rows back from the table being built by ksqlDB. The functionality is basic, as its not part of the core streaming SQL that ksqlDB provides, but meets some use-cases.

If you need something beyond this, then there are other options open to you:

  1. You can pump the result into a more traditional sql system of your choice, e.g. postgres, and query that. (You can use ksql's CREATE SINK CONNECTOR to export the data to postgres).
  2. You can consume the data in your own app using the standard Kafka client. (Though this only works well if each instance of your app can hold all the data in the table).
  3. You can use Kafka Streams within your app to consume the table. This has the benefit that multiple instances of your app can cluster together, so that each only consumes a portion of the table's data. You may then want to make use of Kafka Streams Interactive Queries to access the current state of the table.ation will load

> 2. Could I do something similar (e.g. creating that table) through my application instead of KSQL?

If you want to cut ksqlDB out of the equation, then yes, ksqlDB is internally using KAfka streams, so anything you can do with ksqlDB, you can also do directly with Kafka Streams.

SQL like:

CREATE TABLE calc AS 
   SELECT id, datetime, count(*) 
   FROM streamA 
   GROUP BY id, datetime 
   HAVING count(*) = total;

Would map to something like (rough code):

StreamsBuilder builder = new StreamsBuilder();

builder
   .stream(&quot;streamA&quot;, Consumed.with(&lt;appropriate serde&gt;))
   .groupBy(&lt;a mapper that returns id and datetime as new key&gt;)
   .count()
   .filter(&lt;filter&gt;);
   .toStream()
   .to(&quot;CALC&quot;);

new KafkaStreams(builder.build(), props, clients).start();

huangapple
  • 本文由 发表于 2020年9月16日 22:20:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/63922095.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定