-
Notifications
You must be signed in to change notification settings - Fork 0
/
sample_producer.py
75 lines (57 loc) · 1.79 KB
/
sample_producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import random
import uuid
import json
import time
from confluent_kafka import Producer
# Address (host:port) of the Kafka broker this script talks to.
bootstrap_servers = "172.18.0.2:30195"

# Module-level producer shared by the send loop further down. It is
# configured with a transactional.id and its transactional state is
# initialised once, right after construction.
producer = Producer(
    {
        "bootstrap.servers": bootstrap_servers,
        "transactional.id": "sample_producer",
    }
)
producer.init_transactions()
def create_account_event() -> dict:
    """Build a randomly generated account event.

    The event carries two fresh UUID strings (``id`` and ``transactionId``),
    an integer ``amount`` in the range 1..99, an account number obtained from
    ``get_account_number()`` (used for both ``userId`` and ``accountNumber``),
    the fixed status ``"new"`` and a ``type`` that is either ``"deposit"`` or
    ``"withdraw"``.

    Returns:
        A JSON-serialisable dict describing the event.
    """
    # Keep the draw order (account number, amount, type) stable so a seeded
    # ``random`` yields the same sequence of events as before.
    account_number = str(get_account_number())
    # randrange draws directly from 1..99 without building a temporary list.
    amount = random.randrange(1, 100)
    return {
        "id": str(uuid.uuid4()),
        "transactionId": str(uuid.uuid4()),
        "amount": amount,
        "userId": account_number,
        "status": "new",
        "accountNumber": account_number,
        "type": random.choice(["deposit", "withdraw"]),
    }
def create_order_event() -> dict:
    """Build a randomly generated order event.

    The amount is always 20. The account number comes from
    ``get_account_number()`` and is used for both ``userId`` and
    ``accountNumber``. ``stock`` is ``"StockA"`` or ``"StockB"`` and ``type``
    is ``"buy"`` or ``"sell"``; ``id`` and ``transactionId`` are fresh UUID
    strings.

    Returns:
        A JSON-serialisable dict describing the order.
    """
    account = str(get_account_number())
    order_event = {
        "id": str(uuid.uuid4()),
        "transactionId": str(uuid.uuid4()),
        "amount": 20,
        "userId": account,
        "status": "new",
        "accountNumber": account,
    }
    # Drawn after the account number, stock first and then type, matching
    # the order in which the random choices have always been made.
    order_event["stock"] = random.choice(["StockA", "StockB"])
    order_event["type"] = random.choice(["buy", "sell"])
    return order_event
def create_price_event() -> dict:
    """Build a randomly generated price-update event.

    Returns:
        A dict with a fresh UUID string under ``id``, an integer ``newPrice``
        in the range 1..99, and a ``stock`` that is either ``"StockA"`` or
        ``"StockB"``.
    """
    # randrange draws directly from 1..99 without building a temporary list.
    # The price is drawn before the stock so seeded runs stay reproducible.
    price = random.randrange(1, 100)
    return {
        "id": str(uuid.uuid4()),
        "newPrice": price,
        "stock": random.choice(["StockA", "StockB"]),
    }
def get_account_number() -> str:
    """Return a random account number as a string.

    Returns:
        One of ``"1000"``, ``"1001"`` or ``"1002"``.
    """
    # randrange excludes the upper bound, so this picks from 1000..1002
    # without materialising a list first.
    return str(random.randrange(1000, 1003))
# Send one randomly generated order event to the 'incoming-orders' topic,
# wrapped in its own transaction. The message key is the event's
# transactionId; the value is the JSON-encoded event.
for _ in range(1):
    event = create_order_event()
    message = json.dumps(event)
    print(f'Sending event {message}')
    producer.begin_transaction()
    try:
        producer.produce(
            'incoming-orders',
            message.encode('utf-8'),
            event['transactionId'].encode('utf-8'),
        )
        producer.flush()
        producer.commit_transaction()
    except Exception:
        # Do not leave the transaction open when sending or committing
        # fails: roll it back, then surface the original error.
        producer.abort_transaction()
        raise
    # Short pause between sends.
    time.sleep(0.5)