Here is a sample Python script that retrieves transactions from one queue, adds an element to the data to mark the transaction as approved, and then submits the data to another queue. Think of this as a verification process for a transaction.
This script will “block”, that is, wait for additional transactions instead of exiting when the queue is empty. In my example “store” this would be desirable.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env python3
import pika
import json
import ast
import time
import sys, os
# Execute include.py in this module's namespace so the names it defines
# become module globals here.
# NOTE(review): exec() runs whatever include.py contains, so that file must be
# trusted. It is presumably where transaction_q_name and approved_q_name are
# defined (nothing else visible in this file defines them) -- confirm.
with open('include.py') as f: exec(f.read())
# Maximum number of transactions to approve: main() stops consuming once the
# delivery tag of the message just handled equals this value.
max_approve = 50
# Name of the queue main() consumes transactions from.
q_in=transaction_q_name
# Name of the queue main() publishes approved transactions to.
q_out=approved_q_name
def main():
    """Approve transactions from ``q_in`` and forward them to ``q_out``.

    Blocks waiting for messages on ``q_in``. Each message body is decoded as
    UTF-8 and parsed as a Python literal, tagged with ``'approved': 'true'``,
    serialized as JSON and published to ``q_out`` through the default
    exchange. Consumption stops once the delivery tag of the handled message
    equals ``max_approve``.

    A single broker connection carries both the consuming and the publishing
    channel, so no connection is opened per message and the connection's I/O
    is serviced by the consume loop. A message is acknowledged only after its
    approved copy has been published; if processing fails, the message stays
    unacknowledged and the broker can redeliver it instead of losing it.

    Raises:
        ValueError, SyntaxError: if a message body is not a valid Python
            literal (raised by ``ast.literal_eval``).
        pika.exceptions.AMQPError: on broker or connection failures.
    """
    params = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(params)
    try:
        # Inbound channel: hand this consumer one unacked message at a time.
        channel_r = connection.channel()
        channel_r.queue_declare(queue=q_in)
        channel_r.basic_qos(prefetch_count=1)

        # Outbound channel: set up once, reused for every approved message.
        channel_s = connection.channel()
        channel_s.queue_declare(queue=q_out)

        for method_frame, properties, body in channel_r.consume(q_in):
            tag = method_frame.delivery_tag

            # -------------------------------------------
            # Do work here to "approve" the transaction.
            # -------------------------------------------
            # literal_eval only accepts Python literals, so it cannot execute
            # code from the message. The .update() call below assumes the
            # literal is a dict -- TODO confirm against the producer.
            output = ast.literal_eval(body.decode('utf-8'))
            output.update({'approved': 'true'})
            info_body = json.dumps(output, indent=4)

            channel_s.basic_publish(exchange='',
                                    routing_key=q_out,
                                    body=info_body)

            # Ack only after the approved copy has been handed to the broker.
            channel_r.basic_ack(tag)

            # Print a count every 1000 messages.
            if tag % 1000 == 0:
                print("Tag number: ", "{:,.0f}".format(tag))

            # Escape out of the loop after max_approve messages.
            if tag == max_approve:
                # Cancel the consumer and return any pending messages.
                requeued_messages = channel_r.cancel()
                print('Requeued %i messages' % requeued_messages)
                break
    finally:
        # Release the connection on every exit path, including Ctrl-C and
        # processing errors; skip if the broker already dropped it.
        if connection.is_open:
            connection.close()
if __name__ == '__main__':
    # Script entry point: run the approval loop until it finishes on its own
    # or the user interrupts it with Ctrl-C.
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        # Request a normal exit with status 0; the resulting SystemExit is
        # intercepted immediately and turned into a hard process exit, which
        # also carries status 0.
        try:
            raise SystemExit(0)
        except SystemExit:
            os._exit(0)