ksqldb - exactly one delivery for pull query, multiple app instances

Question

I am trying to build an application on top of ksqldb.

Let's say I have a simple producer:

package main

import (
	"fmt"
	"github.com/rmoff/ksqldb-go"
	"net/http"
)

var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()

func init() {
	offset := `SET 'auto.offset.reset' = 'earliest';`
	if err := client.Execute(offset); err != nil {
		panic(err)
	}

	s1 := `
		CREATE OR REPLACE STREAM userEvents (
			userId VARCHAR KEY,
			eventType VARCHAR
		)
  		WITH (
 			kafka_topic='user_events', 
			value_format='json', 
			partitions=8
		);
	`
	if err := client.Execute(s1); err != nil {
		panic(err)
	}
}

func main() {
	http.HandleFunc("/emit", hello)
	if err := http.ListenAndServe(":4201", nil); err != nil {
		panic(err)
	}
}

func hello(w http.ResponseWriter, req *http.Request) {
	userId := req.URL.Query().Get("userId")
	if userId == "" {
		http.Error(w, "no userId", 400)
		return
	}
	userEvent := req.URL.Query().Get("event")
	if userEvent == "" {
		userEvent = "unknown"
	}

	err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",
		userId, userEvent))
	if err != nil {
		http.Error(w, err.Error(), 500)
		return
	}

	w.WriteHeader(200)
}

This app creates one stream of data and exposes an endpoint to populate that stream (e.g. GET http://localhost:4201/emit?userId=alice&event=login inserts one event).

I also have a consumer:

package main

import (
	"context"
	"fmt"
	"github.com/rmoff/ksqldb-go"
)

var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()

func main() {
	query := `SET 'auto.offset.reset' = 'earliest';`
	if err := client.Execute(query); err != nil {
		panic(err)
	}

	ctx := context.TODO()
	rows := make(chan ksqldb.Row)
	headers := make(chan ksqldb.Header)
	go func() {
		if err := client.Push(ctx,
			"SELECT * FROM userEvents EMIT CHANGES;",
			rows,
			headers); err != nil {
			panic(err)
		}
	}()

	h := <-headers
	fmt.Printf("headers: [%v]", h)

	for {
		select {
		case r := <-rows:
			fmt.Printf("received event: [%v]", r)
		}
	}
}

I run one producer and multiple consumers, all with the same query. How (if it's possible at all) can I receive each event on only one consumer? With this setup I receive the events on all available consumers, but I would like each event to be processed by a single one (the processing will be quite long, so I need this for parallelism).

To be honest, I thought this was "standard" behaviour: that all connected apps belong to the same group, and that I get this kind of delivery for free.

The local cluster configuration (it's the standard config from the Confluent getting-started guide):

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.23.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.23.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

Is something wrong with my configuration, or am I misunderstanding how this database is meant to be used? Thanks for your help!

Answer 1

Score: 2

First up, note that I no longer maintain that client, and you might want to check out https://github.com/thmeitz/ksqldb-go instead.


Now onto your question. If I'm understanding correctly, you want to run multiple instances of the same logical consumer for parallelism, so that each message is processed by that logical consumer exactly once.

If that's the case, then you are describing what is called a consumer group in Kafka. Multiple instances of a consumer identify themselves with the same group ID, and Kafka ensures that data from across the source topic's partitions is routed to the available consumers within that group. If there are four consumers and eight partitions, each consumer is going to get the data from two partitions. If one consumer leaves the group (it crashes, you scale down, etc.), then Kafka reassigns that consumer's partitions across the remaining consumers in the group.

This is different behaviour from what you are seeing, in which you are effectively instantiating multiple independent consumers. By design, Kafka ensures that each consumer that is subscribed to a topic receives all of the messages on that topic.

I'm deliberately talking about Kafka here, and not ksqlDB. That's because ksqlDB is built on Kafka and in order to make sense of what you are seeing it's important to explain the underpinning fundamentals.

To get the behaviour that you're looking for, you probably want to use the Consumer API directly in your consumer application. You can see an example of the Consumer API in this quickstart for Golang and Kafka. To create a consumer group, you give every instance the same group.id, one that is unique to that logical group.
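Below is a minimal sketch of that approach using the confluent-kafka-go client. The library choice, the group name user-events-workers, and the localhost:29092 bootstrap address are assumptions; the user_events topic and its eight partitions come from the stream definition in your question:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// All instances of this worker share the same group.id, so Kafka
	// treats them as one consumer group and splits the topic's eight
	// partitions among whichever instances are currently running.
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:29092",     // the PLAINTEXT_HOST listener from the compose file
		"group.id":          "user-events-workers", // hypothetical group name
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// Subscribe to the topic that backs the ksqlDB stream.
	if err := consumer.SubscribeTopics([]string{"user_events"}, nil); err != nil {
		panic(err)
	}

	for {
		// ReadMessage blocks until a message arrives (-1 means no timeout).
		msg, err := consumer.ReadMessage(-1)
		if err != nil {
			fmt.Printf("consumer error: %v\n", err)
			continue
		}
		fmt.Printf("received event: key=%s value=%s (partition %d)\n",
			string(msg.Key), string(msg.Value), msg.TopicPartition.Partition)
	}
}

Start several copies of this process and Kafka rebalances the partitions of user_events across them, so each event is handled by exactly one instance; stop one and its partitions are reassigned to the survivors.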
