Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
683 views
in Technique[技术] by (71.8m points)

activemq - Python stomp.py connection gets disconnected and listener stops working

I am writing a python script using the python stomp library to connect and subscribe to an ActiveMQ message queue.

My code is very similar to the examples in the documentation "Dealing with disconnects" with the addition of the timer being placed in a loop for a long running listener.

The listener class is working to receive and process messages. However after a few minutes, the connection gets disconnected and then the listener stops picking up messages.

Problem:

The on_disconnected method is getting called which runs the connect_and_subscribe() method, however it seems the listener stops working after this happens. Perhaps the listener needs to be re-initialized? After the script is run again, the listener is re-created, it starts picking up messages again, but this is not practical to keep running the script again periodically.

Question 1: How can I set this up to re-connect and re-create the listener automatically?

Question 2: Is there a better way to initialize a long-running listener rather than the timeout loop?

import os, time, datetime, stomp

_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
_queue = os.getenv('QUEUE_NAME')
# Subscription id is unique to the subscription in this case there is only one subscription per connection
sub_id = 1

def connect_and_subscribe(conn):
  conn.connect(_user, _password, wait=True)
  conn.subscribe(destination=_queue, id=sub_id, ack='client-individual')
  print('connect_and_subscribe connecting {} to with connection id {}'.format(_queue, sub_id), flush=True)

class MqListener(stomp.ConnectionListener):
  def __init__(self, conn):
    self.conn = conn
    self.sub_id = sub_id
    print('MqListener init')

  def on_error(self, frame):
    print('received an error "%s"' % frame.body)

  def on_message(self, headers, body):
    print('received a message headers "%s"' % headers)
    print('message body "%s"' % body)
    time.sleep(1)
    print('processed message')
    print('Acknowledging')
    self.conn.ack(headers['message-id'], self.sub_id)

  def on_disconnected(self):
    print('disconnected! reconnecting...')
    connect_and_subscribe(self.conn)

def initialize_mqlistener():
  conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
  conn.set_listener('', MqListener(conn))
  connect_and_subscribe(conn)
  # https://github.com/jasonrbriggs/stomp.py/issues/206
  while conn.is_connected():
    time.sleep(2)
  conn.disconnect()

if __name__ == '__main__':
  initialize_mqlistener()
question from:https://stackoverflow.com/questions/65838058/python-stomp-py-connection-gets-disconnected-and-listener-stops-working

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I was able to solve this issue by refactoring the retry attempts loop and the on_error handler. Also, I have installed and configured supervisor in the docker container to run and manage the listener process. That way if the listener program stops it will be automatically restarted by the supervisor process manager.

Updated python stomp listener script

init_listener.py

import os, json, time, datetime, stomp

_host = os.getenv('MQ_HOST')
_port = os.getenv('MQ_PORT')
_user = os.getenv('MQ_USER')
_password = os.getenv('MQ_PASSWORD')
# The listener will listen for messages that are relevant to this specific worker
# Queue name must match the 'worker_type' in job tracker file
_queue = os.getenv('QUEUE_NAME')
# Subscription id is unique to the subscription in this case there is only one subscription per connection
_sub_id = 1
_reconnect_attempts = 0
_max_attempts = 1000

def connect_and_subscribe(conn):
  global _reconnect_attempts
  _reconnect_attempts = _reconnect_attempts + 1
  if _reconnect_attempts <= _max_attempts:
    try:
      conn.connect(_user, _password, wait=True)
      print('connect_and_subscribe connecting {} to with connection id {} reconnect attempts: {}'.format(_queue, _sub_id, _reconnect_attempts), flush=True)
    except Exception as e:
      print('Exception on disconnect. reconnecting...')
      print(e)
      connect_and_subscribe(conn)
    else:
      conn.subscribe(destination=_queue, id=_sub_id, ack='client-individual')
      _reconnect_attempts = 0
  else:
    print('Maximum reconnect attempts reached for this connection. reconnect attempts: {}'.format(_reconnect_attempts), flush=True)

class MqListener(stomp.ConnectionListener):
  def __init__(self, conn):
    self.conn = conn
    self._sub_id = _sub_id
    print('MqListener init')

  def on_error(self, headers, body):
    print('received an error "%s"' % body)

  def on_message(self, headers, body):
    print('received a message headers "%s"' % headers)
    print('message body "%s"' % body)

    message_id = headers.get('message-id')
    message_data = json.loads(body)
    task_name = message_data.get('task_name')
    prev_status = message_data.get('previous_step_status')

    if prev_status == "success":
        print('CALLING DO TASK')
        resp = True
    else:
        print('CALLING REVERT TASK')
        resp = True
    if (resp):
        print('Ack message_id {}'.format(message_id))
        self.conn.ack(message_id, self._sub_id)
    else:
        print('NON Ack message_id {}'.format(message_id))
        self.conn.nack(message_id, self._sub_id)
    print('processed message message_id {}'.format(message_id))

  def on_disconnected(self):
    print('disconnected! reconnecting...')
    connect_and_subscribe(self.conn)

def initialize_mqlistener():
  conn = stomp.Connection([(_host, _port)], heartbeats=(4000, 4000))
  conn.set_listener('', MqListener(conn))
  connect_and_subscribe(conn)
  # https://github.com/jasonrbriggs/stomp.py/issues/206
  while True:
    time.sleep(2)
    if not conn.is_connected():
      print('Disconnected in loop, reconnecting')
      connect_and_subscribe(conn)

if __name__ == '__main__':
  initialize_mqlistener()

Supervisor installation and configuration

Dockerfile

Some details removed for brevity

# Install supervisor
RUN apt-get update && apt-get install -y supervisor

# Add the supervisor configuration file
ADD supervisord.conf /etc/supervisor/conf.d/supervisord.conf

# Start supervisor with the configuration file
CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/conf.d/supervisord.conf"]

supervisor.conf

[supervisord]
nodaemon=true
logfile=/home/exampleuser/logs/supervisord.log

[program:mqutils]
command=python3 init_listener.py
directory=/home/exampleuser/mqutils
user=exampleuser
autostart=true
autorestart=true

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...