在Django应用的Gunicorn工作进程之间共享Pika连接单例对象

huangapple go评论96阅读模式
英文:

Sharing a Pika connection singleton object between Gunicorn workers in a Django application

问题

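你好,我正在尝试从Django应用连接到邮件队列。我创建了一个EmailPublisher单例对象,它继承自RabbitMQProducer,后者包含实际连接RabbitMQ服务的代码。现在我想在gunicorn工作进程之间共享这个RabbitMQ连接,应该怎么做?我使用的是Python 3.7和Django。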

  class RabbitMQProducer(ABC):
      """Base class for publishers that send JSON messages to a RabbitMQ exchange.

      Holds one pika blocking connection plus a pool of channels stored in
      ``self.channels`` (slot index -> channel). Each message is published on
      the slot picked by ``CommonUtils.get_random_number``. Subclasses supply
      the exchange, routing key, pool size, event key and connection name
      through the abstract ``get_*`` methods.

      NOTE(review): pika documents ``BlockingConnection`` as not thread-safe,
      so spreading publishes over several channels does not by itself make
      ``thread_safe_publish`` safe to call from several threads - confirm
      that callers serialize access.
      """

      def __init__(self):
          """Build connection parameters from ``settings.RABBITMQCONFIG`` and connect."""
          self.credentials = pika.PlainCredentials(
              settings.RABBITMQCONFIG['username'],
              settings.RABBITMQCONFIG['password'],
          )
          self.parameters = pika.ConnectionParameters(
              host=settings.RABBITMQCONFIG['host'],
              port=settings.RABBITMQCONFIG['port'],
              virtual_host=settings.RABBITMQCONFIG['virtual_host'],
              credentials=self.credentials,
              heartbeat=600,
              blocked_connection_timeout=300,
              client_properties={
                  'connection_name': self.get_connection_name(),
              }
          )
          self.connection = None
          self.channels = {}
          self.connect()

      def connect(self):
          """Open a new connection and rebuild the channel pool if none is open.

          Does nothing when ``self.connection`` exists and is not closed.
          """
          if self.connection and not self.connection.is_closed:
              return
          self.connection = pika.BlockingConnection(self.parameters)
          self.close_channels()
          self.declare_channels()

      def declare_channels(self):
          """Fill slots ``0 .. get_channel_count() - 1`` with new channels."""
          for slot in range(self.get_channel_count()):
              channel = self.assign_channel()
              if channel is None:
                  # assign_channel() found the connection closed and called
                  # connect(), which already rebuilt the whole pool. Carrying
                  # on would overwrite those fresh channels (with None for
                  # this slot), so stop here.
                  return
              self.channels[slot] = channel

      def send(self, message):
          """Publish ``message``, reconnecting first if necessary.

          Any exception is logged through ``Log.e`` and not re-raised.
          """
          try:
              self.connect()
              self.thread_safe_publish(message)
          except Exception as e:
              Log.e(f"Failed to send message to RabbitMQ: {e}")

      def assign_channel(self):
          """Open a channel on the current connection and declare the exchange.

          Returns:
              The new channel, or ``None`` when the connection was missing or
              closed. In that case ``connect()`` is called, which rebuilds the
              pool itself, so no extra channel is opened here.
          """
          if not self.connection or self.connection.is_closed:
              self.connect()
              return None
          channel = self.connection.channel(channel_number=None)
          channel.exchange_declare(
              exchange=self.get_rabbitmq_exchange_name(),
              exchange_type=self.get_rabbitmq_exchange_type(),
              durable=True,
          )
          return channel

      def thread_safe_publish(self, message):
          """Publish ``message`` as JSON on one channel of the pool.

          A missing or closed channel in the chosen slot is replaced first.
          After publishing, ``process_data_events`` is called with the
          subclass's event key. Any exception is logged through ``Log.e`` and
          not re-raised.
          """
          try:
              # presumably both bounds are inclusive - verify against CommonUtils
              slot = CommonUtils.get_random_number(0, self.get_channel_count() - 1)
              channel = self.channels.get(slot)
              if not channel or channel.is_closed:
                  replacement = self.assign_channel()
                  if replacement is not None:
                      self.channels[slot] = replacement
                  # A None result means assign_channel() reconnected and the
                  # pool was rebuilt, so re-read the slot in both cases.
                  channel = self.channels.get(slot)
              if channel is None:
                  raise RuntimeError(
                      f"no open RabbitMQ channel available in slot {slot}"
                  )
              channel.basic_publish(
                  exchange=self.get_rabbitmq_exchange_name(),
                  routing_key=self.get_rabbitmq_routing_key(),
                  body=json.dumps(message),
                  properties=pika.BasicProperties(
                      delivery_mode=2,  # make message persistent
                  )
              )
              self.process_data_events(self.get_event_key())
          except Exception as e:
              Log.e(f"Failed to send message to RabbitMQ: {e}")

      def process_data_events(self, event_key):
          """Let the connection process pending events and record when it did.

          Reconnects if needed, calls the connection's
          ``process_data_events(time_limit=0)`` and then stores
          ``int(TimeUtils.current_milli_time())`` under ``event_key`` via
          ``CacheUtils.set_key_payload``. Exceptions are logged, not raised.
          """
          try:
              if not self.connection or self.connection.is_closed:
                  self.connect()
              self.connection.process_data_events(time_limit=0)
              CacheUtils.set_key_payload(
                  key=event_key,
                  payload=int(TimeUtils.current_milli_time()),
              )
          except Exception as e:
              Log.e(str(e))

      def close_channels(self):
          """Close every open channel in the pool, then empty the pool.

          Empty (``None``) slots are skipped and a failure on one channel is
          logged without preventing the remaining channels from being closed.
          """
          for channel in self.channels.values():
              if channel is None:
                  continue
              try:
                  if channel.is_open:
                      channel.close()
              except Exception as e:
                  Log.e(str(e))
          self.channels = {}

      @abstractmethod
      def get_rabbitmq_routing_key(self):
          """Return the routing key passed to ``basic_publish``."""

      @abstractmethod
      def get_rabbitmq_exchange_name(self):
          """Return the name of the exchange to declare and publish to."""

      @abstractmethod
      def get_rabbitmq_exchange_type(self):
          """Return the exchange type passed to ``exchange_declare``."""

      @abstractmethod
      def get_queue_message_type(self):
          """Return the queue message type (not used by this base class)."""

      @abstractmethod
      def get_event_key(self):
          """Return the cache key that ``process_data_events`` writes to."""

      @abstractmethod
      def get_channel_count(self):
          """Return the number of channels kept in the pool."""

      @abstractmethod
      def get_connection_name(self):
          """Return the ``connection_name`` sent in the client properties."""
  class EmailPublisher(RabbitMQProducer):
      """Singleton RabbitMQ publisher for the e-mail exchange.

      The shared instance is kept in a class attribute, so it is shared only
      inside one Python process.
      """

      __singleton_instance = None

      @classmethod
      def instance(cls):
          """Return the shared publisher, creating it on first use."""
          if cls.__singleton_instance is None:
              cls.__singleton_instance = cls()
          return cls.__singleton_instance

      def get_rabbitmq_routing_key(self):
          """Return the routing key used for e-mail messages."""
          return 'email.queue'

      def get_rabbitmq_exchange_name(self):
          """Return the name of the e-mail exchange."""
          return 'email_exchange'

      def get_rabbitmq_exchange_type(self):
          """Return the exchange type of the e-mail exchange."""
          return "direct"

      def get_queue_message_type(self):
          """Return the queue message type.

          The base class declares this method abstract; without an
          implementation the class cannot be instantiated.
          """
          # NOTE(review): placeholder value - confirm the expected type name.
          return 'email'

      def get_event_key(self):
          """Return the cache key written after each publish.

          The base class declares this method abstract; without an
          implementation the class cannot be instantiated.
          """
          # NOTE(review): placeholder value - confirm the expected cache key.
          return 'email_publisher_last_event'

      def get_channel_count(self):
          """Return the number of channels kept in the pool."""
          return 5

      def get_connection_name(self):
          """Return the connection name reported to RabbitMQ."""
          return 'email_connection'

希望这对你有帮助!如果有其他翻译需求,请随时提问。

英文:

Hi, I am trying to connect to an email queue from a Django application. I have created an EmailPublisher, a singleton object that extends RabbitMQProducer, which contains the actual code for connecting to the RabbitMQ service. Now I want to share the RabbitMQ connection between gunicorn workers. How can I do this? I am using Python 3.7 with Django.

  class RabbitMQProducer(ABC):
      """Base class for publishers that send JSON messages to a RabbitMQ exchange.

      Holds one pika blocking connection plus a pool of channels stored in
      ``self.channels`` (slot index -> channel). Each message is published on
      the slot picked by ``CommonUtils.get_random_number``. Subclasses supply
      the exchange, routing key, pool size, event key and connection name
      through the abstract ``get_*`` methods.

      NOTE(review): pika documents ``BlockingConnection`` as not thread-safe,
      so spreading publishes over several channels does not by itself make
      ``thread_safe_publish`` safe to call from several threads - confirm
      that callers serialize access.
      """

      def __init__(self):
          """Build connection parameters from ``settings.RABBITMQCONFIG`` and connect."""
          self.credentials = pika.PlainCredentials(
              settings.RABBITMQCONFIG['username'],
              settings.RABBITMQCONFIG['password'],
          )
          self.parameters = pika.ConnectionParameters(
              host=settings.RABBITMQCONFIG['host'],
              port=settings.RABBITMQCONFIG['port'],
              virtual_host=settings.RABBITMQCONFIG['virtual_host'],
              credentials=self.credentials,
              heartbeat=600,
              blocked_connection_timeout=300,
              client_properties={
                  'connection_name': self.get_connection_name(),
              }
          )
          self.connection = None
          self.channels = {}
          self.connect()

      def connect(self):
          """Open a new connection and rebuild the channel pool if none is open.

          Does nothing when ``self.connection`` exists and is not closed.
          """
          if self.connection and not self.connection.is_closed:
              return
          self.connection = pika.BlockingConnection(self.parameters)
          self.close_channels()
          self.declare_channels()

      def declare_channels(self):
          """Fill slots ``0 .. get_channel_count() - 1`` with new channels."""
          for slot in range(self.get_channel_count()):
              channel = self.assign_channel()
              if channel is None:
                  # assign_channel() found the connection closed and called
                  # connect(), which already rebuilt the whole pool. Carrying
                  # on would overwrite those fresh channels (with None for
                  # this slot), so stop here.
                  return
              self.channels[slot] = channel

      def send(self, message):
          """Publish ``message``, reconnecting first if necessary.

          Any exception is logged through ``Log.e`` and not re-raised.
          """
          try:
              self.connect()
              self.thread_safe_publish(message)
          except Exception as e:
              Log.e(f"Failed to send message to RabbitMQ: {e}")

      def assign_channel(self):
          """Open a channel on the current connection and declare the exchange.

          Returns:
              The new channel, or ``None`` when the connection was missing or
              closed. In that case ``connect()`` is called, which rebuilds the
              pool itself, so no extra channel is opened here.
          """
          if not self.connection or self.connection.is_closed:
              self.connect()
              return None
          channel = self.connection.channel(channel_number=None)
          channel.exchange_declare(
              exchange=self.get_rabbitmq_exchange_name(),
              exchange_type=self.get_rabbitmq_exchange_type(),
              durable=True,
          )
          return channel

      def thread_safe_publish(self, message):
          """Publish ``message`` as JSON on one channel of the pool.

          A missing or closed channel in the chosen slot is replaced first.
          After publishing, ``process_data_events`` is called with the
          subclass's event key. Any exception is logged through ``Log.e`` and
          not re-raised.
          """
          try:
              # presumably both bounds are inclusive - verify against CommonUtils
              slot = CommonUtils.get_random_number(0, self.get_channel_count() - 1)
              channel = self.channels.get(slot)
              if not channel or channel.is_closed:
                  replacement = self.assign_channel()
                  if replacement is not None:
                      self.channels[slot] = replacement
                  # A None result means assign_channel() reconnected and the
                  # pool was rebuilt, so re-read the slot in both cases.
                  channel = self.channels.get(slot)
              if channel is None:
                  raise RuntimeError(
                      f"no open RabbitMQ channel available in slot {slot}"
                  )
              channel.basic_publish(
                  exchange=self.get_rabbitmq_exchange_name(),
                  routing_key=self.get_rabbitmq_routing_key(),
                  body=json.dumps(message),
                  properties=pika.BasicProperties(
                      delivery_mode=2,  # make message persistent
                  )
              )
              self.process_data_events(self.get_event_key())
          except Exception as e:
              Log.e(f"Failed to send message to RabbitMQ: {e}")

      def process_data_events(self, event_key):
          """Let the connection process pending events and record when it did.

          Reconnects if needed, calls the connection's
          ``process_data_events(time_limit=0)`` and then stores
          ``int(TimeUtils.current_milli_time())`` under ``event_key`` via
          ``CacheUtils.set_key_payload``. Exceptions are logged, not raised.
          """
          try:
              if not self.connection or self.connection.is_closed:
                  self.connect()
              self.connection.process_data_events(time_limit=0)
              CacheUtils.set_key_payload(
                  key=event_key,
                  payload=int(TimeUtils.current_milli_time()),
              )
          except Exception as e:
              Log.e(str(e))

      def close_channels(self):
          """Close every open channel in the pool, then empty the pool.

          Empty (``None``) slots are skipped and a failure on one channel is
          logged without preventing the remaining channels from being closed.
          """
          for channel in self.channels.values():
              if channel is None:
                  continue
              try:
                  if channel.is_open:
                      channel.close()
              except Exception as e:
                  Log.e(str(e))
          self.channels = {}

      @abstractmethod
      def get_rabbitmq_routing_key(self):
          """Return the routing key passed to ``basic_publish``."""

      @abstractmethod
      def get_rabbitmq_exchange_name(self):
          """Return the name of the exchange to declare and publish to."""

      @abstractmethod
      def get_rabbitmq_exchange_type(self):
          """Return the exchange type passed to ``exchange_declare``."""

      @abstractmethod
      def get_queue_message_type(self):
          """Return the queue message type (not used by this base class)."""

      @abstractmethod
      def get_event_key(self):
          """Return the cache key that ``process_data_events`` writes to."""

      @abstractmethod
      def get_channel_count(self):
          """Return the number of channels kept in the pool."""

      @abstractmethod
      def get_connection_name(self):
          """Return the ``connection_name`` sent in the client properties."""
  class EmailPublisher(RabbitMQProducer):
      """Singleton RabbitMQ publisher for the e-mail exchange.

      The shared instance is kept in a class attribute, so it is shared only
      inside one Python process.
      """

      __singleton_instance = None

      @classmethod
      def instance(cls):
          """Return the shared publisher, creating it on first use."""
          if cls.__singleton_instance is None:
              cls.__singleton_instance = cls()
          return cls.__singleton_instance

      def get_rabbitmq_routing_key(self):
          """Return the routing key used for e-mail messages."""
          return 'email.queue'

      def get_rabbitmq_exchange_name(self):
          """Return the name of the e-mail exchange."""
          return 'email_exchange'

      def get_rabbitmq_exchange_type(self):
          """Return the exchange type of the e-mail exchange."""
          return "direct"

      def get_queue_message_type(self):
          """Return the queue message type.

          The base class declares this method abstract; without an
          implementation the class cannot be instantiated.
          """
          # NOTE(review): placeholder value - confirm the expected type name.
          return 'email'

      def get_event_key(self):
          """Return the cache key written after each publish.

          The base class declares this method abstract; without an
          implementation the class cannot be instantiated.
          """
          # NOTE(review): placeholder value - confirm the expected cache key.
          return 'email_publisher_last_event'

      def get_channel_count(self):
          """Return the number of channels kept in the pool."""
          return 5

      def get_connection_name(self):
          """Return the connection name reported to RabbitMQ."""
          return 'email_connection'

答案1

得分: 1

现在我想在gunicorn工作进程之间共享rabbitmq连接,我该如何做到这一点?
你不能!
Gunicorn工作进程是独立的操作系统进程。每个工作进程必须有自己的连接到RabbitMQ。

<sub><b>注意:</b> RabbitMQ团队会关注rabbitmq-users邮件列表,只是偶尔才会在StackOverflow上回答问题。</sub>

英文:

> Now I want to share the rabbitmq connection between gunicorn workers how can I do this?

You can't!

Gunicorn workers are separate operating system processes. Every worker must have its own connection to RabbitMQ.


<sub><b>NOTE:</b> Team RabbitMQ monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.</sub>

huangapple
  • 本文由 发表于 2023年5月25日 16:32:15
  • 转载请务必保留本文链接:https://go.coder-hub.com/76330304.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定