Home Event Driven Architecture - Python Transaction "Approval"
Post
Cancel

Event Driven Architecture - Python Transaction "Approval"

Here is a sample python script to retrieve transactions from one queue, add an element to the data to approve the transaction and then submit the data to another queue. Think of this as a verification process for a transaction.

This script will “block” or wait for additional transactions. In my example “store” this would be desirable.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env python3
import pika
import json
import ast
import time
import sys, os

with open('include.py') as f: exec(f.read())

# Maximum of transactions to approve.
max_approve = 50
q_in=transaction_q_name
q_out=approved_q_name

def main():

    # Connect to the incoming queue
    params = pika.ConnectionParameters(host='localhost')
    connection_r = pika.BlockingConnection(params)
    channel_r = connection_r.channel()
    channel_r.queue_declare(queue=q_in)
    channel_r.basic_qos(prefetch_count=1)

    # Retrieve a transaction
    for method_frame, properties, body in channel_r.consume(q_in):
        channel_r.basic_ack(method_frame.delivery_tag)

        # -------------------------------------------
        # Do work here to "approve" the transaction.
        # -------------------------------------------
        # Add a tag to the data.
        output = ast.literal_eval(body.decode('utf-8'))
        output.update({'approved' : 'true'})
        info_body = json.dumps(output, indent = 4)
        # -------------------------------------

        # Connect to the outbound queue
        connection_s = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel_s = connection_s.channel()
        channel_s.queue_declare(queue=q_out)
        channel_s.basic_publish(exchange='',
                              routing_key=q_out,
                              body=info_body)
        connection_s.close()

        # Print a count every X times
        if (method_frame.delivery_tag % 1000 == 0):
            print("Tag number: ", "{:,.0f}".format(method_frame.delivery_tag))

        # Escape out of the loop after X message
        if (method_frame.delivery_tag == max_approve):
            # Cancel the consumer and return any pending messages
            requeued_messages = channel_r.cancel()
            print('Requeued %i messages' % requeued_messages)
            break


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
This post is licensed under CC BY 4.0 by the author.