英文:
Sharing Pika Connection Singleton Object between gunicron workers on django application
问题
我理解你想要翻译代码部分。以下是你提供的代码的翻译:
# 创建一个RabbitMQProducer类,用于连接RabbitMQ服务
class RabbitMQProducer(ABC):
def __init__(self):
self.credentials = pika.PlainCredentials(
settings.RABBITMQCONFIG['username'],
settings.RABBITMQCONFIG['password'],
)
self.parameters = pika.ConnectionParameters(
host=settings.RABBITMQCONFIG['host'],
port=settings.RABBITMQCONFIG['port'],
virtual_host=settings.RABBITMQCONFIG['virtual_host'],
credentials=self.credentials,
heartbeat=600,
blocked_connection_timeout=300,
client_properties={
'connection_name': self.get_connection_name(),
}
)
self.connection = None
self.channels = {}
self.connect()
def connect(self):
if not self.connection or self.connection.is_closed:
self.connection = BlockingConnection(self.parameters)
self.close_channels()
self.declare_channels()
def declare_channels(self):
for i in range(self.get_channel_count()):
self.channels[i] = self.assign_channel()
def send(self, message):
try:
self.connect()
self.thread_safe_publish(message)
except Exception as e:
Log.e(f"Failed to send message to RabbitMQ: {e}")
def assign_channel(self):
if not self.connection or self.connection.is_closed:
self.connect()
return None
channel = self.connection.channel(channel_number=None)
channel.exchange_declare(
exchange=self.get_rabbitmq_exchange_name(),
exchange_type=self.get_rabbitmq_exchange_type(),
durable=True,
)
return channel
def thread_safe_publish(self, message):
try:
random_channel_number = CommonUtils.get_random_number(0, self.get_channel_count() - 1)
channel = self.channels[random_channel_number]
if not channel or channel.is_closed:
channel = self.assign_channel()
if channel:
self.channels[random_channel_number] = channel
self.channels[random_channel_number].basic_publish(
exchange=self.get_rabbitmq_exchange_name(),
routing_key=self.get_rabbitmq_routing_key(),
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
event_key = self.get_event_key()
self.process_data_events(event_key)
except Exception as e:
Log.e(f"Failed to send message to RabbitMQ: {e}")
def process_data_events(self, event_key):
try:
if not self.connection or self.connection.is_closed:
self.connect()
self.connection.process_data_events(time_limit=0)
import time
CacheUtils.set_key_payload(key=event_key, payload=int(TimeUtils.current_milli_time()))
except Exception as e:
Log.e(str(e))
def close_channels(self):
try:
if self.channels:
for key, channel in self.channels.items():
if channel.is_open:
channel.close()
except Exception as e:
Log.e(str(e))
self.channels = {}
@abstractmethod
def get_rabbitmq_routing_key(self):
pass
@abstractmethod
def get_rabbitmq_exchange_name(self):
pass
@abstractmethod
def get_rabbitmq_exchange_type(self):
pass
@abstractmethod
def get_queue_message_type(self):
pass
@abstractmethod
def get_event_key(self):
pass
@abstractmethod
def get_channel_count(self):
pass
@abstractmethod
def get_connection_name(self):
pass
# 创建一个EmailPublisher类,继承自RabbitMQProducer
class EmailPublisher(RabbitMQProducer):
__singleton_instance = None
@classmethod
def instance(cls):
# 检查是否存在单例实例
if not cls.__singleton_instance:
cls.__singleton_instance = EmailPublisher()
# 返回单例实例
return cls.__singleton_instance
def get_rabbitmq_routing_key(self):
return 'email.queue'
def get_rabbitmq_exchange_name(self):
return 'email_exchange'
def get_rabbitmq_exchange_type(self):
return "direct"
def get_channel_count(self):
return 5
def get_connection_name(self):
return 'email_connection'
希望这对你有帮助!如果有其他翻译需求,请随时提问。
英文:
Hi I am trying to connect to email queue from django application. I have created an EmailPublisher which will be a singleton object which extends RabbitMQProducer object which has actual code of connecting to rabbitmq service. Now I want to share the rabbitmq connection between gunicorn workers how can I do this? I am using python 3.7
with django.
class RabbitMQProducer(ABC):
def __init__(self):
self.credentials = pika.PlainCredentials(
settings.RABBITMQCONFIG['username'],
settings.RABBITMQCONFIG['password'],
)
self.parameters = pika.ConnectionParameters(
host=settings.RABBITMQCONFIG['host'],
port=settings.RABBITMQCONFIG['port'],
virtual_host=settings.RABBITMQCONFIG['virtual_host'],
credentials=self.credentials,
heartbeat=600,
blocked_connection_timeout=300,
client_properties={
'connection_name': self.get_connection_name(),
}
)
self.connection = None
self.channels = {}
self.connect()
def connect(self):
if not self.connection or self.connection.is_closed:
self.connection = BlockingConnection(self.parameters)
self.close_channels()
self.declare_channels()
def declare_channels(self):
for i in range(self.get_channel_count()):
self.channels[i] = self.assign_channel()
def send(self, message):
try:
self.connect()
self.thread_safe_publish(message)
except Exception as e:
Log.e(f"Failed to send message to RabbitMQ: {e}")
def assign_channel(self):
if not self.connection or self.connection.is_closed:
self.connect()
return None
channel = self.connection.channel(channel_number=None)
channel.exchange_declare(
exchange=self.get_rabbitmq_exchange_name(),
exchange_type=self.get_rabbitmq_exchange_type(),
durable=True,
)
return channel
def thread_safe_publish(self, message):
try:
random_channel_number = CommonUtils.get_random_number(0, self.get_channel_count() - 1)
channel = self.channels[random_channel_number]
if not channel or channel.is_closed:
channel = self.assign_channel()
if channel:
self.channels[random_channel_number] = channel
self.channels[random_channel_number].basic_publish(
exchange=self.get_rabbitmq_exchange_name(),
routing_key=self.get_rabbitmq_routing_key(),
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
event_key = self.get_event_key()
self.process_data_events(event_key)
except Exception as e:
Log.e(f"Failed to send message to RabbitMQ: {e}")
def process_data_events(self, event_key):
try:
if not self.connection or self.connection.is_closed:
self.connect()
self.connection.process_data_events(time_limit=0)
import time
CacheUtils.set_key_payload(key=event_key, payload=int(TimeUtils.current_milli_time()))
except Exception as e:
Log.e(str(e))
def close_channels(self):
try:
if self.channels:
for key, channel in self.channels.items():
if channel.is_open:
channel.close()
except Exception as e:
Log.e(str(e))
self.channels = {}
@abstractmethod
def get_rabbitmq_routing_key(self):
pass
@abstractmethod
def get_rabbitmq_exchange_name(self):
pass
@abstractmethod
def get_rabbitmq_exchange_type(self):
pass
@abstractmethod
def get_queue_message_type(self):
pass
@abstractmethod
def get_event_key(self):
pass
@abstractmethod
def get_channel_count(self):
pass
@abstractmethod
def get_connection_name(self):
pass
class EmailPublisher(RabbitMQProducer):
__singleton_instance = None
@classmethod
def instance(cls):
# check for the singleton instance
if not cls.__singleton_instance:
cls.__singleton_instance = EmailPublisher()
# return the singleton instance
return cls.__singleton_instance
def get_rabbitmq_routing_key(self):
return 'email.queue'
def get_rabbitmq_exchange_name(self):
return 'email_exchange'
def get_rabbitmq_exchange_type(self):
return "direct"
def get_channel_count(self):
return 5
def get_connection_name(self):
return 'email_connection'
答案1
得分: 1
现在我想在gunicorn工作进程之间共享rabbitmq连接,我该如何做到这一点?
你不能!
Gunicorn工作进程是独立的操作系统进程。每个工作进程必须有自己的连接到RabbitMQ。
<sub><b>注意:</b> RabbitMQ团队监控rabbitmq-users
邮件列表,并且有时在StackOverflow上回答问题。</sub>
英文:
> Now I want to share the rabbitmq connection between gunicorn workers how can I do this?
You can't!
Gunicorn workers are separate operating system processes. Every worker must have its own connection to RabbitMQ.
<sub><b>NOTE:</b> Team RabbitMQ monitors the rabbitmq-users
mailing list and only sometimes answers questions on StackOverflow.</sub>
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论