Monday, February 27, 2017

How I got started with Ballerina

I am certain most of my friends would click on the link to see me dancing :)

With the announcement of Ballerina, the new integration language, I thought of writing a quick summary of how I got started.


I downloaded Ballerina from here. I also referred to the Installation-instructions to get started.

Writing an EIP

Content-Based Router (CBR), a very common enterprise integration pattern (EIP) in the integration world, was the first thing I tried out with Ballerina. Here's how I did it.

Creating a Mock Service in Ballerina

Something I was longing to try out in Ballerina was writing a service that could be executed in the same runtime. Here's how I did it.

I started the composer, and voilà, it provided a graphical view for representing the service and what it should do. All I had to do was drag and drop a few elements onto the canvas. It was like drawing a flow chart.

The service I created accepts an incoming HTTP message and sends a mock response back. The source view showed the language syntax I could use; here's what that looked like.

import ballerina.lang.messages;

service GadgetInventoryMockService {
    resource inquire(message m) {
        // Build an empty response and attach a fixed JSON payload to it.
        message response = {};
        json payload = `{"inquire":"gadget","availability":"true"}`;
        messages:setJsonPayload(response, payload);
        reply response;
    }
}

Similarly, I managed to create both services ("Widget Inventory" and "Gadget Inventory").

Routing with Ballerina

Just as when creating a service, I was able to drag and drop a set of elements in the graphical view to create the router.

import ballerina.lang.jsons;
import ballerina.lang.messages;
import ballerina.net.http;

service ContentBasedRouter {
    resource lookup(message m) {
        // Connectors to the two mock inventory services.
        http:ClientConnector widgetEP = create http:ClientConnector("http://localhost:9090/widgets");
        http:ClientConnector gadgetEP = create http:ClientConnector("http://localhost:9090/gadgets");
        // Read the "type" field of the JSON request to pick a route.
        json requestMessage = messages:getJsonPayload(m);
        string inventoryType = jsons:getString(requestMessage, "$.type");
        message response = {};
        if (inventoryType == "gadget") {
            response = http:ClientConnector.get(gadgetEP, "/", m);
        } else {
            response = http:ClientConnector.get(widgetEP, "/", m);
        }
        reply response;
    }
}

Looking back, I realize it was not only convenient to create the message flow, it was also easier to describe the flow through the diagram. It showed the connections, the message flow and the client as separate entities (the picture was actually speaking 1000 words :) ).
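For readers who want to see the routing rule outside of Ballerina, here is a minimal Python sketch of the same decision. It only picks the backend URL and does not forward the request; the function name choose_endpoint and the ENDPOINTS table are made up for illustration, while the URLs and the "type" field come from the router above.

```python
import json

# Illustrative endpoint table; the URLs mirror the ones used in the router.
ENDPOINTS = {
    "gadget": "http://localhost:9090/gadgets",
    "widget": "http://localhost:9090/widgets",
}


def choose_endpoint(raw_body: str) -> str:
    """Return the backend URL that a JSON request body should be routed to."""
    payload = json.loads(raw_body)
    inventory_type = payload.get("type")
    # Same rule as the if/else in the router: "gadget" goes to the gadget
    # service, anything else falls through to the widget service.
    if inventory_type == "gadget":
        return ENDPOINTS["gadget"]
    return ENDPOINTS["widget"]


if __name__ == "__main__":
    print(choose_endpoint('{"type" : "gadget"}'))
    print(choose_endpoint('{"type" : "widget"}'))
```

Note that, just like the Ballerina version, an unknown type silently goes to the widget service; a stricter router might reject it instead.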

Running What I Wrote 

I was excited to see what this diagram would look like when running.

This is all I had to do:

ballerina run service ./gadgetInventoryMockService.bal ./widgetInventoryMockService.bal ./router.bal

Here, gadgetInventoryMockService.bal and widgetInventoryMockService.bal are the mock services I wrote, and router.bal contains the routing logic. I would have preferred to bundle the whole project into one package instead of passing each file as a separate argument. I checked on this capability with the team, and it will be supported by the composer in the near future, so I'll keep my fingers crossed. On my local machine, each of the .bal files ran as a service at the following URLs. The files I used can be found here.

Gadget Inventory Mock Service
Widget Inventory Mock Service

To see in practice how Ballerina routed the requests, I sent the following request using the cURL client:

curl -v http://localhost:9090/route -d '{"type" : "gadget"}'

The following response should be observed:


I then re-executed the request with the following:
curl -v http://localhost:9090/route -d '{"type" : "widget"}'

Then the following response should be observed:

In general, there are more components, e.g. the fork-join capability, that will be required to implement some of the EIPs I wanted to try out, e.g. scatter-gather, so it's tick-tock until the next release. Still, it was a great experience.

Sunday, June 12, 2016

ESB as a Transistor for Protocol Switching Between AMQP and MQTT - Part 2

In the previous post I described how WSO2 ESB could be used to switch between the AMQP and MQTT protocols. That post discussed the pattern of having an AMQP queue producer and an MQTT consumer. In this post I elaborate on how a message can be published to an AMQP topic and received by both AMQP and MQTT consumers.

2.0 AMQP Topic to MQTT Topic 

Please follow the instructions in the previous post to set up the environment if that has not been done already.

Usage: Publishing a message to an AMQP topic and receiving it from both AMQP and MQTT consumers

Message Flow

Use case: This sample demonstrates how a message published to an AMQP topic via JMS can be consumed by both MQTT and AMQP topic consumers.

  1. Start the ESB server and log into its management console UI (https://localhost:9443/carbon by default; if the ESB runs with the port offset of 1 set up in the previous post, the console is on port 9444 instead). In the management console, navigate to the main menu and click source view in the service bus section. Next, copy in the configuration, which can be found in ESBConfiguration.xml.
  2. Replace the file which could be located in the ESB ($ESB_HOME/repository/conf) directory with
  3. Start two JMeter instances, one acting as a JMS message publisher and the other as a JMS message consumer. The consumer can be found in AMQPTopicConsumer.jmx and the producer in AMQPTopicPublisher.jmx.
  4. Start the MQTT fx client and subscribe to the topic ‘TrackLocationMqttTopic’.
  5. Execute the JMeter consumer and producer and observe.


The JMS message published to ‘TrackLocationAMQPTopic’ would be received by both the AMQP subscriber subscribed to ‘TrackLocationAMQPTopic’ and the MQTT subscriber subscribed to ‘TrackLocationMqttTopic’.

JMS Topic Publisher (JMeter)

JMS Topic Subscriber (JMeter)

MQTT Topic Subscriber (MQTT Fx)

Note: The current Message Broker version (< 3.1.0) does not support subscribing to the same topic name over two different protocols, i.e. AMQP and MQTT. Hence the consumers were subscribed to two different topic names, which were correlated through the ESB. Future Message Broker releases intend to allow subscribing to a common topic name over different protocols.

The next two patterns, "Publishing to MQTT topic and receiving from AMQP queue consumer" and "Publishing to MQTT topic and receiving from AMQP topic consumer", are the inverse of the two patterns discussed in parts 1 and 2. I will elaborate on these patterns in the upcoming days.

ESB as a Transistor for Protocol Switching Between AMQP and MQTT - Part 1

In my previous post, I discussed how both the AMQP and MQTT protocols could be of use for different enterprise requirements. Each protocol has its own advantages. In my opinion AMQP could be more robust than MQTT due to its maturity; however, MQTT would outperform AMQP owing to its lightweight nature, which makes it more suitable for applications intended to run in low-powered and constrained network environments.

Given the potential of both protocols, the apparent question is whether both can be used together. Could this be achieved? The simple answer is yes. This series of posts discusses how it can be achieved using both WSO2 ESB and MB.

A transistor is typically used to switch electronic signals (typically between high-powered and low-powered circuits). I would like to apply that term to describe how WSO2 Enterprise Service Bus could act as a transistor that switches between the two protocols (AMQP and MQTT) supported by WSO2 Message Broker. Imagine the use case of an enterprise having AMQP as the high-powered source and MQTT as the lightweight source deployed on devices running in constrained environments.

I would like to describe 4 patterns which could be followed when switching protocols:

  1. Publishing to an AMQP queue and receiving from an MQTT topic consumer
  2. Publishing to an AMQP topic and receiving from both MQTT and AMQP topic consumers
  3. Publishing to an MQTT topic and receiving from an AMQP queue consumer
  4. Publishing to an MQTT topic and receiving from an AMQP topic consumer

The above patterns are described in a generic fashion so that they can be applied to various use cases.

Setting Up


  • Download WSO2 MB and WSO2 ESB from the website. (Note that I used WSO2 ESB 4.9.0 and WSO2 MB 3.1.0 for this post.)
  • To send/receive AMQP messages via JMS we use Apache JMeter.
  • To send/receive MQTT messages we use MQTT Fx as the client.


  • Enable the MQTT transport sender and receiver in the ESB.
  • Enable the JMS transport sender and receiver in the ESB.
  • Copy the MB client libraries from $MB_HOME/client-lib to $JMeter_HOME/lib/ext, and also copy them to $ESB_HOME/repository/components/lib/
  • Create the file and add the following to it. Make sure to note the URL of the JNDI file location; it should replace the ‘Provider URL’ in the JMeter script file. The JNDI file could be found in
  • Start up the MB server. Make sure the MB server is running on its default offset.
  • Start up the ESB server. Make sure that you offset the ESB by 1.

1.0 AMQP Queue to MQTT Topic

Usage: Publishing a message to an AMQP queue and consuming the message from an MQTT client.

Message Flow

Use case: This sample demonstrates how a message published to an AMQP queue via JMS can be consumed by an MQTT consumer.

  1. Start the ESB server and log into its management console UI (https://localhost:9443/carbon by default; with the port offset of 1 from the setup above, the ESB console is on port 9444 instead). In the management console, navigate to the main menu and click source view in the service bus section. Next, copy in the configuration, which can be found in ESBConfiguration.xml.
  2. Replace the file which could be located in the ESB ($ESB_HOME/repository/conf) directory with
  3. Start JMeter and import the script file AMQPQueuePublisher.jmx.
  4. Start the MQTT fx client and subscribe to the topic ‘LocationTrackerTopic’.
  5. Execute the JMeter script and observe.


The JMS message published to the LocationQueue should be received by the MQTT client consumer subscribed to LocationTrackerTopic.

This is illustrated in the screenshots below.

JMS Client (JMeter)

MQTT Client (MQTT Fx)

The next post will describe the pattern of having an AMQP topic producer distributing messages to both AMQP and MQTT consumers.

What would you choose: AMQP or MQTT?

WSO2 Message Broker currently supports both the AMQP and MQTT protocols. Whether to use AMQP or MQTT is a question I come across frequently. In my opinion the answer is subjective: in simple terms, each protocol has its own advantages. Hence I would like to share a few of my findings on the strengths of each.

Large Message Transmission
  AMQP: Uses a buffer-oriented approach and supports fragmentation, hence it is the more suitable of the two.
  MQTT: Stream-oriented protocol that does not support fragmentation, hence more suitable for smaller messages.

Message Exchanges
  AMQP: Supports both topics and queues; suitable for long-lived messages.
  MQTT: Optimized for active routing; publisher-subscriber based.

Transactional Support
  AMQP: Supports different levels of transactions and acknowledgment modes, e.g. distributed XA transactions, client acknowledgment, auto acknowledgment.
  MQTT: Supports three acknowledgment modes. Specifically focuses on exactly-once delivery.

Connection Security
  AMQP: Supports standard TLS through SASL.
  MQTT: Supports basic auth and SSL. Security is embedded into the protocol itself, so security changes have to be made in the protocol itself.

Last Value Queues
  AMQP: Does not support this explicitly.
  MQTT: Supports this in the form of the RETAIN flag.

Reliable Messaging
  AMQP: Supports reliability using different acknowledgment modes and session durability.
  MQTT: Supports reliability using different acknowledgment modes and session durability.

Failover
  AMQP: Supports native failover.
  MQTT: Failover needs to be supported explicitly.

Depending on the enterprise's needs, the choice of protocol could differ. AMQP is a more mature protocol than MQTT and is readily available for enterprise requirements. However, AMQP is more heavyweight than MQTT, which makes it less suitable for devices running in low-powered and constrained network environments.

What if you want to use both these protocols? I will discuss how to switch between the protocols in my next post.

Saturday, May 10, 2014

MQTT Transport Architecture : WSO2 MB 3.x

Message Queuing Telemetry Transport (MQTT) is one of the protocols I've been exploring for some time. Keep your fingers crossed: the upcoming WSO2 Message Broker (MB) 3.x.x release will support MQTT in addition to AMQP.

MQTT in a Nutshell 

Focusing on Machine to Machine (M2M) paradigms, let's take a minute to picture how telemetry data is exchanged between one system and another. These systems use an integration layer to communicate back and forth. MQTT was introduced as a lightweight protocol for such integration use cases.

MQTT as a protocol is lightweight in comparison to other queuing protocols, allowing systems to exchange information in a highly efficient manner. MQTT defines the reliability of message delivery through three levels of QoS. 

QoS 0 -> This level ensures that message delivery is efficient; however, it is not guaranteed that the message will be delivered to its recipient(s). For instance, this level of QoS is applicable to data exchanged by sensors, where published messages are redundant and efficiency is prioritized over reliability.

QoS 1 -> This level ensures that the message is delivered at least once, but messages can be duplicated. It is a more reliable approach than QoS 0, but not as efficient.

QoS 2 -> This is the highest level of QoS. It guarantees that the message is delivered, and delivered only once. This option might not be as efficient as the other levels of QoS. It is applicable to use cases such as transactions done through an ATM, where atomicity is a concern.
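To make the difference between the three levels concrete, here is a toy Python simulation of how many copies of one message an application sees when either the message or its acknowledgment is lost once. This is a deliberately simplified sketch and not the real MQTT handshake: QoS 2 is modelled as plain de-duplication by packet id rather than the four-step exchange, and the names QoS and copies_seen are made up for illustration.

```python
from enum import IntEnum


class QoS(IntEnum):
    AT_MOST_ONCE = 0   # fire and forget, no retry
    AT_LEAST_ONCE = 1  # retried until acknowledged, duplicates possible
    EXACTLY_ONCE = 2   # retried, duplicates filtered by the receiver


def copies_seen(qos, message_lost_once=False, ack_lost_once=False):
    """Count how many copies of a single message reach the application."""
    seen_ids = set()
    copies = 0
    packet_id = 1
    # QoS 0 sends exactly once; the other levels retransmit (bounded here).
    max_attempts = 1 if qos == QoS.AT_MOST_ONCE else 5
    for _ in range(max_attempts):
        if message_lost_once:
            message_lost_once = False
            continue  # this transmission never reached the receiver
        # Receiver side: QoS 2 drops packet ids it has already handled.
        if not (qos == QoS.EXACTLY_ONCE and packet_id in seen_ids):
            copies += 1
            seen_ids.add(packet_id)
        if qos == QoS.AT_MOST_ONCE:
            break  # no acknowledgment is expected
        if ack_lost_once:
            ack_lost_once = False
            continue  # sender saw no acknowledgment and retransmits
        break  # acknowledgment received, sender stops
    return copies


if __name__ == "__main__":
    for level in QoS:
        print(level.name,
              copies_seen(level, message_lost_once=True),
              copies_seen(level, ack_lost_once=True))
```

With a lost message, QoS 0 delivers nothing while the other two recover; with a lost acknowledgment, QoS 1 delivers a duplicate and QoS 2 does not.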

For more information on the protocol, please refer to the MQTT Specification 3.1, which describes the protocol in detail.

MB Transport Architecture  

The WSO2 MB architecture is designed so that its core allows queues/topics to be globalized as well as scaled across several instances. Moquette, a Java-based MQTT server implementation, was partially adopted and integrated into the MB architecture to support MQTT in a scalable manner.

The following section explains how the implementations were combined to make WSO2 MB support MQTT in a distributed fashion.

Moquette Architecture  

Moquette is a Java-based, open source, lightweight server capable of accepting and handling MQTT messages. The following diagram depicts the architecture of the MQTT server.

NettyAcceptor:

Netty is used as the NIO framework which handles TCP connections in the server. NettyAcceptor initiates the server connection from the provided attributes, such as the port number and the pipeline of handlers, and spawns the acceptor and worker threads.

ConnectDecoder:

An MQTT client streams messages as bytes, with strings in UTF encoded format. ConnectDecoder, as the name suggests, decodes this byte stream and validates the messages. For instance, if the server is based on MQTT spec v3.1, it will validate that the received bytes of the MQTT CONNECT message have "MQIsdp" as the protocol name and "3" as the version number.
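The check described above can be sketched in a few lines of Python. This is not Moquette's ConnectDecoder; it is only an illustration of the MQTT 3.1 wire layout (fixed header, remaining length, length-prefixed protocol name, version byte). The function names, and the build_connect helper used to produce sample packets, are hypothetical.

```python
import struct

EXPECTED_NAME = "MQIsdp"  # protocol name defined by MQTT 3.1
EXPECTED_VERSION = 3      # protocol version defined by MQTT 3.1


def decode_remaining_length(data: bytes, offset: int):
    """Decode the variable-length 'remaining length' field of the fixed header."""
    multiplier, value = 1, 0
    while True:
        byte = data[offset]
        offset += 1
        value += (byte & 0x7F) * multiplier
        if (byte & 0x80) == 0:  # high bit clear: this was the last length byte
            return value, offset
        multiplier *= 128


def is_valid_connect(packet: bytes) -> bool:
    """Check message type, protocol name and version of a CONNECT packet."""
    if packet[0] >> 4 != 1:  # message type 1 is CONNECT
        return False
    _, offset = decode_remaining_length(packet, 1)
    (name_length,) = struct.unpack_from("!H", packet, offset)
    offset += 2
    name = packet[offset:offset + name_length].decode("utf-8")
    version = packet[offset + name_length]
    return name == EXPECTED_NAME and version == EXPECTED_VERSION


def build_connect(name: str, version: int, client_id: str) -> bytes:
    """Hypothetical helper that assembles a small CONNECT packet for testing."""
    def utf(text):
        raw = text.encode("utf-8")
        return struct.pack("!H", len(raw)) + raw

    # Connect flags 0x02 = clean session; keep-alive of 60 seconds.
    body = utf(name) + bytes([version, 0x02]) + struct.pack("!H", 60) + utf(client_id)
    # Assumes the body is shorter than 128 bytes, so the length fits one byte.
    return bytes([0x10, len(body)]) + body


if __name__ == "__main__":
    print(is_valid_connect(build_connect("MQIsdp", 3, "test")))
    print(is_valid_connect(build_connect("MQTT", 4, "test")))
```

A packet announcing "MQTT" with version 4 (the later 3.1.1 revision) is rejected by this 3.1-only check.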

NettyMQTTHandler:

Netty maintains a context for each connection. This context contains the state of each channel (active, inactive etc.). NettyMQTTHandler maintains a map between these connection states and the channels, which in turn allows it to identify connection losses, send acknowledgments etc.

SimpleMessaging:

An MQTT message has a fixed header indicating the type of the message (e.g. connection, publish). Based on the type, SimpleMessaging creates a corresponding event, such as PublishEvent, SubscribeEvent, OutputMessagingEvent, LostConnectionEvent etc. This component inserts all of these events into a Disruptor ring buffer, from which they are processed sequentially by calling the necessary operations defined in the ProtocolProcessor.
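As a rough illustration of this step, the Python sketch below reads the message type and flags from the first byte of the fixed header and pushes the result onto a queue that is drained in order. A plain deque stands in for the Disruptor ring buffer, and the EventQueue class and handler callbacks are assumptions made for the example, not Moquette's actual classes.

```python
from collections import deque

# Message type codes from the MQTT 3.1 fixed header (upper four bits).
MESSAGE_TYPES = {
    1: "CONNECT", 2: "CONNACK", 3: "PUBLISH", 4: "PUBACK", 5: "PUBREC",
    6: "PUBREL", 7: "PUBCOMP", 8: "SUBSCRIBE", 9: "SUBACK",
    10: "UNSUBSCRIBE", 11: "UNSUBACK", 12: "PINGREQ", 13: "PINGRESP",
    14: "DISCONNECT",
}


def parse_fixed_header(first_byte: int) -> dict:
    """Split the first header byte into type, DUP, QoS and RETAIN fields."""
    return {
        "type": MESSAGE_TYPES.get(first_byte >> 4, "RESERVED"),
        "dup": bool(first_byte & 0x08),
        "qos": (first_byte >> 1) & 0x03,
        "retain": bool(first_byte & 0x01),
    }


class EventQueue:
    """Plain FIFO standing in for the Disruptor ring buffer."""

    def __init__(self):
        self._events = deque()

    def publish(self, first_byte: int, body: bytes = b""):
        header = parse_fixed_header(first_byte)
        self._events.append((header["type"], header, body))

    def drain(self, handlers: dict) -> list:
        """Process queued events in arrival order and collect handler results."""
        results = []
        while self._events:
            kind, header, body = self._events.popleft()
            handler = handlers.get(kind)
            if handler is not None:
                results.append(handler(header, body))
        return results


if __name__ == "__main__":
    queue = EventQueue()
    queue.publish(0x32, b"temperature=21")  # PUBLISH with QoS 1
    queue.publish(0x82, b"sensors/#")       # SUBSCRIBE
    print(queue.drain({
        "PUBLISH": lambda header, body: ("publish", header["qos"], body),
        "SUBSCRIBE": lambda header, body: ("subscribe", header["qos"], body),
    }))
```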

MessagingEvent:

All of the above mentioned operations are generalized as MessagingEvent objects.

ProtocolProcessor:

ProtocolProcessor is a centralized object which receives all the events and processes them according to the MQTT protocol standard. The component persists all the subscription connections and maps them to the corresponding topics, writes published messages to the corresponding subscription channels, and manages persistence when required (messages with retain, QoS level > 0, clean session etc.). The outbound messages are also distributed among the subscribers through a Disruptor event ring.

Moquette/WSO2 MB Integrated Architecture

WSO2 MB offers scalability across multiple nodes at its core. In simple terms, suppose there are three nodes in the cluster: Node A, Node B and Node C. If an MQTT publisher sends a message to Node A, not only the subscribers on Node A receive it, but all subscribers to the relevant topic on any node.

This approach makes topics global, allowing the broker to be scaled to handle high request loads and cater to enterprise needs. The diagram below depicts the integrated architecture.

AndesBridge:

As described above, protocol events are processed through ProtocolProcessor. For each event, such as publish, subscribe, unsubscribe and disconnect, there is a corresponding method in the AndesBridge component. Each event therefore also triggers and informs the kernel, which ensures that the kernel can preserve the global nature of the topics.

MQTTChannel:

AndesBridge connects with the kernel through this component. It ensures that the messages sent through the ProtocolProcessor are restructured into objects accepted by the kernel, e.g. AndesMessagePart and AndesMessageMetadata. It also creates the MQTTLocalSubscription objects accordingly and registers them with the kernel.

MQTTLocalSubscription:

Each time a message is published to the kernel, it identifies the subscribers to the corresponding topic through the registered LocalSubscription objects. For each topic on a node there is a corresponding LocalSubscription object registered, allowing the kernel to trigger the callback to the AndesBridge connection specific to that local node.

Let's say there are 10 subscribers to the topic "HelloMQTT" on Node A. To the kernel this appears as 1 MQTTLocalSubscription object for the topic "HelloMQTT" on Node A. (Note that if there are subscribers on Node B for the same topic "HelloMQTT", a separate MQTTLocalSubscription is created in that case: although it has the same topic name, it has a separate AndesBridge connection specific to its own local instance.)

When a message is published to the topic, it triggers the AndesBridge connection of each node holding a subscription. The trigger delegates the published message to the ProtocolProcessor instance of that local node, which in turn writes the message to all of its subscribers.
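The "one local subscription per topic per node" idea can be sketched as follows in Python. This is a toy model under stated assumptions, not the Andes implementation: the Kernel and Node classes, their methods, and the use of plain lists as subscriber inboxes are all invented for illustration, and everything runs in one process rather than across a cluster.

```python
from collections import defaultdict


class Kernel:
    """Toy stand-in for the cluster-wide kernel: one entry per (topic, node)."""

    def __init__(self):
        # topic -> {node name: callback into that node}
        self.local_subscriptions = defaultdict(dict)

    def register(self, topic, node_name, callback):
        self.local_subscriptions[topic][node_name] = callback

    def publish(self, topic, message):
        # Trigger the callback of every node that holds a local subscription.
        for callback in self.local_subscriptions.get(topic, {}).values():
            callback(topic, message)


class Node:
    """Toy broker node that fans messages out to its own subscribers."""

    def __init__(self, name, kernel):
        self.name = name
        self.kernel = kernel
        self.subscribers = defaultdict(list)  # topic -> list of inboxes

    def subscribe(self, topic, inbox):
        # Only the first subscriber for a topic registers with the kernel.
        if topic not in self.subscribers:
            self.kernel.register(topic, self.name, self.deliver_locally)
        self.subscribers[topic].append(inbox)

    def deliver_locally(self, topic, message):
        for inbox in self.subscribers[topic]:
            inbox.append(message)

    def publish(self, topic, message):
        self.kernel.publish(topic, message)


if __name__ == "__main__":
    kernel = Kernel()
    node_a, node_b = Node("A", kernel), Node("B", kernel)
    inboxes_a = [[] for _ in range(10)]
    for inbox in inboxes_a:
        node_a.subscribe("HelloMQTT", inbox)
    inbox_b = []
    node_b.subscribe("HelloMQTT", inbox_b)
    node_a.publish("HelloMQTT", "hi")
    print(len(kernel.local_subscriptions["HelloMQTT"]), inboxes_a[0], inbox_b)
```

Ten subscribers on Node A still show up as a single kernel entry, while the subscriber on Node B gets its own entry and receives the message published through Node A.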

AndesKernal:

Allows topics to be globalized across the cluster.

Consolidated Architecture