Is there a way to use kafka consumers and producers in the same Python file using confluent_kafka

huangapple go评论69阅读模式
英文:

Is there a way to use kafka consumers and producers in the same Python file using confluent_kafka

问题

以下是您要翻译的部分:

Script1.py

from confluent_kafka import Consumer, Producer
from dotenv import load_dotenv
import os

load_dotenv(".env")

#Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")

#Set up the Kafka producer
p = Producer({
      'bootstrap.servers': bootstrap_server,
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'PLAIN',
      'sasl.username': sasl_user_name,
      'sasl.password': sasl_password,
})

c = Consumer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
    'group.id': 'script1-group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'latest',
    
})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

p.poll(0)
data = 'temperature'
p.produce('script2', data.encode('utf-8'), callback=delivery_report)
p.flush()

c.subscribe(['script1'])

x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    temp = msg.value().decode('utf-8')
    print("The temperature is " + temp)
    x = False

Script2.py

from confluent_kafka import Consumer, Producer
from dotenv import load_dotenv
import os

load_dotenv(".env")

#Gathers sensitive data from the .env file
bootstrap_server = os.getenv("BOOTSTRAP_SERVER")
sasl_user_name = os.getenv("CLIENT_ID")
sasl_password = os.getenv("CLIENT_SECRET")

#Set up the Kafka producer
p = Producer({
      'bootstrap.servers': bootstrap_server,
      'security.protocol': 'SASL_SSL',
      'sasl.mechanisms': 'PLAIN',
      'sasl.username': sasl_user_name,
      'sasl.password': sasl_password,
})

c = Consumer({
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': sasl_user_name,
    'sasl.password': sasl_password,
    'group.id': 'script2Group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'latest',
    
})

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

c.subscribe(['script2'])

x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    if msg.value().decode('utf-8') == 'temperature':
        p.poll(0)
        data = "20 C"
        p.produce('script1', data.encode('utf-8'), callback=delivery_report)
        print("Temperature sent to Script 1")
        p.flush()
        x = False
英文:

I am currently trying to allow for two scripts to communicate in Python using confluent_kafka.<br>

The idea is that script 1 produces a message to script 2 for a temperature reading. Script 2 consumes the message from s1 and produces the current temperature back to script 1 and then prints the temperature to the console.<br>

The code below has been very finicky for me and works sometimes but most of the time script2.py will run but consume no messages. I was wondering if this is possible or if there are better alternatives for communicating between 2 different python scripts over the internet.

Script1.py

from confluent_kafka import Consumer, Producer
#from confluent_kafka import Producer
from dotenv import load_dotenv
import os

load_dotenv(&quot;.env&quot;)

#Gathers sensitive data from the .env file
bootstrap_server = os.getenv(&quot;BOOTSTRAP_SERVER&quot;)
sasl_user_name = os.getenv(&quot;CLIENT_ID&quot;)
sasl_password = os.getenv(&quot;CLIENT_SECRET&quot;)

#Set up the Kafka producer
p = Producer({
      &#39;bootstrap.servers&#39;: bootstrap_server,
      &#39;security.protocol&#39;: &#39;SASL_SSL&#39;,
      &#39;sasl.mechanisms&#39;: &#39;PLAIN&#39;,
      &#39;sasl.username&#39;: sasl_user_name,
      &#39;sasl.password&#39;: sasl_password,
})

c = Consumer({
    &#39;bootstrap.servers&#39;: bootstrap_server,
    &#39;security.protocol&#39;: &#39;SASL_SSL&#39;,
    &#39;sasl.mechanisms&#39;: &#39;PLAIN&#39;,
    &#39;sasl.username&#39;: sasl_user_name,
    &#39;sasl.password&#39;: sasl_password,
    &#39;group.id&#39;: &#39;script1-group&#39;,
    &#39;enable.auto.commit&#39;: False,
    &#39;auto.offset.reset&#39;: &#39;latest&#39;,
    
})

