Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


ActiveMQ Artemis - MQTT subscription queues not being cleaned up

We're using ActiveMQ Artemis 2.8.1 with MQTT.

We integrated ActiveMQ Artemis with WildFly. Suppose our server connects with client ID SAM and subscribes to 50 topics. Inspecting Artemis with JConsole, we can see that each client subscription results in a queue whose name follows the pattern <client-id>_<topic>. For example, for the topic com/api/output the subscription queue is named SAM_com/api/output. Likewise, each of the other topics gets a subscription queue with the same naming pattern (i.e. SAM_<topic>), for 50 queues in total.

My findings

Based on my research, each queue stores the messages sent to that topic for one client's subscription. For example, when the same topic (e.g. 1/2/3) is subscribed to by 3 different clients (e.g. A, B, and C), there will be 3 subscription queues (i.e. A_1/2/3, B_1/2/3, and C_1/2/3). Therefore, when a message is sent to the topic 1/2/3, Artemis puts that message in each of the subscription queues A_1/2/3, B_1/2/3, and C_1/2/3.

Actual problem

Now the same client wants to connect to the broker with a different client ID (e.g. TOM). My client drops the connection, Artemis recognizes the drop, and then my client connects to the broker with the new client ID (TOM) and subscribes to the same 50 topics. Now there are 100 subscription queues in total, with each topic having 2 (i.e. one for each client ID, SAM and TOM). I found that the SAM queues are retained because we set cleanSession to false when initiating the connection. All those subscription queues are therefore durable, so they are kept even after the client disconnects.

When a message is sent to a topic, it is put in two queues (SAM's and TOM's). Our client is connected to the broker with client ID TOM, so the TOM queue has a consumer and all of its messages are consumed by the client. However, the SAM queue has no consumer, so it accumulates messages and eats up the JVM's heap space until the server dies.

The purpose of durable queues is to retain messages even when the client disconnects. Still, is there any way to tell ActiveMQ Artemis to purge a client's queues and messages if the client doesn't show up for a certain time period, or to purge the messages from the client's subscription queue when the client drops the connection?

Our broker.xml

<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>
      <security-enabled>true</security-enabled>


      <persistence-enabled>false</persistence-enabled>

      <!-- this could be ASYNCIO, MAPPED, NIO
           ASYNCIO: Linux Libaio
           MAPPED: mmap files
           NIO: Plain Java Files
       -->
      <journal-type>ASYNCIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-file-size>10M</journal-file-size>
      
      <journal-buffer-timeout>64000</journal-buffer-timeout>


      <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
       -->
      <journal-max-io>4096</journal-max-io>
      <!--
        You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
         <network-check-NIC>theNicName</network-check-NIC>
        -->

      <!--
        Use this to use an HTTP server to validate the network
         <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->

      <!-- <network-check-period>10000</network-check-period> -->
      <!-- <network-check-timeout>1000</network-check-timeout> -->

      <!-- this is a comma separated list, no spaces, just DNS or IPs
           it should accept IPV6

           Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
                    You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
      <!-- <network-check-list>10.0.0.1</network-check-list> -->

      <!-- use this to customize the ping used for ipv4 addresses -->
      <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->

      <!-- use this to customize the ping used for ipv6 addresses -->
      <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->




      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
      <disk-scan-period>5000</disk-scan-period>

      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
           that won't support flow control. -->
      <max-disk-usage>99</max-disk-usage>

      <!-- should the broker detect dead locks and other issues -->
      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>
      

      <acceptors>

         <!-- useEpoll means: it will use Netty epoll if you are on a system (Linux) that supports it -->
         <!-- amqpCredits: The number of credits sent to AMQP producers -->
         <!-- amqpLowCredits: The server will send the # credits specified at amqpCredits at this low mark -->

         <!-- Note: If an acceptor needs to be compatible with HornetQ and/or Artemis 1.x clients add
                    "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to the acceptor url.
                    See https://issues.apache.org/jira/browse/ARTEMIS-1644 for more information. -->

         <!-- MQTT Acceptor -->
         <acceptor name="mqtt">tcp://0.0.0.0:${activeMQ-mqtt-port}?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,MQTT;useEpoll=true</acceptor>
         <acceptor name="netty-ssl-acceptor">tcp://0.0.0.0:${activeMQ-mqtts-port}?sslEnabled=true;keyStorePath=${activeMQ-keystore-path};keyStorePassword=${activeMQ-keyStore-password};protocols=CORE,MQTT</acceptor>

      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>

      <broker-plugins>
         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
            <property key="LOG_ALL_EVENTS" value="true"/>
            <property key="LOG_CONNECTION_EVENTS" value="true"/>
            <property key="LOG_SESSION_EVENTS" value="tru


1 Reply


It's really up to the subscriber to delete its subscription when it is done with it. In other words, the MQTT client should unsubscribe from its existing subscriptions before creating new ones under a different client ID. The broker has no way to know whether or not the subscriber plans to reconnect later to fetch the messages from its subscription.

If you really want the broker to delete the subscription queue you can use the following address-settings:

  • <auto-delete-queues>: This must be true.
  • <auto-delete-queues-delay>: This is the delay (in milliseconds) from the time the last consumer disconnects until the broker deletes the subscription queue.
  • <auto-delete-queues-message-count>: This must be -1 in order to ignore any messages in the subscription queue.
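
As a rough sketch, the three settings above could be combined in an address-setting in broker.xml like this. The match="#" catch-all and the 24-hour delay (86400000 ms) are assumptions chosen only for illustration; in practice a narrower match and a delay that suits your clients' expected downtime would be more appropriate.

```xml
<address-settings>
   <!-- match="#" is an assumption: it applies these settings to every address -->
   <address-setting match="#">
      <!-- allow the broker to delete subscription queues automatically -->
      <auto-delete-queues>true</auto-delete-queues>
      <!-- illustrative value: wait 24 hours (in ms) after the last consumer disconnects -->
      <auto-delete-queues-delay>86400000</auto-delete-queues-delay>
      <!-- -1 means the queue is deleted even if it still holds messages -->
      <auto-delete-queues-message-count>-1</auto-delete-queues-message-count>
   </address-setting>
</address-settings>
```

If the broker.xml already has an address-setting with the same match, these elements would go inside that existing block rather than in a second one.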

To be clear, I would recommend against this configuration as the broker may inadvertently delete legitimate messages. For example, if a subscriber disconnects unintentionally (e.g. due to a network failure, hardware failure, JVM crash, etc.) for longer than the configured <auto-delete-queues-delay>, then the broker will delete its subscription queue and all of the subscriber's messages will effectively be lost.

