How to access Kafka Connect created by Strimzi?
Question
I am trying to access Kafka Connect created by [Strimzi](https://github.com/strimzi/strimzi-kafka-operator) through [Redpanda Console](https://github.com/redpanda-data/console).
I installed my Kafka Connect by
```shell
kubectl apply --filename=hm-kafka-iot-kafka-connect.yaml
```
`hm-kafka-iot-kafka-connect.yaml`:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: hm-kafka-iot-kafka-connect
  namespace: hm-kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: ghcr.io/hongbo-miao/hm-kafka-iot-kafka-connect:latest
  replicas: 3
  bootstrapServers: hm-kafka-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: hm-kafka-cluster-ca-cert
        certificate: ca.crt
  config:
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: hm-iot-db-credentials-volume
        secret:
          secretName: hm-iot-db-credentials
```
I installed Redpanda Console by
```shell
helm upgrade \
  redpanda-console \
  console \
  --install \
  --repo=https://charts.redpanda.com \
  --namespace=hm-redpanda-console \
  --create-namespace \
  --values=my-values.yaml
```
`my-values.yaml`:
```yaml
console:
  config:
    kafka:
      brokers:
        - hm-kafka-kafka-bootstrap.hm-kafka.svc:9092
      schemaRegistry:
        enabled: true
        urls:
          - http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081
    connect:
      enabled: true
      clusters:
        - name: hm-kafka-iot-kafka-connect
          url: http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083
```
However, the Redpanda Console pod log shows an error when connecting to my Kafka Connect cluster (created by Strimzi):
```
{"level":"info","ts":"2023-05-06T05:12:24.156Z","msg":"started Redpanda Console","version":"v2.2.3","built_at":"1679491937"}
{"level":"info","ts":"2023-05-06T05:12:24.156Z","msg":"connecting to Kafka seed brokers, trying to fetch cluster metadata"}
{"level":"info","ts":"2023-05-06T05:12:24.165Z","msg":"successfully connected to kafka cluster","advertised_broker_count":1,"topic_count":12,"controller_id":0,"kafka_version":"v3.4"}
{"level":"info","ts":"2023-05-06T05:12:24.165Z","msg":"creating schema registry client and testing connectivity"}
{"level":"info","ts":"2023-05-06T05:12:24.175Z","msg":"successfully tested schema registry connectivity"}
{"level":"info","ts":"2023-05-06T05:12:24.176Z","msg":"creating Kafka connect HTTP clients and testing connectivity to all clusters"}
{"level":"warn","ts":"2023-05-06T05:12:24.182Z","msg":"connect cluster is not reachable","cluster_name":"hm-kafka-iot-kafka-connect","cluster_address":"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083","error":"Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/\": dial tcp 10.43.193.144:8083: connect: connection refused"}
{"level":"info","ts":"2023-05-06T05:12:24.182Z","msg":"tested Kafka connect cluster connectivity","successful_clusters":0,"failed_clusters":1}
{"level":"info","ts":"2023-05-06T05:12:24.182Z","msg":"successfully create Kafka connect service"}
{"level":"info","ts":"2023-05-06T05:12:24.552Z","msg":"Server listening on address","address":"[::]:8080","port":8080}
{"level":"warn","ts":"2023-05-06T05:13:05.781Z","msg":"failed to list connectors from Kafka connect cluster","cluster_name":"hm-kafka-iot-kafka-connect","cluster_address":"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083","error":"Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/connectors?expand=info&expand=status\": dial tcp 10.43.193.144:8083: connect: connection refused"}
{"level":"error","ts":"2023-05-06T05:13:05.807Z","msg":"Sending REST error","cluster_name":"hm-kafka-iot-kafka-connect","route":"/api/kafka-connect/clusters/hm-kafka-iot-kafka-connect","method":"GET","status_code":503,"remote_address":"127.0.0.1:43902","public_error":"Failed to get cluster info: Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/\": dial tcp 10.43.193.144:8083: connect: connection refused","error":"Get \"http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/\": dial tcp 10.43.193.144:8083: connect: connection refused"}
```
And in the Redpanda Console UI, it shows:
However, if I port-forward Kafka Connect locally, I can reach it:
```shell
# Open first terminal
kubectl port-forward service/hm-kafka-iot-kafka-connect-connect-api --namespace=hm-kafka 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083
Handling connection for 8083
# Open second terminal
curl --location 'http://localhost:8083/connectors?expand=info&expand=status'
{"hm-motor-jdbc-sink-kafka-connector":{"info":{"name":"hm-motor-jdbc-sink-kafka-connector","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","table.name.format":"motor","transforms.convertTimestamp.target.type":"Timestamp","connection.password":"${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_password}","topics":"hm.motor","tasks.max":"8","batch.size":"100000","transforms":"convertTimestamp","transforms.convertTimestamp.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value","value.converter.schema.registry.url":"http://confluent-schema-registry.hm-confluent-schema-registry.svc:8081","transforms.convertTimestamp.field":"timestamp","connection.user":"${file:/opt/kafka/external-configuration/hm-iot-db-credentials-volume/iot-db-credentials.properties:timescaledb_user}","name":"hm-motor-jdbc-sink-kafka-connector","connection.url":"jdbc:postgresql://timescale.hm-timescale.svc:5432/hm_iot_db","value.converter":"io.confluent.connect.avro.AvroConverter","insert.mode":"insert","pk.mode":"record_value","pk.fields":"timestamp"},"tasks":[{"connector":"hm-motor-jdbc-sink-kafka-connector","task":0},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":1},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":2},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":3},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":4},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":5},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":6},{"connector":"hm-motor-jdbc-sink-kafka-connector","task":7}],"type":"sink"},"status":{"name":"hm-motor-jdbc-sink-kafka-connector","connector":{"state":"RUNNING","worker_id":"10.42.0.21:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"10.42.0.21:8083"},{"id":1,"state":"RUNNING","worker_id":"10.42.0.18:8083"},{"id":2,"state":"RUNNING","worker_id":"10.42.0.20:8083"},{"id":3,"state":"RUNNING","worker_id":"10.42.0.19:8083"},{"id":4,"state":"RUNNING","worker_id":"10.42.0.22:8083"},{"id":5,"state":"RUNNING","worker_id":"10.42.0.21:8083"},{"id":6,"state":"RUNNING","worker_id":"10.42.0.18:8083"},{"id":7,"state":"RUNNING","worker_id":"10.42.0.20:8083"}],"type":"sink"}}}
```
Any idea? Thanks!
Answer 1
Score: 0
The solution is to add a new `NetworkPolicy` for Kafka Connect.
Background
My Redpanda Console `Pod` YAML looks like this:
```shell
kubectl get pod redpanda-console-7fb65b7f5-87cxk -n hm-redpanda-console -o yaml
```
```yaml
apiVersion: v1
kind: Pod
metadata:
  name: redpanda-console-77595c8f75-ltspx
  namespace: hm-redpanda-console
  labels:
    app.kubernetes.io/name: console
    # ...
```
My Kafka Connect `NetworkPolicy` YAML (created by Strimzi) looks like this:
```shell
kubectl get networkpolicy hm-kafka-iot-kafka-connect-connect -n hm-kafka -o yaml
```
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: hm-kafka-iot-kafka-connect-connect
  namespace: hm-kafka
  labels:
    # ...
spec:
  podSelector:
    matchLabels:
      strimzi.io/cluster: hm-kafka-iot-kafka-connect
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: hm-kafka-iot-kafka-connect-connect
  policyTypes:
    - Ingress
  ingress:
    - ports:
        - protocol: TCP
          port: 8083
      from:
        - podSelector:
            matchLabels:
              strimzi.io/cluster: hm-kafka-iot-kafka-connect
              strimzi.io/kind: KafkaConnect
              strimzi.io/name: hm-kafka-iot-kafka-connect-connect
        - podSelector:
            matchLabels:
              strimzi.io/kind: cluster-operator
    # ...
```
Solution
Building on the existing Kafka Connect `NetworkPolicy` created by Strimzi, I need to create a new `NetworkPolicy`.
> Note: editing the existing Kafka Connect `NetworkPolicy` does not work because Strimzi will revert the change.
```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: hm-kafka-iot-kafka-connect-network-policy
  namespace: hm-kafka
spec:
  podSelector:
    matchLabels:
      strimzi.io/cluster: hm-kafka-iot-kafka-connect
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: hm-kafka-iot-kafka-connect-connect
  policyTypes:
    - Ingress
  ingress:
    - ports:
        - protocol: TCP
          port: 8083
      from:
        - namespaceSelector:
            matchLabels:
              name: hm-redpanda-console
          podSelector:
            matchLabels:
              app.kubernetes.io/name: console
```
Here we allow pods with the label `app.kubernetes.io/name: console` in the namespace `hm-redpanda-console` to access Kafka Connect.
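One caveat: `namespaceSelector` matches labels on the Namespace object, not the namespace name, so the policy above only takes effect if the namespace `hm-redpanda-console` actually carries the label `name: hm-redpanda-console`. As a sketch of an alternative (not part of the original answer), the rule can match the `kubernetes.io/metadata.name` label instead, which Kubernetes 1.21 and later set automatically on every namespace. The policy name below is hypothetical; the selectors and port are taken from the policy above.

```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  # Hypothetical name, chosen only for this sketch
  name: hm-kafka-iot-kafka-connect-allow-console
  namespace: hm-kafka
spec:
  # Applies to the Kafka Connect pods managed by Strimzi
  podSelector:
    matchLabels:
      strimzi.io/cluster: hm-kafka-iot-kafka-connect
      strimzi.io/kind: KafkaConnect
      strimzi.io/name: hm-kafka-iot-kafka-connect-connect
  policyTypes:
    - Ingress
  ingress:
    - ports:
        - protocol: TCP
          port: 8083
      from:
        # namespaceSelector and podSelector in the same list item are ANDed:
        # the source pod must match both
        - namespaceSelector:
            matchLabels:
              # Label set automatically by Kubernetes on each namespace
              kubernetes.io/metadata.name: hm-redpanda-console
          podSelector:
            matchLabels:
              app.kubernetes.io/name: console
```

To see which labels the namespace currently has, `kubectl get namespace hm-redpanda-console --show-labels` lists them.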
To allow pods with the label `app.kubernetes.io/name: console` in any namespace to access Kafka Connect, use `namespaceSelector: {}`:
```yaml
# ...
      from:
        - namespaceSelector: {}
          podSelector:
            matchLabels:
              app.kubernetes.io/name: console
```
And now my Redpanda Console can successfully connect to my Kafka Connect.
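One way to check the policy independently of Redpanda Console is a throwaway Pod that calls the Kafka Connect REST API from the `hm-redpanda-console` namespace. The manifest below is only a sketch: the Pod name is hypothetical, and the `curlimages/curl` image is an assumption (any image that ships `curl` would do). The service URL and the label are the ones used earlier in this post.

```yaml
apiVersion: v1
kind: Pod
metadata:
  # Hypothetical name for a one-off connectivity check
  name: connect-policy-check
  namespace: hm-redpanda-console
  labels:
    # Same label the NetworkPolicy's podSelector allows
    app.kubernetes.io/name: console
spec:
  restartPolicy: Never
  containers:
    - name: curl
      # Assumption: a small public image that contains curl
      image: curlimages/curl:latest
      command:
        - curl
        - --silent
        - --show-error
        - --max-time
        - "5"
        - http://hm-kafka-iot-kafka-connect-connect-api.hm-kafka:8083/connectors
```

After `kubectl apply`, the result can be read with `kubectl logs connect-policy-check -n hm-redpanda-console`: a JSON response suggests the policy admits the traffic, while a refused connection or timeout suggests it does not. Because the Pod shares the `app.kubernetes.io/name: console` label, delete it once the check is done so it is not picked up by selectors meant for the real Console pods.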
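Reference
- https://github.com/strimzi/strimzi-kafka-operator/issues/3294
- https://github.com/strimzi/strimzi-kafka-operator/issues/3201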