Saturday, May 10, 2014

MQTT Transport Architecture : WSO2 MB 3.x

Message Queuing and Telemetry Transport (MQTT) is one of the protocols I've been exploring for some time. Keep your fingers crossed: the upcoming WSO2 Message Broker (MB) 3.x.x release will support MQTT in addition to AMQP.

MQTT in a Nutshell 

Focusing on Machine to Machine (M2M) paradigms, let's take a minute to picture how telemetry data is exchanged from one system to another. These systems use an integration layer to communicate back and forth. MQTT was introduced as a lightweight protocol for such integration use cases.

MQTT as a protocol is lightweight in comparison to other queuing protocols, allowing systems to exchange information in a highly efficient manner. MQTT defines the reliability of message delivery through three levels of QoS. 

QoS 0 -> This level keeps message delivery efficient, but it's not guaranteed that the message will be delivered to its recipient(s). For instance, this level of QoS suits data exchanged through sensors, where the published messages are redundant and efficiency is prioritized over reliability.

QoS 1 -> This level ensures that the message is delivered at least once, but messages can be duplicated. It is a more reliable approach than QoS 0, but not as efficient.

QoS 2 -> This is the highest level of QoS. It guarantees that the message is delivered, and that it's delivered only once. This option is not as efficient as the other levels of QoS. It applies to use cases such as ATM transactions, where atomicity is a concern.
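To make the three levels a little more concrete, here is a minimal sketch of how a receiver could read the QoS level out of the first byte of an MQTT fixed header (bits 2 and 1 carry the QoS, bit 3 is the DUP flag). The QosLevel enum and its method names are my own illustration and are not taken from Moquette or WSO2 MB.

```java
/** Hypothetical helper: the three MQTT QoS levels and where they sit in the fixed header. */
enum QosLevel {
    AT_MOST_ONCE(0),   // QoS 0: efficient, delivery not guaranteed
    AT_LEAST_ONCE(1),  // QoS 1: delivered at least once, duplicates possible
    EXACTLY_ONCE(2);   // QoS 2: delivered once and only once

    private final int value;

    QosLevel(int value) {
        this.value = value;
    }

    int value() {
        return value;
    }

    /** Extracts the QoS from the first fixed-header byte (bits 2 and 1). */
    static QosLevel fromFixedHeader(int firstByte) {
        int bits = (firstByte >> 1) & 0x03;
        for (QosLevel level : values()) {
            if (level.value == bits) {
                return level;
            }
        }
        // The bit pattern 11 is reserved by the specification.
        throw new IllegalArgumentException("Reserved QoS value: " + bits);
    }

    /** True when the DUP flag (bit 3) marks the message as a re-delivery. */
    static boolean isDuplicate(int firstByte) {
        return (firstByte & 0x08) != 0;
    }
}
```

For example, QosLevel.fromFixedHeader(0x32) reads a PUBLISH header with QoS 1, and QosLevel.isDuplicate(0x3A) reports that the same message is being re-sent, which is exactly the duplication a QoS 1 subscriber has to tolerate.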

For more information on the protocol, please refer to the MQTT Specification 3.1, which describes the protocol in detail.

MB Transport Architecture  

The WSO2 MB architecture is designed so that its core allows queues/topics to be globalized and to scale across several instances. Moquette, a Java based MQTT server implementation, was partially adopted and integrated into the MB architecture to support MQTT in a scalable manner.

The following sections explain how the implementations were combined so that WSO2 MB supports MQTT in a distributed fashion.

Moquette Architecture  

Moquette is a Java based open source lightweight server capable of accepting and handling MQTT messages. The following diagram depicts the architecture of the MQTT server.

NettyAcceptor              : 

Netty is used as the NIO framework that handles TCP connections in the server. NettyAcceptor initiates the server connection using the provided attributes, such as the port number and the pipeline of handlers, and spawns the acceptor and worker threads.

ConnectDecoder         :

An MQTT client streams messages as bytes, with strings UTF-8 encoded. ConnectDecoder, as the name suggests, decodes this byte stream and validates the messages. For instance, if the server is based on MQTT spec v3.1, it validates that the received bytes of the MQTT CONNECT message carry "MQIsdp" as the protocol name and "3" as the version number.
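The check described above can be sketched roughly as follows. This is not Moquette's actual ConnectDecoder; the class ConnectHeaderCheck and its methods are hypothetical, and the sketch only assumes the MQTT 3.1 layout in which the protocol name is a string with a 2-byte big-endian length prefix, followed by a single version byte.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the protocol check; not Moquette's ConnectDecoder. */
final class ConnectHeaderCheck {
    static final String PROTOCOL_NAME = "MQIsdp"; // protocol name in MQTT 3.1
    static final int PROTOCOL_VERSION = 3;        // protocol version in MQTT 3.1

    /** Returns true if the buffer starts with a valid MQTT 3.1 CONNECT variable header. */
    static boolean isValid(ByteBuffer variableHeader) {
        ByteBuffer buf = variableHeader.duplicate(); // leave the caller's position untouched
        if (buf.remaining() < 2) {
            return false;
        }
        int nameLength = buf.getShort() & 0xFFFF;    // 2-byte big-endian length prefix
        if (buf.remaining() < nameLength + 1) {
            return false;                            // truncated name or missing version byte
        }
        byte[] name = new byte[nameLength];
        buf.get(name);
        int version = buf.get() & 0xFF;
        return PROTOCOL_NAME.equals(new String(name, StandardCharsets.UTF_8))
                && version == PROTOCOL_VERSION;
    }

    /** Builds the header bytes a client would send, so the check can be tried out. */
    static ByteBuffer header(String name, int version) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + nameBytes.length + 1);
        buf.putShort((short) nameBytes.length).put(nameBytes).put((byte) version);
        buf.flip();
        return buf;
    }
}
```

Calling ConnectHeaderCheck.isValid(ConnectHeaderCheck.header("MQIsdp", 3)) accepts the header, while a different protocol name or version number is rejected.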

NettyMQTTHandler     :

Netty maintains a context for each connection. This context contains the state of each channel (active, inactive, etc.). NettyMQTTHandler maintains a map between these connection states and the channels, which in turn allows it to identify connection losses, send acknowledgments, and so on.

SimpleMessaging     :

An MQTT message has a fixed header that states the type of the message (e.g. connect, publish). Based on this type, SimpleMessaging defines an event for each of the corresponding types, such as PublishEvent, SubscribeEvent, OutputMessagingEvent, LostConnectionEvent, etc. This component also inserts all of these events into a Disruptor ring buffer, from which they are processed sequentially by calling the necessary operations defined in the ProtocolProcessor.
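Here is a rough sketch of that flow: the message type is read from the upper four bits of the first fixed-header byte, turned into an event, queued, and then consumed in arrival order. To stay self-contained it uses a bounded java.util.concurrent queue as a stand-in for the Disruptor ring buffer, which is an assumption of the sketch and not how Moquette does it. EventPipelineSketch and "OtherEvent" are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: events queue up and a single consumer handles them in order. */
final class EventPipelineSketch {
    /** Simplified event: just a kind and the topic it concerns. */
    static final class Event {
        final String kind;
        final String topic;

        Event(String kind, String topic) {
            this.kind = kind;
            this.topic = topic;
        }
    }

    // Stand-in for the Disruptor ring buffer: bounded and FIFO.
    private final BlockingQueue<Event> ring = new ArrayBlockingQueue<>(1024);
    private final List<String> processed = new ArrayList<>();

    /** Maps the message type (upper four bits of the first header byte) to an event kind. */
    static String eventKind(int firstByte) {
        switch ((firstByte >> 4) & 0x0F) {
            case 3:  return "PublishEvent";   // PUBLISH
            case 8:  return "SubscribeEvent"; // SUBSCRIBE
            default: return "OtherEvent";     // everything else, lumped together here
        }
    }

    /** Producer side: called for each decoded message; returns false if the buffer is full. */
    boolean enqueue(int firstByte, String topic) {
        return ring.offer(new Event(eventKind(firstByte), topic));
    }

    /** Consumer side: drains events one at a time, preserving arrival order. */
    List<String> drain() {
        Event event;
        while ((event = ring.poll()) != null) {
            processed.add(event.kind + ":" + event.topic);
        }
        return processed;
    }
}
```

The point of the single consumer is ordering: a subscribe that arrives before a publish is also handled before it, without any locking in the processing code.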

MessagingEvent         :

All of the above mentioned operations are generalized as MessagingEvent objects.

ProtocolProcessor      :

ProtocolProcessor is a centralized object that receives all the events and processes them according to the MQTT protocol standard. The component persists all the subscription connections and maps them to the corresponding topics, writes published messages to the corresponding subscription channels, and manages persistence when required (messages with retain, QoS level > 0, clean session, etc.). The outbound messages are also distributed among the subscribers through a Disruptor event ring.
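The subscription bookkeeping can be pictured with a small sketch like the one below. It only covers exact topic names (no wildcards, no persistence, no QoS handling), and SubscriptionRegistrySketch with its in-memory "outbox" is a hypothetical stand-in for writing to real channels, not Moquette's ProtocolProcessor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of topic-to-subscriber bookkeeping; not Moquette's ProtocolProcessor. */
final class SubscriptionRegistrySketch {
    // topic -> client ids subscribed to it (exact-match topics only, no wildcards)
    private final Map<String, Set<String>> subscriptions = new LinkedHashMap<>();
    // client id -> payloads "written" to that client's channel
    private final Map<String, List<String>> outbox = new LinkedHashMap<>();

    void subscribe(String clientId, String topic) {
        subscriptions.computeIfAbsent(topic, t -> new LinkedHashSet<>()).add(clientId);
    }

    void unsubscribe(String clientId, String topic) {
        Set<String> clients = subscriptions.get(topic);
        if (clients != null) {
            clients.remove(clientId);
            if (clients.isEmpty()) {
                subscriptions.remove(topic); // drop topics nobody listens to
            }
        }
    }

    /** Delivers the payload to every subscriber of the topic; returns how many received it. */
    int publish(String topic, String payload) {
        Set<String> clients = subscriptions.getOrDefault(topic, Collections.emptySet());
        for (String clientId : clients) {
            outbox.computeIfAbsent(clientId, c -> new ArrayList<>()).add(payload);
        }
        return clients.size();
    }

    /** Payloads delivered to the given client so far. */
    List<String> received(String clientId) {
        return outbox.getOrDefault(clientId, Collections.emptyList());
    }
}
```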

Moquette/WSO2 MB Integrated Architecture

WSO2 MB offers scalability across multiple nodes from its core. In simple terms, say there are three nodes in the cluster: Node A, Node B and Node C. If an MQTT publisher sends a message to Node A, not only the subscribers on Node A receive it, but all subscribers to the relevant topic across the cluster receive the message.

This approach makes topics global, allowing the broker to scale to meet high request loads and cater to enterprise needs. The diagram below depicts the integrated architecture.

AndesBridge                        :  

As described above, protocol events are processed through ProtocolProcessor. For each event, such as publish, subscribe, unsubscribe and disconnect, there's a corresponding method in the AndesBridge component. Hence each event also triggers and informs the Kernel, which ensures that the Kernel can preserve the globalized aspect of the topic(s).

MQTTChannel                    :

AndesBridge connects with the Kernel through this component. It ensures that the messages sent through the ProtocolProcessor are restructured into objects accepted by the Kernel, e.g. AndesMessagePart and AndesMessageMetadata. It also creates the MQTTLocalSubscription objects accordingly and registers them with the Kernel.

MQTTLocalSubscription    : 

Each time a message is published to the Kernel, it identifies whether subscribers exist for the corresponding topic through the registered LocalSubscription object(s). For each topic on a node there is a corresponding LocalSubscription object registered, allowing the Kernel to trigger the callback to the AndesBridge connection specific to that local node.

Let's say there are 10 subscribers to the topic "HelloMQTT" on Node A. To the Kernel this appears as 1 MQTTLocalSubscription object for the topic "HelloMQTT" on Node A. (Note that if there are subscribers on Node B for the same topic "HelloMQTT", a separate MQTTLocalSubscription is created for them: although it has the same topic name, it has a separate AndesBridge connection specific to its own local instance.)

When a message is published to the topic, it triggers the AndesBridge connection relevant to the local node. The trigger delegates the published message to the ProtocolProcessor instance of the local node, which in turn writes the message to all its subscribers.
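The "HelloMQTT" scenario above can be modelled in a few lines. This is only a toy, in-memory model of the idea: the classes ClusterSketch, Kernel and Node and all their methods are hypothetical and do not reflect the real Andes API, and there is no networking or storage involved. It shows that the kernel stand-in holds one local subscription per node and topic, however many clients sit behind it, and that a publish on one node reaches the subscribers of every node.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the flow above; names are illustrative, not the Andes API. */
final class ClusterSketch {
    /** Stand-in for the kernel: knows one local subscription per (node, topic). */
    static final class Kernel {
        private final Map<String, List<Node>> localSubscriptions = new LinkedHashMap<>();

        void register(String topic, Node node) {
            List<Node> nodes = localSubscriptions.computeIfAbsent(topic, t -> new ArrayList<>());
            if (!nodes.contains(node)) {
                nodes.add(node); // one entry per node, however many clients it has
            }
        }

        int subscriptionCount(String topic) {
            List<Node> nodes = localSubscriptions.get(topic);
            return nodes == null ? 0 : nodes.size();
        }

        void publish(String topic, String payload) {
            List<Node> nodes = localSubscriptions.getOrDefault(topic, Collections.emptyList());
            for (Node node : nodes) {
                node.onKernelMessage(topic, payload); // callback into that node's bridge
            }
        }
    }

    /** Stand-in for a broker node: bridge plus local protocol processor. */
    static final class Node {
        private final Kernel kernel;
        private final Map<String, List<String>> localSubscribers = new LinkedHashMap<>();
        final List<String> delivered = new ArrayList<>(); // "clientId<-payload" records

        Node(Kernel kernel) {
            this.kernel = kernel;
        }

        void subscribe(String clientId, String topic) {
            localSubscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(clientId);
            kernel.register(topic, this);
        }

        void publish(String topic, String payload) {
            kernel.publish(topic, payload); // hand over to the kernel, which fans out per node
        }

        void onKernelMessage(String topic, String payload) {
            List<String> clients = localSubscribers.getOrDefault(topic, Collections.emptyList());
            for (String clientId : clients) {
                delivered.add(clientId + "<-" + payload);
            }
        }
    }
}
```

With 10 clients subscribed on one Node and 1 on another, kernel.subscriptionCount("HelloMQTT") stays at 2, and a publish on either node ends up in the delivered list of both.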

AndesKernel                         :

Allows topics to be globalized across the cluster. 

Consolidated Architecture 

