Home Using Python with RabbitMQ
Post
Cancel

Using Python with RabbitMQ

Here is some sample code on how to use Python and RabbitMQ.

Sources: https://www.rabbitmq.com/tutorials/tutorial-one-python.html
https://medium.com/analytics-vidhya/how-to-use-rabbitmq-with-python-e0ccfe7fa959

Install pip and Pika (AMQP Python client)

(if you haven’t already)

1
2
# On Debian/Ubuntu, pip for Python 3 is packaged as "python3-pip".
sudo apt install python3-pip -y
# Run pip through python3 so Pika is installed for the interpreter the scripts use.
python3 -m pip install pika

Producer: Connection and Send

Simple: Make a connection and send a short message

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python3
#producer
import pika

# Open a blocking connection to the broker running on this machine.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
try:
    channel = connection.channel()
    # Make sure the target queue exists before publishing to it.
    channel.queue_declare(queue='hello')
    # Publish through the default exchange (''), using the queue name as routing key.
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
finally:
    # Release the connection even if declaring or publishing failed.
    if connection.is_open:
        connection.close()

More thorough connection and send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python3
#producer
import pika

# Credentials and connection parameters for the broker on this machine.
credentials = pika.PlainCredentials('tester', 'secretPass')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
try:
    channel = connection.channel()
    # Durable topic exchange that the three queues below are bound to.
    channel.exchange_declare('test', durable=True, exchange_type='topic')
    # Declare queues A, B and C and bind each to the exchange, using the
    # queue's own name as its routing key.
    for queue_name in ('A', 'B', 'C'):
        channel.queue_declare(queue=queue_name)
        channel.queue_bind(exchange='test', queue=queue_name,
                           routing_key=queue_name)
    # Send one message with routing key 'C', which is bound to queue C.
    message = 'hello consumer!!!!!'
    channel.basic_publish(exchange='test', routing_key='C', body=message)
finally:
    # Close the whole connection (not only the channel) so the socket is
    # released even if a declare or publish call failed.
    if connection.is_open:
        connection.close()

Consumer: Retrieve a message

Simple consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
import pika, sys, os

def main():
    """Consume messages from the 'hello' queue on localhost until interrupted.

    Opens a blocking connection, declares the 'hello' queue, registers a
    callback that prints each message body (with automatic acknowledgement),
    and then blocks in the consume loop.
    """
    def callback(ch, method, properties, body):
        # Called for every delivered message; only the payload is used.
        print(" [x] Received %r" % body)

    params = pika.ConnectionParameters(host='localhost')
    conn = pika.BlockingConnection(params)
    chan = conn.channel()

    chan.queue_declare(queue='hello')
    chan.basic_consume(
        queue='hello',
        on_message_callback=callback,
        auto_ack=True,
    )

    print(' [*] Waiting for messages. To exit press CTRL+C')
    chan.start_consuming()

# Run the consumer only when this file is executed as a script.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # CTRL+C was pressed while main() was running.
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            # sys.exit() raises SystemExit; it is caught here and the process
            # is terminated immediately with status 0 via os._exit().
            os._exit(0)

More thorough consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python3
#consumer
import pika

# Credentials and connection parameters for the broker on this machine.
credentials = pika.PlainCredentials('tester', 'secretPass')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', port=5672,
                              credentials=credentials))
channel = connection.channel()
channel.exchange_declare('test', durable=True, exchange_type='topic')

# Declare and bind the queues here as well, so the consumer can be started
# before the producer has created them.
for queue_name in ('A', 'B', 'C'):
    channel.queue_declare(queue=queue_name)
    channel.queue_bind(exchange='test', queue=queue_name,
                       routing_key=queue_name)


# One callback per queue; each prints the received message body.
def callbackFunctionForQueueA(ch, method, properties, body):
    print('Got a message from Queue A: ', body)


def callbackFunctionForQueueB(ch, method, properties, body):
    print('Got a message from Queue B: ', body)


def callbackFunctionForQueueC(ch, method, properties, body):
    print('Got a message from Queue C: ', body)


# Attach each callback to its queue; messages are acknowledged automatically.
channel.basic_consume(queue='A', on_message_callback=callbackFunctionForQueueA, auto_ack=True)
channel.basic_consume(queue='B', on_message_callback=callbackFunctionForQueueB, auto_ack=True)
channel.basic_consume(queue='C', on_message_callback=callbackFunctionForQueueC, auto_ack=True)

# Block and dispatch messages until CTRL+C, then shut down cleanly.
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()
This post is licensed under CC BY 4.0 by the author.