Overview

Sometimes we need to quickly add some messages to a Kafka topic, e.g. to propagate fake data into a development environment. The Python API allows for quick integration with a Kafka topic.

A repository with a working example can be found here.

Set up Kafka

First, we need to install and run a Kafka server. If you already have both ZooKeeper and Kafka up and running, you can skip this section.

The quickest way is to install it in your project directory:

mkdir scripts && cd scripts || exit
curl -fsSL https://raw.githubusercontent.com/greencashew/kafka-python-random-data-example/main/scripts/install-kafka.sh -o install-kafka.sh && chmod +x install-kafka.sh && ./install-kafka.sh
curl -fsSL https://raw.githubusercontent.com/greencashew/kafka-python-random-data-example/main/scripts/run-zookeeper.sh -o run-zookeeper.sh && chmod +x run-zookeeper.sh
curl -fsSL https://raw.githubusercontent.com/greencashew/kafka-python-random-data-example/main/scripts/run-kafka.sh -o run-kafka.sh && chmod +x run-kafka.sh

Kafka should be installed in the vendor directory, and KAFKA_HOME should be defined in the .env file. The run-zookeeper.sh and run-kafka.sh scripts should appear in the scripts directory.

To run Kafka, open a terminal window in the scripts directory and start ZooKeeper first:

./run-zookeeper.sh

In another window, run the Kafka server:

./run-kafka.sh

Python dependencies

Before starting the project, we need to add the required libraries to the requirements.txt file:

Faker==6.6.2
kafka-python

The next step is to create a virtual environment and install the missing dependencies:

virtualenv venv
source venv/bin/activate
pip install -r requirements.txt
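
With the dependencies in place, you can optionally pre-create the topic used in the examples below. Many local brokers auto-create topics on first write, so this step is often unnecessary; if yours does not, here is a minimal sketch using kafka-python's admin client (the single partition and replication factor of 1 are assumptions suitable for a local setup):

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
try:
    # one partition with replication factor 1 is enough locally
    admin.create_topics([NewTopic(name='invoices', num_partitions=1, replication_factor=1)])
except TopicAlreadyExistsError:
    pass  # topic already exists, nothing to do
finally:
    admin.close()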

Producer

In the example below we can see how to propagate fake trip invoices into a Kafka topic. Remember to change BOOTSTRAP_SERVER if your broker runs on a different address.

from kafka import KafkaProducer
import json
import time
import datetime
from faker import Faker

KAFKA_TOPIC = "invoices"
BOOTSTRAP_SERVER = 'localhost:9092'

fake = Faker()

def get_random_invoice():
    """Build a single fake trip invoice as a plain dict."""
    tax_percentage = 12
    net_price = fake.random_int(min=3, max=15)
    date_time_invoice_created = fake.date_time_between(start_date='-20y', end_date='now', tzinfo=None) \
        .strftime('%Y-%m-%dT%H:%M:%S')
    return {
        "id": fake.random_number(digits=12),
        "name": fake.name(),
        "date": date_time_invoice_created,
        "address": fake.address().replace("\n", " "),
        "startGate": fake.city(),
        "exitGate": fake.city(),
        "price": {
            "net": net_price,
            "taxPercentage": tax_percentage,
            "total": net_price + (net_price * (tax_percentage / 100))
        }
    }


def json_serializer(data):
    # Kafka transports raw bytes, so encode the dict as UTF-8 JSON
    return json.dumps(data).encode("utf-8")


producer = KafkaProducer(bootstrap_servers=[BOOTSTRAP_SERVER],
                         value_serializer=json_serializer)

if __name__ == "__main__":
    while True:
        random_invoice = get_random_invoice()
        print("{}: {}".format(datetime.datetime.now().strftime('%d-%m-%Y %H:%M:%S'), random_invoice))
        producer.send(KAFKA_TOPIC, random_invoice)
        # pause up to 3 seconds to simulate irregular traffic
        time.sleep(fake.random_int(0, 3))

To run producer.py simply use:

python3 ./producer.py
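
Note that producer.send() is asynchronous: kafka-python buffers messages and sends them in batches in the background, so messages produced just before the process exits may never reach the broker. When stopping the script, a short addition (e.g. in a KeyboardInterrupt handler around the loop) makes sure the buffer is drained:

# drain any buffered messages before the process exits
producer.flush()
producer.close()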

Consumer

The consumer below joins the consumerGroup1 group, subscribes to the invoices topic, and prints every message together with its topic, partition, and offset:

import sys
from datetime import datetime

from kafka import KafkaConsumer

KAFKA_TOPIC = "invoices"
BOOTSTRAP_SERVER = 'localhost:9092'
GROUP_ID = 'consumerGroup1'
AUTO_OFFSET_RESET = 'earliest'

if __name__ == "__main__":
    consumer = KafkaConsumer(KAFKA_TOPIC,
                             group_id=GROUP_ID,
                             bootstrap_servers=BOOTSTRAP_SERVER,
                             auto_offset_reset=AUTO_OFFSET_RESET)
    try:
        for message in consumer:
            print("[{}][{}:{}:{}]: {}".format(
                datetime.now().strftime('%d-%m-%Y %H:%M:%S'),
                message.topic,
                message.partition,
                message.offset,
                message.value))

    except KeyboardInterrupt:
        sys.exit()

To run consumer.py simply use:

python3 ./consumer.py
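
Since the producer sends UTF-8 encoded JSON, you can also let the consumer decode messages for you by passing a value_deserializer, so that message.value arrives as a dict instead of raw bytes. A minimal sketch, reusing the same topic, group, and broker as above:

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "invoices",
    group_id="consumerGroup1",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    # reverse of the producer's json_serializer: bytes -> UTF-8 -> dict
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    # message.value is now a dict, so fields are directly accessible
    print(message.value["name"], message.value["price"]["total"])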

End words

This is a very basic example using plain JSON, without schema-based serializers/deserializers applied. For more details I recommend visiting the official documentation. Python Kafka clients also support Avro schemas, which are commonly used with Kafka.
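
As a taste of that, below is a sketch of how Avro serialization could be plugged into the same producer using the fastavro library (fastavro is not part of this example's requirements, and the schema is a simplified, hypothetical subset of the invoice):

import io

import fastavro

# hypothetical schema covering only two fields of the invoice
INVOICE_SCHEMA = fastavro.parse_schema({
    "type": "record",
    "name": "Invoice",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "name", "type": "string"},
    ],
})

def avro_serializer(data):
    # schemaless_writer omits the schema from the payload itself;
    # in practice this is usually paired with a schema registry
    buffer = io.BytesIO()
    fastavro.schemaless_writer(buffer, INVOICE_SCHEMA, data)
    return buffer.getvalue()

Passing avro_serializer as value_serializer to KafkaProducer would then produce compact binary messages instead of JSON.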

Sources