在 Django 应用的 Gunicorn 工作进程之间共享 Pika 连接单例对象

huangapple go评论62阅读模式
英文:

Sharing a Pika connection singleton object between Gunicorn workers in a Django application

问题

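你好,我正在尝试从 Django 应用程序连接到邮件队列。我创建了一个 EmailPublisher 单例对象,它继承自 RabbitMQProducer,后者包含实际连接 RabbitMQ 服务的代码。现在我想在 Gunicorn 工作进程之间共享这个 RabbitMQ 连接,应该怎么做?我使用的是 Python 3.7 和 Django。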

# Base class for producers that publish messages to a RabbitMQ exchange.
class RabbitMQProducer(ABC):
    """Abstract base for objects that publish JSON messages to RabbitMQ.

    One instance owns one pika connection plus a table of channels
    (``self.channels``, keyed ``0 .. get_channel_count() - 1``).
    Subclasses supply the exchange, routing key, channel count and other
    names through the abstract ``get_*`` hooks at the bottom of the class.

    NOTE(review): nothing in this class takes a lock, so despite the name
    ``thread_safe_publish`` callers should not assume one instance can be
    shared across threads -- confirm against how it is used.
    """

    def __init__(self):
        """Build credentials/parameters from ``settings.RABBITMQCONFIG`` and connect."""
        self.credentials = pika.PlainCredentials(
            settings.RABBITMQCONFIG['username'],
            settings.RABBITMQCONFIG['password'],
        )

        self.parameters = pika.ConnectionParameters(
            host=settings.RABBITMQCONFIG['host'],
            port=settings.RABBITMQCONFIG['port'],
            virtual_host=settings.RABBITMQCONFIG['virtual_host'],
            credentials=self.credentials,
            heartbeat=600,
            blocked_connection_timeout=300,
            client_properties={
                'connection_name': self.get_connection_name(),
            }
        )

        self.connection = None
        self.channels = {}
        self.connect()

    def connect(self):
        """Open a new connection if there is none or the current one is closed.

        After (re)connecting, the previous channel table is closed and
        discarded and a fresh set of channels is declared.
        """
        if not self.connection or self.connection.is_closed:
            self.connection = BlockingConnection(self.parameters)
            self.close_channels()
            self.declare_channels()

    def declare_channels(self):
        """Fill ``self.channels`` with ``get_channel_count()`` channels."""
        for i in range(self.get_channel_count()):
            channel = self.assign_channel()
            if channel is not None:
                self.channels[i] = channel
            else:
                # assign_channel() returns None after it had to reconnect;
                # that reconnect rebuilt the whole table, so do not
                # overwrite a slot it already filled with None.
                self.channels.setdefault(i, None)

    def send(self, message):
        """Ensure the connection is up and publish ``message``.

        Failures are logged through ``Log.e`` and not re-raised.
        """
        try:
            self.connect()
            self.thread_safe_publish(message)
        except Exception as e:
            Log.e(f"Failed to send message to RabbitMQ: {e}")

    def assign_channel(self):
        """Open a channel on the current connection and declare the exchange.

        Returns:
            The new channel, or ``None`` when the connection was missing or
            closed; in that case ``connect()`` is called instead, which
            rebuilds the channel table itself.
        """
        if not self.connection or self.connection.is_closed:
            self.connect()
            return None
        channel = self.connection.channel(channel_number=None)
        channel.exchange_declare(
            exchange=self.get_rabbitmq_exchange_name(),
            exchange_type=self.get_rabbitmq_exchange_type(),
            durable=True,
        )
        return channel

    def thread_safe_publish(self, message):
        """Publish ``message`` (JSON-encoded) on a randomly chosen channel.

        A missing or closed channel in the chosen slot is replaced first.
        After publishing, ``process_data_events`` is called with the
        subclass's event key. Failures are logged and not re-raised.

        NOTE(review): no lock is taken here; see the class docstring.
        """
        try:
            random_channel_number = CommonUtils.get_random_number(0, self.get_channel_count() - 1)
            channel = self.channels[random_channel_number]
            if not channel or channel.is_closed:
                channel = self.assign_channel()
                if channel:
                    self.channels[random_channel_number] = channel
            self.channels[random_channel_number].basic_publish(
                exchange=self.get_rabbitmq_exchange_name(),
                routing_key=self.get_rabbitmq_routing_key(),
                body=json.dumps(message),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                )
            )
            event_key = self.get_event_key()
            self.process_data_events(event_key)
        except Exception as e:
            Log.e(f"Failed to send message to RabbitMQ: {e}")

    def process_data_events(self, event_key):
        """Let the connection process pending events, then record a timestamp.

        Reconnects first if needed, calls
        ``connection.process_data_events(time_limit=0)`` and stores
        ``int(TimeUtils.current_milli_time())`` in the cache under
        ``event_key``. Failures are logged and not re-raised.
        """
        try:
            if not self.connection or self.connection.is_closed:
                self.connect()
            self.connection.process_data_events(time_limit=0)
            CacheUtils.set_key_payload(key=event_key, payload=int(TimeUtils.current_milli_time()))
        except Exception as e:
            Log.e(str(e))

    def close_channels(self):
        """Close every open channel in the table, then reset the table.

        Each channel is closed independently so that an empty (``None``)
        slot or one failing ``close()`` does not leave the rest open.
        """
        for channel in list((self.channels or {}).values()):
            if channel is None:
                continue
            try:
                if channel.is_open:
                    channel.close()
            except Exception as e:
                Log.e(str(e))
        self.channels = {}

    @abstractmethod
    def get_rabbitmq_routing_key(self):
        """Return the routing key used when publishing."""
        pass

    @abstractmethod
    def get_rabbitmq_exchange_name(self):
        """Return the name of the exchange to declare and publish to."""
        pass

    @abstractmethod
    def get_rabbitmq_exchange_type(self):
        """Return the exchange type passed to ``exchange_declare``."""
        pass

    @abstractmethod
    def get_queue_message_type(self):
        """Return the queue message type (not used by this base class)."""
        pass

    @abstractmethod
    def get_event_key(self):
        """Return the cache key written by ``process_data_events``."""
        pass

    @abstractmethod
    def get_channel_count(self):
        """Return how many channels to keep in ``self.channels``."""
        pass

    @abstractmethod
    def get_connection_name(self):
        """Return the ``connection_name`` client property for the connection."""
        pass

# Publisher for the email exchange, built on RabbitMQProducer.
class EmailPublisher(RabbitMQProducer):
    """RabbitMQ producer for email messages, accessed through ``instance()``.

    NOTE(review): RabbitMQProducer also declares ``get_queue_message_type``
    and ``get_event_key`` as abstract, and neither is overridden here; as
    shown, ``EmailPublisher()`` would raise ``TypeError`` -- confirm they
    are implemented in the real class.
    """

    __singleton_instance = None

    @classmethod
    def instance(cls):
        """Return the shared publisher, creating it on first use."""
        cached = cls.__singleton_instance
        if cached:
            return cached
        # First call: build the publisher and remember it on the class.
        cls.__singleton_instance = EmailPublisher()
        return cls.__singleton_instance

    def get_rabbitmq_routing_key(self):
        """Return the routing key for email messages."""
        return "email.queue"

    def get_rabbitmq_exchange_name(self):
        """Return the name of the email exchange."""
        return "email_exchange"

    def get_rabbitmq_exchange_type(self):
        """Return the exchange type."""
        return "direct"

    def get_channel_count(self):
        """Return the number of channels to keep open."""
        return 5

    def get_connection_name(self):
        """Return the connection name reported to the broker."""
        return "email_connection"

希望这对你有帮助!如果有其他翻译需求,请随时提问。

英文:

Hi, I am trying to connect to an email queue from a Django application. I have created an EmailPublisher, a singleton object that extends RabbitMQProducer, which contains the actual code for connecting to the RabbitMQ service. Now I want to share the RabbitMQ connection between Gunicorn workers. How can I do this? I am using Python 3.7 with Django.

class RabbitMQProducer(ABC):
    """Abstract base for objects that publish JSON messages to RabbitMQ.

    One instance owns one pika connection plus a table of channels
    (``self.channels``, keyed ``0 .. get_channel_count() - 1``).
    Subclasses supply the exchange, routing key, channel count and other
    names through the abstract ``get_*`` hooks at the bottom of the class.

    NOTE(review): nothing in this class takes a lock, so despite the name
    ``thread_safe_publish`` callers should not assume one instance can be
    shared across threads -- confirm against how it is used.
    """

    def __init__(self):
        """Build credentials/parameters from ``settings.RABBITMQCONFIG`` and connect."""
        self.credentials = pika.PlainCredentials(
            settings.RABBITMQCONFIG['username'],
            settings.RABBITMQCONFIG['password'],
        )
        self.parameters = pika.ConnectionParameters(
            host=settings.RABBITMQCONFIG['host'],
            port=settings.RABBITMQCONFIG['port'],
            virtual_host=settings.RABBITMQCONFIG['virtual_host'],
            credentials=self.credentials,
            heartbeat=600,
            blocked_connection_timeout=300,
            client_properties={
                'connection_name': self.get_connection_name(),
            }
        )
        self.connection = None
        self.channels = {}
        self.connect()

    def connect(self):
        """Open a new connection if there is none or the current one is closed.

        After (re)connecting, the previous channel table is closed and
        discarded and a fresh set of channels is declared.
        """
        if not self.connection or self.connection.is_closed:
            self.connection = BlockingConnection(self.parameters)
            self.close_channels()
            self.declare_channels()

    def declare_channels(self):
        """Fill ``self.channels`` with ``get_channel_count()`` channels."""
        for i in range(self.get_channel_count()):
            channel = self.assign_channel()
            if channel is not None:
                self.channels[i] = channel
            else:
                # assign_channel() returns None after it had to reconnect;
                # that reconnect rebuilt the whole table, so do not
                # overwrite a slot it already filled with None.
                self.channels.setdefault(i, None)

    def send(self, message):
        """Ensure the connection is up and publish ``message``.

        Failures are logged through ``Log.e`` and not re-raised.
        """
        try:
            self.connect()
            self.thread_safe_publish(message)
        except Exception as e:
            Log.e(f"Failed to send message to RabbitMQ: {e}")

    def assign_channel(self):
        """Open a channel on the current connection and declare the exchange.

        Returns:
            The new channel, or ``None`` when the connection was missing or
            closed; in that case ``connect()`` is called instead, which
            rebuilds the channel table itself.
        """
        if not self.connection or self.connection.is_closed:
            self.connect()
            return None
        channel = self.connection.channel(channel_number=None)
        channel.exchange_declare(
            exchange=self.get_rabbitmq_exchange_name(),
            exchange_type=self.get_rabbitmq_exchange_type(),
            durable=True,
        )
        return channel

    def thread_safe_publish(self, message):
        """Publish ``message`` (JSON-encoded) on a randomly chosen channel.

        A missing or closed channel in the chosen slot is replaced first.
        After publishing, ``process_data_events`` is called with the
        subclass's event key. Failures are logged and not re-raised.

        NOTE(review): no lock is taken here; see the class docstring.
        """
        try:
            random_channel_number = CommonUtils.get_random_number(0, self.get_channel_count() - 1)
            channel = self.channels[random_channel_number]
            if not channel or channel.is_closed:
                channel = self.assign_channel()
                if channel:
                    self.channels[random_channel_number] = channel
            self.channels[random_channel_number].basic_publish(
                exchange=self.get_rabbitmq_exchange_name(),
                routing_key=self.get_rabbitmq_routing_key(),
                body=json.dumps(message),
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                )
            )
            event_key = self.get_event_key()
            self.process_data_events(event_key)
        except Exception as e:
            Log.e(f"Failed to send message to RabbitMQ: {e}")

    def process_data_events(self, event_key):
        """Let the connection process pending events, then record a timestamp.

        Reconnects first if needed, calls
        ``connection.process_data_events(time_limit=0)`` and stores
        ``int(TimeUtils.current_milli_time())`` in the cache under
        ``event_key``. Failures are logged and not re-raised.
        """
        try:
            if not self.connection or self.connection.is_closed:
                self.connect()
            self.connection.process_data_events(time_limit=0)
            CacheUtils.set_key_payload(key=event_key, payload=int(TimeUtils.current_milli_time()))
        except Exception as e:
            Log.e(str(e))

    def close_channels(self):
        """Close every open channel in the table, then reset the table.

        Each channel is closed independently so that an empty (``None``)
        slot or one failing ``close()`` does not leave the rest open.
        """
        for channel in list((self.channels or {}).values()):
            if channel is None:
                continue
            try:
                if channel.is_open:
                    channel.close()
            except Exception as e:
                Log.e(str(e))
        self.channels = {}

    @abstractmethod
    def get_rabbitmq_routing_key(self):
        """Return the routing key used when publishing."""
        pass

    @abstractmethod
    def get_rabbitmq_exchange_name(self):
        """Return the name of the exchange to declare and publish to."""
        pass

    @abstractmethod
    def get_rabbitmq_exchange_type(self):
        """Return the exchange type passed to ``exchange_declare``."""
        pass

    @abstractmethod
    def get_queue_message_type(self):
        """Return the queue message type (not used by this base class)."""
        pass

    @abstractmethod
    def get_event_key(self):
        """Return the cache key written by ``process_data_events``."""
        pass

    @abstractmethod
    def get_channel_count(self):
        """Return how many channels to keep in ``self.channels``."""
        pass

    @abstractmethod
    def get_connection_name(self):
        """Return the ``connection_name`` client property for the connection."""
        pass
class EmailPublisher(RabbitMQProducer):
    """RabbitMQ producer for email messages, accessed through ``instance()``.

    NOTE(review): RabbitMQProducer also declares ``get_queue_message_type``
    and ``get_event_key`` as abstract, and neither is overridden here; as
    shown, ``EmailPublisher()`` would raise ``TypeError`` -- confirm they
    are implemented in the real class.
    """

    __singleton_instance = None

    @classmethod
    def instance(cls):
        """Return the shared publisher, creating it on first use."""
        # check for the singleton instance
        if not cls.__singleton_instance:
            cls.__singleton_instance = EmailPublisher()
        # return the singleton instance
        return cls.__singleton_instance

    def get_rabbitmq_routing_key(self):
        """Return the routing key for email messages."""
        return 'email.queue'

    def get_rabbitmq_exchange_name(self):
        """Return the name of the email exchange."""
        return 'email_exchange'

    def get_rabbitmq_exchange_type(self):
        """Return the exchange type."""
        return "direct"

    def get_channel_count(self):
        """Return the number of channels to keep open."""
        return 5

    def get_connection_name(self):
        """Return the connection name reported to the broker."""
        return 'email_connection'

答案1

得分: 1

现在我想在gunicorn工作进程之间共享rabbitmq连接,我该如何做到这一点?
你不能!
Gunicorn工作进程是独立的操作系统进程。每个工作进程必须有自己的连接到RabbitMQ。

<sub><b>注意:</b> RabbitMQ 团队主要关注 rabbitmq-users 邮件列表,只是偶尔才会在 StackOverflow 上回答问题。</sub>

英文:

> Now I want to share the rabbitmq connection between gunicorn workers how can I do this?

You can't!

Gunicorn workers are separate operating system processes. Every worker must have its own connection to RabbitMQ.


<sub><b>NOTE:</b> Team RabbitMQ monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.</sub>

huangapple
  • 本文由 发表于 2023年5月25日 16:32:15
  • 转载请务必保留本文链接:https://go.coder-hub.com/76330304.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定