def delivery_report(err, msg):
    if err is not None:
        print(&#39;Message delivery failed: {}&#39;.format(err))
    else:
        print(&#39;Message delivered to {} [{}]&#39;.format(msg.topic(), msg.partition()))



p.poll(0)
data = &#39;temperature&#39;
p.produce(&#39;script2&#39;, data.encode(&#39;utf-8&#39;), callback=delivery_report)
p.flush()

c.subscribe([&#39;script1&#39;])

x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(&quot;Consumer error: {}&quot;.format(msg.error()))
        continue
    temp = msg.value().decode(&#39;utf-8&#39;)
    print(&quot;The temperature is &quot; + temp)
    x = False

Script2.py

from confluent_kafka import Consumer, Producer
#from confluent_kafka import Producer
from dotenv import load_dotenv
import os

load_dotenv(&quot;.env&quot;)

#Gathers sensitive data from the .env file
bootstrap_server = os.getenv(&quot;BOOTSTRAP_SERVER&quot;)
sasl_user_name = os.getenv(&quot;CLIENT_ID&quot;)
sasl_password = os.getenv(&quot;CLIENT_SECRET&quot;)

#Set up the Kafka producer
p = Producer({
      &#39;bootstrap.servers&#39;: bootstrap_server,
      &#39;security.protocol&#39;: &#39;SASL_SSL&#39;,
      &#39;sasl.mechanisms&#39;: &#39;PLAIN&#39;,
      &#39;sasl.username&#39;: sasl_user_name,
      &#39;sasl.password&#39;: sasl_password,
})

c = Consumer({
    &#39;bootstrap.servers&#39;: bootstrap_server,
    &#39;security.protocol&#39;: &#39;SASL_SSL&#39;,
    &#39;sasl.mechanisms&#39;: &#39;PLAIN&#39;,
    &#39;sasl.username&#39;: sasl_user_name,
    &#39;sasl.password&#39;: sasl_password,
    &#39;group.id&#39;: &#39;script2Group&#39;,
    &#39;enable.auto.commit&#39;: False,
    &#39;auto.offset.reset&#39;: &#39;latest&#39;,
    
})

def delivery_report(err, msg):
    if err is not None:
        print(&#39;Message delivery failed: {}&#39;.format(err))
    else:
        print(&#39;Message delivered to {} [{}]&#39;.format(msg.topic(), msg.partition()))

c.subscribe([&#39;script2&#39;])

x = True
while x == True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(&quot;Consumer error: {}&quot;.format(msg.error()))
        continue
    if msg.value().decode(&#39;utf-8&#39;) == &#39;temperature&#39;:
        p.poll(0)
        data = &quot;20 C&quot;
        p.produce(&#39;script1&#39;, data.encode(&#39;utf-8&#39;), callback=delivery_report)
        print(&quot;Temperature sent to Script 1&quot;)
        p.flush()
        x = False

答案1

得分: 2

> 在两个不同的Python脚本之间进行互联网通信的更好选择。

您可以使用gRPC或HTTP,而不需要Kafka或任何外部消息队列...


您所做的事情与Kafka Streams的设计非常相似,另一个构建在该Python库之上的库Fluvii也能够实现这一功能。


关于您的代码,您的脚本在消费第一条消息时就停止了。这不是使用Kafka的推荐方式。消费者至少应该一直运行。

您还禁用了偏移量提交,并且没有手动提交。这与auto.offset.reset=latest结合使用意味着您永远不会从主题中读取任何现有数据;因此,只有在启动script1之前启动script2时才能正常工作。

英文:

> better alternatives for communicating between 2 different python scripts over the internet.

You could use gPRC or HTTP, and not need Kafka or any external message queue...


What you're doing is very similar to Kafka Steams design, which another library Fluvii, built on that python library is capable of doing.


Regarding your code, your scripts stop on the first consumed message. This is not recommended way to use Kafka. Consumers, at least, should always be long-running.

You also have disabled offset commits, and are not committing on your own. This, combined with auto.offset.reset=latest means you will never read any existing data from either topic; and so, it'll only work when you start script2 before script1

huangapple
  • 本文由 发表于 2023年3月3日 21:35:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/75627757.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定