Sunday, May 18, 2014

Scalable IoT integration using Apache ActiveMQ and MQTT

I have been doing a lot of work on MQTT support in Apache ActiveMQ recently, starting with hardening the existing support and adding MQTT 3.1.1 support for the MQTT Interop Day event I mentioned in a previous post.

I like MQTT as a simple protocol for IoT. It's easy to implement in devices and is not overly complicated as protocols go. However, as an experienced JMS architect, I was immediately struck by the fact that it uses the publish-subscribe model. And, as I expected, AT_LEAST_ONCE and EXACTLY_ONCE subscriptions in MQTT are mapped to durable subscriptions in ActiveMQ.

This means that, for those QoS levels, an MQTT consumer that wants to avoid duplicates is limited to a single subscription with a fixed client-id. Essentially, scaling MQTT consumers runs into the same limitation as scaling consumers on JMS Topics: every additional subscription receives its own copy of each message, so the load cannot simply be spread across more consumers.

If you aren't already familiar with the issue, the ActiveMQ documentation describes it in more detail. The same page also describes ActiveMQ's Virtual Topics feature, which solves this problem by mapping logical Topics to physical Queues. Messages on these Queues can then be load balanced across multiple connections and consumers without having to worry about duplicates.
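To make the Virtual Topic idea concrete, here is a minimal in-memory sketch in plain Java (no ActiveMQ classes). It assumes ActiveMQ's default naming convention, where the logical topic is VirtualTopic.<name> and each consumer group reads from a queue named Consumer.<group>.VirtualTopic.<name>. The class and method names are hypothetical and the model only illustrates the semantics, not how the broker implements them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of Virtual Topic semantics: one physical queue per consumer group
// behind a logical topic, with consumers of the same group competing for its messages.
class VirtualTopicModel {
    // virtual topic name -> (consumer queue name -> pending messages)
    private final Map<String, Map<String, Deque<String>>> queuesByTopic = new LinkedHashMap<>();

    // Default convention assumed here: Consumer.<group>.VirtualTopic.<name>
    static String queueFor(String group, String virtualTopic) {
        if (!virtualTopic.startsWith("VirtualTopic.")) {
            throw new IllegalArgumentException("not a virtual topic: " + virtualTopic);
        }
        return "Consumer." + group + "." + virtualTopic;
    }

    // Each consumer group gets its own physical queue behind the logical topic
    void addGroup(String group, String virtualTopic) {
        queuesByTopic.computeIfAbsent(virtualTopic, t -> new LinkedHashMap<>())
                     .putIfAbsent(queueFor(group, virtualTopic), new ArrayDeque<>());
    }

    // Publishing to the logical topic copies the message once into every group's queue
    void publish(String virtualTopic, String body) {
        for (Deque<String> queue : queuesByTopic.getOrDefault(virtualTopic, Collections.emptyMap()).values()) {
            queue.add(body);
        }
    }

    // Consumers in one group share the group's queue: round-robin delivery, no duplicates
    List<List<String>> drain(String group, String virtualTopic, int consumers) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<>());
        }
        Deque<String> queue = queuesByTopic
                .getOrDefault(virtualTopic, Collections.emptyMap())
                .get(queueFor(group, virtualTopic));
        for (int next = 0; queue != null && !queue.isEmpty(); next = (next + 1) % consumers) {
            received.get(next).add(queue.poll());
        }
        return received;
    }
}
```

With two groups on VirtualTopic.Orders and four published messages, group A's two consumers split the messages between them, while group B's single consumer still sees all four. Each group receives every message exactly once, which is what a single durable subscription with a fixed client-id cannot offer when you scale out.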

Queues are also easier to manage and monitor than durable subscriptions. For instance, a monitoring tool can raise an alert when a Queue grows too large, a sign that messages are piling up in the Broker, and the same alert could be used to start more consumer process instances. Apache Camel JMS endpoints and other JMS utilities, such as the JMS listener containers in the Spring Framework, can also automatically increase or decrease the number of consumers based on demand.
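The alert-and-scale idea boils down to a rule that maps Queue depth to a consumer count. The sketch below is a hypothetical policy of the kind an external monitoring tool might apply; all thresholds are illustrative assumptions, not ActiveMQ, Camel or Spring defaults. Within a single process, Spring's DefaultMessageListenerContainer and Camel's JMS component take concurrentConsumers and maxConcurrentConsumers bounds and scale between them on their own, so a rule like this is more relevant for deciding how many consumer processes to run.

```java
// Hypothetical scaling rule driven by Queue depth.
// Thresholds are illustrative assumptions, not defaults of any library.
final class ConsumerScalingPolicy {
    private final int minConsumers;
    private final int maxConsumers;
    private final long backlogPerConsumer; // pending messages one consumer is expected to absorb
    private final long alertDepth;         // Queue depth at which to raise an alert

    ConsumerScalingPolicy(int minConsumers, int maxConsumers, long backlogPerConsumer, long alertDepth) {
        if (minConsumers < 1 || maxConsumers < minConsumers || backlogPerConsumer < 1) {
            throw new IllegalArgumentException("invalid scaling bounds");
        }
        this.minConsumers = minConsumers;
        this.maxConsumers = maxConsumers;
        this.backlogPerConsumer = backlogPerConsumer;
        this.alertDepth = alertDepth;
    }

    // ceil(queueDepth / backlogPerConsumer), clamped to [minConsumers, maxConsumers]
    int desiredConsumers(long queueDepth) {
        long wanted = (Math.max(queueDepth, 0) + backlogPerConsumer - 1) / backlogPerConsumer;
        return (int) Math.max(minConsumers, Math.min(maxConsumers, wanted));
    }

    // True when messages are piling up in the Broker beyond the alert threshold
    boolean shouldAlert(long queueDepth) {
        return queueDepth >= alertDepth;
    }
}
```

Clamping to a minimum keeps at least one consumer alive on an empty Queue, and the maximum protects downstream resources such as databases from being overwhelmed during a burst.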

The same documentation page also describes ActiveMQ's Composite Destinations feature, which forwards a message sent to one destination to several others. This comes in handy as a wire-tap, for example to copy messages to an audit log.
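On the Broker side, a composite destination is declared as a compositeQueue or compositeTopic with a forwardTo list, and one of the targets can be an audit queue. The plain Java below only models that forwarding behavior; the class name and the destination names Orders.Processing and Audit.Log are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a composite destination used as a wire-tap:
// every message sent to it is forwarded to each destination in its forwardTo list.
class CompositeDestination {
    private final List<String> forwardTo;
    // forwarded destination name -> messages it has received
    private final Map<String, List<String>> delivered = new LinkedHashMap<>();

    CompositeDestination(List<String> forwardTo) {
        this.forwardTo = new ArrayList<>(forwardTo);
    }

    // One send fans out to all targets, e.g. the processing queue and an audit queue
    void send(String body) {
        for (String target : forwardTo) {
            delivered.computeIfAbsent(target, t -> new ArrayList<>()).add(body);
        }
    }

    List<String> messagesOn(String target) {
        return delivered.getOrDefault(target, Collections.emptyList());
    }
}
```

Because the copy happens in the Broker, producers and the main consumers are unaware of the audit target, which is what makes the pattern useful as a wire-tap.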

I recently submitted two major fixes, AMQ-5160 and AMQ-5187. AMQ-5160 started as an issue with wildcard authorization in ActiveMQ, which I first fixed for non-retained messages. Dejan Bosanac's suggestion to use a Subscription Recovery Policy for retained messages, together with my fix for AMQ-5187, now makes possible what I had wanted to do from the start of my work on the ActiveMQ MQTT transport: process MQTT messages using Virtual Topics.

Also, the fix for AMQ-5160 effectively adds Retained Messages as a Broker-level feature in ActiveMQ. Non-MQTT Topic clients can set the boolean property ActiveMQ.Retain to true to mark a message to be retained in the Topic, and the Broker sets the boolean property ActiveMQ.Retained to true on a message that was recovered as a retained message. Note that the Broker always uses RetainedMessageSubscriptionRecoveryPolicy, and any user-supplied policies are simply added to retained message recovery, so no special configuration is needed for retained message support.
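With a real JMS client, a producer would mark a message with message.setBooleanProperty("ActiveMQ.Retain", true), and a consumer could check message.getBooleanProperty("ActiveMQ.Retained"). The sketch below is a hypothetical in-memory model of the behavior described above, not Broker code: only the two property names come from the text, and the class and method names are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of Topic-level retained messages.
class RetainingTopic {
    static final String RETAIN = "ActiveMQ.Retain";     // set by the producer
    static final String RETAINED = "ActiveMQ.Retained"; // set on a recovered copy

    static final class Message {
        final String body;
        final Map<String, Object> properties;

        Message(String body, Map<String, Object> properties) {
            this.body = body;
            this.properties = new HashMap<>(properties); // defensive copy
        }
    }

    private final List<List<Message>> subscribers = new ArrayList<>();
    private Message retained; // last message published with ActiveMQ.Retain=true

    void publish(Message m) {
        if (Boolean.TRUE.equals(m.properties.get(RETAIN))) {
            retained = m; // a newer retained message replaces the previous one
        }
        for (List<Message> inbox : subscribers) {
            inbox.add(m);
        }
    }

    // A new subscriber first receives a copy of the retained message, flagged as recovered
    List<Message> subscribe() {
        List<Message> inbox = new ArrayList<>();
        if (retained != null) {
            Message copy = new Message(retained.body, retained.properties);
            copy.properties.put(RETAINED, Boolean.TRUE);
            inbox.add(copy);
        }
        subscribers.add(inbox);
        return inbox;
    }
}
```

A late subscriber therefore starts with the last known value (for example the latest sensor reading) instead of waiting for the next publish, and the ActiveMQ.Retained flag lets it tell that recovered value apart from live traffic.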

Retained messages also work for JMS Queues mapped from a Virtual Topic: the retained message is recovered from the Virtual Topic only for the first consumer on the Queue, so there are no duplicate recovered messages. As with Topics, the recovered message has the property ActiveMQ.Retained set to true.
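The "first consumer" rule can be modeled as a per-Queue consumer count that triggers recovery only on the transition from zero to one. This is a hypothetical sketch that assumes recovery happens once per mapped Queue; the class, its methods and the queue names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: retained-message recovery for Queues behind a Virtual Topic.
// The retained message is copied into a Queue only when that Queue gets its first
// consumer, so later consumers on the same Queue do not cause a duplicate.
class RetainedRecoveryForQueues {
    private final Map<String, Deque<String>> queues = new HashMap<>();
    private final Map<String, Integer> consumerCounts = new HashMap<>();
    private String retained; // retained message body on the Virtual Topic, if any

    void retain(String body) {
        retained = body;
    }

    // Returns true if attaching this consumer caused the retained message to be recovered
    boolean attachConsumer(String queueName) {
        int count = consumerCounts.merge(queueName, 1, Integer::sum);
        Deque<String> queue = queues.computeIfAbsent(queueName, q -> new ArrayDeque<>());
        if (count == 1 && retained != null) {
            queue.add(retained);
            return true;
        }
        return false;
    }

    int pending(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }
}
```

Since all consumers on one Queue compete for its messages, a single recovered copy per Queue is enough: exactly one of them will receive it.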

The patches are awaiting further testing and validation and should be applied to ActiveMQ trunk soon, to be included in the 5.10 release.

Put together, the highly scalable MQTT solution consists of MQTT producers publishing over the MQTT protocol to ActiveMQ Virtual Topics, which are trivially configured using name patterns. These Virtual Topics are mapped to Queues read by regular ActiveMQ Java JMS consumers, and MQTT messages arrive as JMS BytesMessages. Java developers should be happy to be able to use their favorite language on the server/consumer side.
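Here is a sketch of how an MQTT topic name ends up as the Queue name a JMS consumer reads. It assumes the separator and wildcard correspondence described in the ActiveMQ MQTT documentation ('/' to '.', '+' to '*', '#' to '>'), assumes MQTT topic names contain no '.' characters, and assumes the default Consumer.<group>. Virtual Topic prefix. The helper class and the example names are hypothetical.

```java
// Hypothetical helper: maps MQTT topic names onto ActiveMQ destination names.
// Assumes '/' -> '.', '+' -> '*', '#' -> '>' and MQTT names without '.' characters.
final class MqttNames {
    private MqttNames() {
    }

    static String toActiveMq(String mqttTopic) {
        StringBuilder sb = new StringBuilder(mqttTopic.length());
        for (char c : mqttTopic.toCharArray()) {
            switch (c) {
                case '/': sb.append('.'); break; // level separator
                case '+': sb.append('*'); break; // single-level wildcard
                case '#': sb.append('>'); break; // multi-level wildcard
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    // Queue a JMS consumer group reads for a given MQTT publish topic
    static String consumerQueue(String group, String mqttTopic) {
        String topic = toActiveMq(mqttTopic);
        if (!topic.startsWith("VirtualTopic.")) {
            throw new IllegalArgumentException("not published to a virtual topic: " + mqttTopic);
        }
        return "Consumer." + group + "." + topic;
    }
}
```

For example, devices publishing to VirtualTopic/sensors/kitchen would be consumed by an "Analytics" group from the Queue Consumer.Analytics.VirtualTopic.sensors.kitchen, which a JMS consumer opens with session.createQueue(...) and reads as BytesMessages via getBodyLength() and readBytes(...).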

Although the ActiveMQ Broker fully manages the QoS flow with the MQTT producer, the JMS BytesMessage carries the property ActiveMQ.MQTT.QoS set to the sender's QoS. The JMS consumer does not have to do anything special with it beyond standard JMS message acknowledgement. JMS producers can also set this property to specify the MQTT QoS used for MQTT consumers. Also, JMS consumers can use JMS transactions to include other transactional resources such as databases, either using Idempotent Consumers or, in the worst case, XA transactions.
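The Idempotent Consumer option can be sketched as a store of processed message IDs that is checked before doing any work, so a redelivered copy is acknowledged but not processed twice. In a real consumer the ID would come from message.getJMSMessageID() and the ID store would be updated in the same database transaction as the business data; here an in-memory set stands in for that store. The class is hypothetical, and reading ActiveMQ.MQTT.QoS as a plain property value is an assumption about its type.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical idempotent consumer: skips messages whose ID was already processed.
// The in-memory set stands in for a table updated in the same DB transaction.
class IdempotentConsumer {
    static final String QOS_PROPERTY = "ActiveMQ.MQTT.QoS";

    private final Set<String> processedIds = new HashSet<>();
    private final List<String> sideEffects = new ArrayList<>();

    // Returns true if the message was processed, false if it was a duplicate
    boolean onMessage(String messageId, Map<String, Object> properties, byte[] payload) {
        if (!processedIds.add(messageId)) {
            return false; // already seen: acknowledge without reprocessing
        }
        Object qos = properties.get(QOS_PROPERTY); // sender's MQTT QoS, if present
        sideEffects.add("qos=" + qos + " bytes=" + payload.length);
        return true;
    }

    List<String> sideEffects() {
        return sideEffects;
    }
}
```

Compared with XA, this keeps the transaction local to the database: the JMS acknowledgement can happen after the database commit, and any redelivery caused by a crash in between is filtered out by the ID check.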

Hopefully, users will have as much fun using these new capabilities in Apache ActiveMQ as I have had developing them. Cheers and good luck with your super scalable MQTT deployments with ActiveMQ.