Message Broker -Rabbit MQ

Message Broker -(AMQP) Rabbit MQ

  Imagine if you have N modules (where N is sufficiently large number) which needs to communicate with each other. Then you need (N*(n-1))/2 number of paths for communication. That is if you draw a Graph you will get a complete graph. Each such module may open socket (either Network socket or Unix Socket) and exchange xml, json or yml etc. format to communicate with each other. A problem with such approach is that not only the Graph has large number of edges but also each and every nodes must be up at the same time to make such communication possible. While designing distributed system and scalable architecture another problem that is posed is that nodes may be increased elastically to cater heavy load conditions. When dealing with micro services which needs to be scaled up and down elastically a complete graph approach does not looks good. What will happen if a message to trigger important database update is lost due to network failure or middle ware crash or reboot. For monolithic architecture having few components is good to go with traditional Mesh type communication. But for micro services on cloud this will be a very nasty solution. Using a simple bus type topology will solve large number of edge problems, but the messages in the bus has to persist so that sender service may not have to wait for the receiver service.The Advanced Message Queing Protocol is an open standard protocol which solves this problem

   There are several frameworks available to use AMQP such as1)Rabbit MQ2)Kafka3)NATS

Here I will discuss Rabbit MQ with Python Pika. You can use most of the popular languages to work with Rabbit MQ. I will prefer to use python . Pika is a Python libraby which implements AMQP and works very well with Rabbit MQ.

So lets discusss Rabbit MQ.


You start a Rabbit MQ service on a machine.
Producers posts messages to exchange in the Rabbit MQ service and consumers consume them from their queues. In a Rabbit MQ service you can configure many exchanges. Messages are published to exchanges, which are often compared to post offices or mailboxes. Exchanges then distribute message copies to queues using rules called bindings.Each exchange has several message queues tied with them and each of these queue has unique name and routing key to reach it. When producer posts a message it specifies the exchange and the routing key. The message is received by the exchange and is routed to the correct destination queue on the basis of the routing key. The consumers subscribes to these queues and whenever a message comes to the queue the consumer can consume. Note that the consumer process can be spawned after the producer process posts the message. The producer and consumer process are loosely coupled and can be designed with any independent technology.
Exchanges Bindings Routing Keys



Different Types of Exchanges

1) Direct Exchange:
The Direct exchange type routes messages with a routing key equal to the routing key declared by the binding queue.  The following illustrates how the direct exchange type works:

DirectExchange
2) Fanout Exchanges
The Fanout exchange type routes messages to all bound queues indiscriminately.  If a routing key is provided, it will simply be ignored.  The following illustrates how the fanout exchange type works:

FanoutExchange

3) Topic Exchanges

The Topic exchange type routes messages to queues whose routing key matches all, or a portion of a routing key. The keys can be eliminated by  '.' . Wildcards "*", "#" can be used to tell which queues to reach.

* substitutes for exactly one word
# substitue for one or more words separated by .

TopicExchange

4) Headers Exchanges

The Headers exchange type routes messages based upon a matching of message headers to the expected headers specified by the binding queue.'
    When the "x-match" argument is set to "any", just one matching header value is sufficient. Alternatively, setting "x-match" to "all" mandates that all the values must match.
The Headers exchange type is useful for directing messages which may contain a subset of known criteria where the order is not established.
In the diagram below the message reaches first queue because condition is OR (x-match = any). It did not reached second queue because although for this queue condition is OR the {"key1","value1"} is not present. The third queue has condition as AND . As {key2,value2} is not present in the queue it did not reached the third queue.
HeadersExchange
5) Dead Letter Exchange
If no matching queue can be found for the message, the message is silently dropped. RabbitMQ provides an AMQP extension known as the "Dead Letter Exchange" - the dead letter exchange provides functionality to capture messages that are not deliverable.

The more is the overhead in matching the routing key the slower will be the exchange. For this reason Fanout is fastest of all.

Important Properties of a Queue


  • Name
  • Durable (the queue will survive a broker restart)
  • Exclusive (used by only one connection and the queue will be deleted when that connection closes)
  • Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
  • Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)

Producer Code

#!/usr/bin/env python
import pika
import sys
#Create Socket Connection to Rabbit MQ service runnig on local host
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
#Channel is logical connections on a socket. Many such logical connections can share a #TCP connection to limit system resource usage
channel = connection.channel()
#Declare the exchange name and its Type
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
#Close the network connection
connection.close()


Consumer Code


#!/usr/bin/env python
import pika

#Create Socket Connection to Rabbit MQ service runnig on local host

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

#Channel is logical connections on a socket. Many such logical connections can share a TCP connection to limit system resource usage

channel = connection.channel()
#Declare the exchange name and its Type

channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue #Bind the queue to the exchange
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
#Register callback with the queue. Whenever a message is posted to the queue the callback is called.
channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)
#Start the event processing loop which invokes the call back when a message is received. channel.start_consuming()




Post a Comment

0 Comments
* Please Don't Spam Here. All the Comments are Reviewed by Admin.