Here is some sample code showing how to use Python with RabbitMQ.
Sources:
https://www.rabbitmq.com/tutorials/tutorial-one-python.html
https://medium.com/analytics-vidhya/how-to-use-rabbitmq-with-python-e0ccfe7fa959
Install pip and Pika (AMQP Python client)
(if you haven’t already)
sudo apt install python3-pip -y
pip install pika
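To confirm the install worked before running the examples, a quick check along these lines may help. This is a minimal sketch that uses only the standard library; the helper name `pika_available` is hypothetical and is not part of Pika.

```python
import importlib.util


def pika_available() -> bool:
    """Return True if the 'pika' package can be imported in this environment."""
    # find_spec returns None when the top-level module cannot be located
    return importlib.util.find_spec("pika") is not None


if __name__ == "__main__":
    if pika_available():
        print("pika is installed")
    else:
        print("pika is missing; run: pip install pika")
```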
Producer: Connection and Send
Simple: Make a connection and send a short message
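#!/usr/bin/env python3
# producer
import pika

# Connect to a RabbitMQ broker running on this machine
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Make sure the queue exists before publishing to it
channel.queue_declare(queue='hello')

# The default exchange ('') delivers to the queue named in routing_key
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()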
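The message body above is a plain string. For structured data, one common approach is to serialize to JSON before publishing. The sketch below assumes a channel object like the one created above; the helper name `send_json` is hypothetical, and only the `queue_declare` and `basic_publish` calls already shown are used.

```python
import json


def send_json(channel, queue, payload):
    """Publish `payload` as UTF-8 encoded JSON to `queue` via the default exchange."""
    body = json.dumps(payload).encode("utf-8")
    # Declare the queue first so the message is not dropped if it is missing
    channel.queue_declare(queue=queue)
    channel.basic_publish(exchange="", routing_key=queue, body=body)
    return body
```

With the producer's channel this could be called as `send_json(channel, 'hello', {'greeting': 'Hello World!'})`. A consumer would then decode the body with `json.loads(body)`.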
More thorough connection and send
#!/usr/bin/env python3
# producer
import pika

# Credentials and host for the connection (the user 'tester' must exist on the broker)
credentials = pika.PlainCredentials('tester', 'secretPass')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=credentials))
channel = connection.channel()

# Declare a durable topic exchange, then three queues bound to it by routing key
channel.exchange_declare('test', durable=True, exchange_type='topic')
channel.queue_declare(queue='A')
channel.queue_bind(exchange='test', queue='A', routing_key='A')
channel.queue_declare(queue='B')
channel.queue_bind(exchange='test', queue='B', routing_key='B')
channel.queue_declare(queue='C')
channel.queue_bind(exchange='test', queue='C', routing_key='C')

# Send a message to the queue named C
message = 'hello consumer!!!!!'
channel.basic_publish(exchange='test', routing_key='C', body=message)

# Closing the connection also closes its channels
connection.close()
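The bindings above use the literal keys A, B and C, so the topic exchange behaves like exact matching here. Topic bindings can also contain wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The pure-Python sketch below illustrates that rule; `topic_matches` is a hypothetical helper for reasoning about bindings, not RabbitMQ's implementation and not part of Pika, and the `logs.*` keys in the usage note are made-up examples.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if `routing_key` would match the topic binding `pattern`."""
    p = pattern.split(".")
    k = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        if i == len(p):
            # Pattern exhausted: succeed only if the key is exhausted too
            return j == len(k)
        if p[i] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            # '*' consumes exactly one word; a literal must be equal
            return match(i + 1, j + 1)
        return False

    return match(0, 0)
```

For example, `topic_matches('C', 'C')` is true and `topic_matches('C', 'A')` is false, which is why the message published with routing key C only reaches queue C. A binding such as `logs.*` would match `logs.error` but not `logs.error.db`, while `logs.#` would match both.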
Consumer: Retrieve a message
Simple consumer
#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
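The simple consumer uses auto_ack=True, so a message counts as delivered as soon as it is handed to the callback, even if processing then fails. A possible alternative is to acknowledge manually once the work is done. The sketch below assumes a handler function of your own; `make_acking_callback` is a hypothetical helper, while `basic_ack` and `basic_nack` are the channel methods Pika provides for this.

```python
def make_acking_callback(handler):
    """Wrap `handler(body)` so the message is acked only after it succeeds."""
    def callback(ch, method, properties, body):
        try:
            handler(body)
        except Exception:
            # Processing failed: hand the message back to the queue
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # Processing succeeded: tell the broker it can discard the message
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

It would be registered with `channel.basic_consume(queue='hello', on_message_callback=make_acking_callback(print), auto_ack=False)`. Note that requeue=True redelivers a message that keeps failing again and again, so a real consumer may prefer requeue=False for errors that cannot be retried.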
More thorough consumer
#!/usr/bin/env python3
# consumer
import pika

# Credentials, host and port for the connection
credentials = pika.PlainCredentials('tester', 'secretPass')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', port=5672, credentials=credentials))
channel = connection.channel()
channel.exchange_declare('test', durable=True, exchange_type='topic')

# One callback per queue; each prints the body of the message it receives
def callbackFunctionForQueueA(ch, method, properties, body):
    print('Got a message from Queue A: ', body)

def callbackFunctionForQueueB(ch, method, properties, body):
    print('Got a message from Queue B: ', body)

def callbackFunctionForQueueC(ch, method, properties, body):
    print('Got a message from Queue C: ', body)

# Attach the callbacks to their queues.
# Queues A, B and C must already exist (the producer declares them).
channel.basic_consume(queue='A', on_message_callback=callbackFunctionForQueueA, auto_ack=True)
channel.basic_consume(queue='B', on_message_callback=callbackFunctionForQueueB, auto_ack=True)
channel.basic_consume(queue='C', on_message_callback=callbackFunctionForQueueC, auto_ack=True)

# Start the consumer loop; this blocks until interrupted
channel.start_consuming()
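The three callbacks differ only in the queue name they print, so they could be generated by a small factory instead of being written out by hand. This is a sketch of that idea, not a change in behaviour; `make_queue_callback`, `attach_consumers` and the `sink` parameter are hypothetical names introduced here for illustration.

```python
def make_queue_callback(queue_name, sink=print):
    """Build a consumer callback that reports which queue a message came from."""
    def callback(ch, method, properties, body):
        sink(f"Got a message from Queue {queue_name}: {body!r}")
    return callback


def attach_consumers(channel, queue_names, sink=print):
    """Register one generated callback per queue on the given channel."""
    for name in queue_names:
        channel.basic_consume(
            queue=name,
            on_message_callback=make_queue_callback(name, sink),
            auto_ack=True,
        )
```

With the channel from the consumer, `attach_consumers(channel, ['A', 'B', 'C'])` followed by `channel.start_consuming()` would stand in for the three explicit basic_consume calls.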