Is there a way to use kafka consumers and producers in the same Python file using confluent_kafka
Question
I am currently trying to get two Python scripts to communicate using confluent_kafka.
The idea is that Script 1 produces a message to Script 2 requesting a temperature reading. Script 2 consumes that message and produces the current temperature back to Script 1, which then prints the temperature to the console.
The code below has been very finicky for me: it works sometimes, but most of the time script2.py runs without consuming any messages. I was wondering if this is possible, or if there are better alternatives for communicating between two different Python scripts over the internet.
Script1.py
from confluent_kafka import Consumer, Producer
from dotenv import load_dotenv
import os

load_dotenv(".env")

# Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")

# Set up the Kafka producer
p = Producer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
})

# Set up the Kafka consumer
c = Consumer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
    'group.id': 'script1-group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'latest',
})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Ask Script 2 for a temperature reading
p.poll(0)
data = 'temperature'
p.produce('script2', data.encode('utf-8'), callback=delivery_report)
p.flush()

# Wait for the reply
c.subscribe(['script1'])
x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    temp = msg.value().decode('utf-8')
    print("The temperature is " + temp)
    x = False
Script2.py
from confluent_kafka import Consumer, Producer
from dotenv import load_dotenv
import os

load_dotenv(".env")

# Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")

# Set up the Kafka producer
p = Producer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
})

# Set up the Kafka consumer
c = Consumer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
    'group.id': 'script2Group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'latest',
})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Wait for a temperature request, then reply with the current reading
c.subscribe(['script2'])
x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    if msg.value().decode('utf-8') == 'temperature':
        p.poll(0)
        data = "20 C"
        p.produce('script1', data.encode('utf-8'), callback=delivery_report)
        print("Temperature sent to Script 1")
        p.flush()
        x = False
Answer 1
Score: 2
> better alternatives for communicating between two different Python scripts over the internet.
You could use gRPC or HTTP, and not need Kafka or any external message queue...
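For illustration, here is a minimal sketch of the same request/response exchange over plain HTTP, using only the Python standard library. The port, URL path, and hard-coded "20 C" reading are placeholder choices for this sketch, not part of the original setup.

# responder.py - plays the role of Script2: answers temperature requests
from http.server import BaseHTTPRequestHandler, HTTPServer

class TemperatureHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/temperature':
            body = b'20 C'  # placeholder reading
            self.send_response(200)
            self.send_header('Content-Type', 'text/plain')
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_error(404)

if __name__ == '__main__':
    HTTPServer(('0.0.0.0', 8000), TemperatureHandler).serve_forever()

# requester.py - plays the role of Script1: asks for the temperature
import urllib.request

with urllib.request.urlopen('http://localhost:8000/temperature') as resp:
    print('The temperature is ' + resp.read().decode('utf-8'))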
What you're doing is very similar to the Kafka Streams design, which Fluvii, another library built on top of the confluent_kafka Python library, is capable of doing.
Regarding your code, your scripts stop after the first consumed message. This is not the recommended way to use Kafka; consumers, at least, should always be long-running.
You have also disabled offset commits and are not committing on your own. This, combined with auto.offset.reset=latest, means you will never read any existing data from either topic; so it will only work if you start script2 before script1.
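Putting those two points together, here is a minimal sketch of Script2 restructured as a long-running consumer that commits manually. It reuses the same .env-based connection settings as the question; switching auto.offset.reset to earliest is an assumption here, so that requests produced before the consumer starts are still read.

from confluent_kafka import Consumer, Producer
from dotenv import load_dotenv
import os

load_dotenv(".env")

conf = {
    'bootstrap.servers': os.getenv("BOOTSTRAP_SERVER"),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv("CLIENT_ID"),
    'sasl.password': os.getenv("CLIENT_SECRET"),
}

p = Producer(conf)
c = Consumer({
    **conf,
    'group.id': 'script2Group',
    'enable.auto.commit': False,       # still manual, but now we actually commit
    'auto.offset.reset': 'earliest',   # assumption: read data produced before startup
})
c.subscribe(['script2'])

try:
    while True:  # long-running: keep serving requests instead of exiting after one
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        if msg.value().decode('utf-8') == 'temperature':
            p.produce('script1', b'20 C')
            p.flush()
        c.commit(message=msg)  # record progress once the message is handled
finally:
    c.close()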