Kafka fake data producer and consumer in Python
Overview
Sometimes we need to quickly add some messages to a Kafka topic, e.g. to propagate fake data into a development environment. The Python API allows quick integration with a Kafka topic.
A repository with a working example can be found here: https://github.com/greencashew/kafka-python-random-data-example
Setup Kafka
First, we need to install and run a Kafka server. If you already have both ZooKeeper and Kafka up and running, you can skip this section.
The quickest way is to install it into your project directory:
```shell
mkdir scripts && cd scripts || exit
# Download and run the installer, then download the start scripts (chmod +x only makes them executable)
curl -fsSL https://raw.githubusercontent.com/greencashew/kafka-python-random-data-example/main/scripts/install-kafka.sh -o install-kafka.sh && chmod +x install-kafka.sh && ./install-kafka.sh
curl -fsSL https://raw.githubusercontent.com/greencashew/kafka-python-random-data-example/main/scripts/run-zookeeper.sh -o run-zookeeper.sh && chmod +x run-zookeeper.sh
curl -fsSL https://raw.githubusercontent.com/greencashew/kafka-python-random-data-example/main/scripts/run-kafka.sh -o run-kafka.sh && chmod +x run-kafka.sh
```
Kafka should be installed in the `vendor` directory, and `KAFKA_HOME` should be defined in the `.env` file. The `run-zookeeper.sh` and `run-kafka.sh` scripts should now appear in the `scripts` directory.
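If you want to check from Python that the install step worked, the minimal sketch below reads `KAFKA_HOME` from the `.env` file. It assumes simple `KEY=value` lines (optionally prefixed with `export`, optionally quoted, with `#` comments); the helper name `read_env_file` is hypothetical and not part of the repository.

```python
from pathlib import Path


def read_env_file(path):
    """Parse simple KEY=value lines from a .env-style file into a dict.

    Assumption: one assignment per line; blank lines and lines starting
    with '#' are ignored; values may be wrapped in single or double quotes.
    """
    values = {}
    for raw_line in Path(path).read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        # Drop an optional shell-style "export " prefix.
        if key.startswith("export "):
            key = key[len("export "):].strip()
        value = value.strip()
        # Strip matching surrounding quotes, if any.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        values[key] = value
    return values


if __name__ == "__main__":
    env_path = Path(".env")
    if env_path.exists():
        print("KAFKA_HOME =", read_env_file(env_path).get("KAFKA_HOME", "<not set>"))
    else:
        print(".env not found; run the install script first")
```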
To run Kafka, open a terminal window in the `scripts` directory and start ZooKeeper:
```shell
./run-zookeeper.sh
```
In another window, run the Kafka server:
```shell
./run-kafka.sh
```
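Before starting the producer, it can help to confirm that both services accept connections. The sketch below uses only the standard library `socket` module and assumes the default ports (2181 for ZooKeeper, 9092 for Kafka, matching `BOOTSTRAP_SERVER` in the scripts); it only checks that the TCP port is open, not that Kafka is healthy.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


if __name__ == "__main__":
    # Assumed default ports: ZooKeeper client port 2181, Kafka listener 9092.
    for name, port in (("ZooKeeper", 2181), ("Kafka", 9092)):
        state = "up" if is_port_open("localhost", port) else "down"
        print(f"{name} on localhost:{port} is {state}")
```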
Python dependencies
Before starting the project, add the required libraries to the `requirements.txt` file:
```text
Faker==6.6.2
kafka-python
```
Next, create a virtual environment and install the dependencies:
```shell
virtualenv venv
source venv/bin/activate
pip install -r requirements.txt
```
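To verify that the active virtual environment actually has the libraries, a small sketch can look them up with `importlib`. Note that the import names differ from the package names in `requirements.txt`: `Faker` is imported as `faker` and `kafka-python` as `kafka`. The helper name `missing_modules` is hypothetical.

```python
import importlib.util

# pip package name -> module name used in the scripts
REQUIRED_MODULES = {"Faker": "faker", "kafka-python": "kafka"}


def missing_modules(modules):
    """Return the pip package names whose top-level module cannot be found."""
    return [package for package, module in modules.items()
            if importlib.util.find_spec(module) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All dependencies are installed")
```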
Producer
The example below shows how to propagate fake trip invoices into a Kafka topic. Remember to change `BOOTSTRAP_SERVER` if your broker runs at a different address.
```python
import datetime
import json
import time

from faker import Faker
from kafka import KafkaProducer

KAFKA_TOPIC = "invoices"
BOOTSTRAP_SERVER = 'localhost:9092'  # change if your broker runs elsewhere

fake = Faker()


def get_random_invoice():
    """Build one fake trip invoice as a JSON-serializable dict."""
    tax_percentage = 12
    net_price = fake.random_int(min=3, max=15)
    # Random timestamp from the last 20 years, formatted without a timezone
    date_time_invoice_created = fake.date_time_between(start_date='-20y', end_date='now', tzinfo=None) \
        .strftime('%Y-%m-%dT%H:%M:%S')
    return {
        "id": fake.random_number(digits=12),
        "name": fake.name(),
        "date": date_time_invoice_created,
        "address": fake.address().replace("\n", " "),
        "startGate": fake.city(),
        "exitGate": fake.city(),
        "price": {
            "net": net_price,
            "taxPercentage": tax_percentage,
            "total": net_price + (net_price * (tax_percentage / 100))
        }
    }


def json_serializer(data):
    """Encode a Python object as UTF-8 JSON bytes for the Kafka message value."""
    return json.dumps(data).encode("utf-8")


producer = KafkaProducer(bootstrap_servers=[BOOTSTRAP_SERVER],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    try:
        while True:
            random_invoice = get_random_invoice()
            print("{}: {}".format(datetime.datetime.now().strftime('%d-%m-%Y %H:%M:%S'), random_invoice))
            # send() is asynchronous: the message is buffered and delivered in the background
            producer.send(KAFKA_TOPIC, random_invoice)
            time.sleep(fake.random_int(0, 3))
    except KeyboardInterrupt:
        pass
    finally:
        # Deliver any buffered messages before exiting on Ctrl+C
        producer.flush()
        producer.close()
```
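The `total` in `get_random_invoice` is computed as `net + net * taxPercentage / 100` with floats, and since floats are not exact decimal values, some totals may carry rounding noise into the JSON. If you prefer cent-exact values, one option (a swap-in, not what the repository does) is to compute with `decimal.Decimal` and round to two places before serializing. The helper name `calculate_price` is hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP


def calculate_price(net, tax_percentage):
    """Build the invoice "price" dict with the total rounded to 2 decimal places."""
    net_d = Decimal(net)
    total = net_d + net_d * Decimal(tax_percentage) / Decimal(100)
    # Quantize to cents, then convert to float so json.dumps can serialize it.
    total = total.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    return {"net": net, "taxPercentage": tax_percentage, "total": float(total)}


if __name__ == "__main__":
    print(calculate_price(10, 12))  # → {'net': 10, 'taxPercentage': 12, 'total': 11.2}
```

Inside `get_random_invoice`, the `"price"` entry could then be built with `calculate_price(net_price, tax_percentage)`.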
To run `producer.py`, simply use:
```shell
python3 ./producer.py
```
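Both scripts hard-code `BOOTSTRAP_SERVER = 'localhost:9092'`. Rather than editing the file when your broker lives elsewhere, one option is to read the address from an environment variable. The variable name `KAFKA_BOOTSTRAP_SERVERS` and the helper `bootstrap_servers_from_env` are our own hypothetical choices; kafka-python's `bootstrap_servers` parameter accepts a list of `'host:port'` strings, which is what the helper returns.

```python
import os

DEFAULT_BOOTSTRAP_SERVER = "localhost:9092"


def bootstrap_servers_from_env(var_name="KAFKA_BOOTSTRAP_SERVERS"):
    """Return a list of 'host:port' strings from a comma-separated env variable.

    Falls back to localhost:9092 (the value used in the scripts) when unset or empty.
    """
    raw = os.environ.get(var_name, "")
    servers = [part.strip() for part in raw.split(",") if part.strip()]
    return servers or [DEFAULT_BOOTSTRAP_SERVER]


if __name__ == "__main__":
    # Example: KAFKA_BOOTSTRAP_SERVERS="broker1:9092,broker2:9092" python3 ./producer.py
    print(bootstrap_servers_from_env())
```

In `producer.py` this would replace the constant: `KafkaProducer(bootstrap_servers=bootstrap_servers_from_env(), value_serializer=json_serializer)`.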
Consumer
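The consumer reads the invoices back from the same topic and prints each message together with its topic, partition and offset:
```python
import sys
from datetime import datetime

from kafka import KafkaConsumer

KAFKA_TOPIC = "invoices"
BOOTSTRAP_SERVER = 'localhost:9092'
GROUP_ID = 'consumerGroup1'
AUTO_OFFSET_RESET = 'earliest'  # start from the oldest message when the group has no committed offset

if __name__ == "__main__":
    consumer = KafkaConsumer(KAFKA_TOPIC,
                             group_id=GROUP_ID,
                             bootstrap_servers=BOOTSTRAP_SERVER,
                             auto_offset_reset=AUTO_OFFSET_RESET)
    try:
        # Iterating blocks and yields records as they arrive
        for message in consumer:
            print("[{}][{}:{}:{}]: {}".format(
                datetime.now().strftime('%d-%m-%Y %H:%M:%S'),
                message.topic,
                message.partition,
                message.offset,
                message.value))  # raw bytes: no deserializer is configured
    except KeyboardInterrupt:
        sys.exit()
    finally:
        # Leave the consumer group cleanly
        consumer.close()
```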
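`AUTO_OFFSET_RESET = 'earliest'` only matters when the consumer group has no committed offset for a partition (for example, the very first run of `consumerGroup1`) or the committed offset is no longer valid. The pure-Python model below is a simplification, not kafka-python code; it shows which offset a consumer would start reading from. The function name `starting_offset` is hypothetical.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Model where a consumer begins reading one partition.

    committed: last committed offset for the group, or None if there is none.
    log_start / log_end: first retained offset and the next offset to be written.
    """
    # A valid committed offset always wins; the reset policy is not consulted.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if auto_offset_reset == "earliest":
        return log_start  # replay everything still retained in the topic
    if auto_offset_reset == "latest":
        return log_end    # only messages produced from now on
    raise ValueError(f"no valid offset and unsupported policy {auto_offset_reset!r}")


if __name__ == "__main__":
    print(starting_offset(None, 0, 120, "earliest"))  # new group: replays from 0
    print(starting_offset(None, 0, 120, "latest"))    # new group: starts at 120
    print(starting_offset(42, 0, 120, "earliest"))    # existing group: resumes at 42
```

This is also why restarting `consumer.py` with the same `GROUP_ID` does not replay old invoices: the group's committed offset takes precedence over the reset policy.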
To run `consumer.py`, simply use:
```shell
python3 ./consumer.py
```
End words
This is a very basic example: the producer uses only a plain JSON serializer, and the consumer prints the raw message bytes without any deserializer. For more details, I recommend visiting the official documentation. Avro schemas, which are commonly used with Kafka, are also supported from Python.
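Since the producer encodes values with `json_serializer`, the consumer can decode them with the mirror-image function. kafka-python's `KafkaConsumer` accepts a `value_deserializer` callable that receives the raw bytes; the sketch below only defines and round-trip-checks the functions, so it runs without a broker. The name `json_deserializer` is our own choice.

```python
import json


def json_serializer(data):
    """Same encoder as in producer.py: Python object -> UTF-8 JSON bytes."""
    return json.dumps(data).encode("utf-8")


def json_deserializer(raw):
    """Inverse of json_serializer: UTF-8 JSON bytes -> Python object."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    invoice = {"id": 1, "price": {"net": 10, "taxPercentage": 12, "total": 11.2}}
    assert json_deserializer(json_serializer(invoice)) == invoice
    # In consumer.py it would be passed as:
    # KafkaConsumer(KAFKA_TOPIC, ..., value_deserializer=json_deserializer)
    print("round trip ok")
```

With the deserializer in place, `message.value` in the consumer loop is a dict, so fields such as `message.value["price"]["total"]` can be read directly.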