Home Event Driven Architecture - Python Producer
Post
Cancel

Event Driven Architecture - Python Producer

In the real world, producers/publishers can be imagined as people who send their mail to the post office. In the RabbitMQ world, producers/publishers are the operators that send jobs to RabbitMQ exchanges. Just like postal workers, who simply route mail to the specified postboxes, RabbitMQ exchanges simply route tasks to certain queues.

Here is a bare-bones sample Python script that “produces” messages to a specific queue. The “message” is a Python “dictionary” (an associative array) called “purchase_dict”, which is serialized to JSON before it is published. Several elements of that dictionary are filled with pseudo-random values returned by a function defined in include.py. The queue names are defined in the same file.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#!/usr/bin/env python3

import array as arr
import random
from datetime import datetime
import pika
import time
import sys
import json

# Load the shared helper (random_element) and the queue-name constants by
# executing include.py directly in this module's namespace.
# NOTE: include.py calls random.choice without importing `random` itself, so it
# relies on the `import random` above being in scope here. The path is
# relative, so the script must be started from the directory holding include.py.
with open('include.py') as f: exec(f.read())

# Number of messages to submit.
max_count = 5

def main():
    """Publish ``max_count`` pseudo-random purchase messages to RabbitMQ.

    Opens a blocking connection to the broker on ``localhost``, declares the
    queue named by ``transaction_q_name`` and publishes one JSON-encoded
    purchase record per iteration through the default exchange (``''``),
    using the queue name as the routing key.

    The connection is closed when publishing finishes *or* fails, so an
    exception raised while declaring the queue or publishing no longer
    leaves the connection open.
    """
    queue_name = transaction_q_name
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name)

        for count in range(1, max_count + 1):
            purchase_dict = {
                "id": int(round(random.uniform(1000, 9999), 0)),
                "name": random_element(1),
                "Ship to": random_element(2),
                "Item name": random_element(3),
                "Payment type": "credit card",
                "timestamp": str(datetime.now()),
            }

            info_body = json.dumps(purchase_dict, indent=4)
            channel.basic_publish(exchange='',
                                  routing_key=queue_name,
                                  body=info_body)

            # Progress indicator: report the running total every 50 messages.
            if count % 50 == 0:
                print("Count: ", "{:,.0f}".format(count))
    finally:
        # Only close a connection that is still open; closing one the broker
        # already dropped would raise and hide the original error.
        if connection.is_open:
            connection.close()

# Run the producer only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()

include.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def random_element(list_num: int):
    """Return a random sample value from one of three built-in lists.

    Args:
        list_num: Selects the source list: 1 for a first name, 2 for a US
            state or territory name, 3 for an item name.

    Returns:
        A randomly chosen string from the selected list, or ``None`` when
        ``list_num`` is not 1, 2 or 3.
    """
    # Imported locally so this function does not depend on the script that
    # loads this file having already imported ``random``.
    import random

    array_names = ("Fred", "James", "Susan", "Paul", "Donella", "John", "Abel", "Cain", "Joseph", "Mary", "Jane", "Nancy", "Lewis", "George")
    state_names = ("Alaska", "Alabama", "Arkansas", "American Samoa", "Arizona", "California", "Colorado", "Connecticut", "District of Columbia", "Delaware",
                   "Florida", "Georgia", "Guam", "Hawaii", "Iowa", "Idaho", "Illinois", "Indiana", "Kansas", "Kentucky", "Louisiana",
                   "Massachusetts", "Maryland", "Maine", "Michigan", "Minnesota", "Missouri", "Mississippi", "Montana", "North Carolina",
                   "North Dakota", "Nebraska", "New Hampshire", "New Jersey", "New Mexico", "Nevada", "New York", "Ohio", "Oklahoma", "Oregon",
                   "Pennsylvania", "Puerto Rico", "Rhode Island", "South Carolina", "South Dakota", "Tennessee", "Texas", "Utah",
                   "Virginia", "Virgin Islands", "Vermont", "Washington", "Wisconsin", "West Virginia", "Wyoming")
    item_names = ("stove", "range hood", "carpet", "wrench", "printer", "computer", "sofa", "door", "keyboard", "phone", "mouse", "speakers", "bins", "labor charge", "blinds")

    # Map each selector to its list instead of chaining if-statements.
    choices_by_selector = {1: array_names, 2: state_names, 3: item_names}
    candidates = choices_by_selector.get(list_num)
    if candidates is None:
        # Unknown selector: keep the original "no value" result, but explicitly.
        return None
    return random.choice(candidates)

# Names of the RabbitMQ queues used by the scripts that load this file.
transaction_q_name = "transaction"
approved_q_name = "approved"
ship_q_name = "ship"
shipped_q_name = "shipped"
email_q_name = "email"
emailed_q_name = "emailed"
inv_change_q_name = "inv-change"
inv_changed_q_name = "inv-changed"
This post is licensed under CC BY 4.0 by the author.