
Event Producer

In event-driven architecture, event producers and consumers play crucial roles in facilitating communication and interaction between different components or services within a system.

An event producer is any component or service within a system that generates or emits events.

An event consumer is a component or service within a system that subscribes to and processes events emitted by event producers.

Event Queue

In this diagram, the "Bookstore Web Service" is the producer, and the other two services are the consumers.

Key benefits of event-driven architecture include:

  • Loose coupling: Event-driven architecture promotes loose coupling between components, as producers and consumers interact through asynchronous communication based on events.
  • Scalability: It enables scalable systems by allowing event consumers to scale independently from event producers, ensuring that the system can handle varying workloads efficiently.
  • Fault tolerance: Event-driven architecture supports fault tolerance and resilience, as events can be reliably processed even if some components or services are temporarily unavailable.

Use Apache Kafka

  1. Install Apache Kafka on your machine and start it.

Using the Docker image is the easiest way to get Kafka running on your machine.

docker pull apache/kafka:3.7.0
docker run -p 9092:9092 apache/kafka:3.7.0
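
Note that when Kafka runs inside this container, the command-line scripts used in the next step also live inside the container (typically under /opt/kafka/bin in the apache/kafka image), so you would invoke them with docker exec; the container name below is whatever docker ps shows for you:

docker exec -it <kafka-container> /opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
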
  2. Create a topic to store your events.

Events are organized and durably stored in topics. Very simplified, a topic is similar to a folder in a filesystem, and the events are the files in that folder.

bin/kafka-topics.sh --create --topic lr-book-searches --bootstrap-server localhost:9092

Topics in Kafka are always multi-producer and multi-subscriber: a topic can have zero, one, or many producers that write events to it, as well as zero, one, or many consumers that subscribe to these events.
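
If you prefer to create the topic from Python instead of the shell script, the confluent-kafka package installed in the next step also ships an admin client. This is only an optional sketch, not part of the tutorial's code:

from confluent_kafka.admin import AdminClient, NewTopic

# Assumes the local broker started above on localhost:9092.
admin = AdminClient({'bootstrap.servers': 'localhost:9092'})
futures = admin.create_topics(
    [NewTopic('lr-book-searches', num_partitions=1, replication_factor=1)])
futures['lr-book-searches'].result()  # blocks; raises if creation failed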

  3. Add the Kafka client dependency:
pip3 install confluent-kafka

Remember to update requirements.txt.
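For example, the new entry can simply be the unpinned package name (pin a version if the rest of the file does):

confluent-kafka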

  4. Update the code.

Create web/infrastructure/mq/helper.py:

from abc import ABC, abstractmethod


class MQHelper(ABC):
    @abstractmethod
    def send_event(self, key: str, value: bytes) -> bool:
        pass
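
Defining this small abstraction first keeps the rest of the application unaware of Kafka: the business code will depend only on MQHelper, so a different broker (or a fake implementation in tests) can be swapped in without touching it.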

Create web/infrastructure/mq/kafka.py:

from typing import List

from confluent_kafka import Producer

from .helper import MQHelper


class KafkaQueue(MQHelper):
    def __init__(self, brokers: List[str], topic: str):
        self.producer = Producer({'bootstrap.servers': ','.join(brokers)})
        self.topic = topic

    def send_event(self, key: str, value: bytes) -> bool:
        self.producer.produce(self.topic, value, key)
        return True
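
The later diffs import these classes with "from ..infrastructure.mq import KafkaQueue, MQHelper", so the mq package has to re-export them. The tutorial doesn't show that file; a minimal web/infrastructure/mq/__init__.py would be:

# Re-export the package's public names so callers can write
# `from ..infrastructure.mq import KafkaQueue, MQHelper`.
from .helper import MQHelper
from .kafka import KafkaQueue

Also note that Producer.produce() is asynchronous: it hands the message to a background thread and returns immediately. Delivery reports are only surfaced when producer.poll() or producer.flush() is called, so in a production setup you would typically pass an on_delivery callback and poll periodically (or flush on shutdown); the tutorial keeps send_event minimal.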

Add message queue config items in web/infrastructure/config/config.py:

@@ -1,4 +1,5 @@
 from dataclasses import dataclass
+from typing import List
 import yaml
 
 
@@ -11,6 +12,12 @@ class DBConfig:
     database: str
 
 
+@dataclass
+class MQConfig:
+    brokers: List[str]
+    topic: str
+
+
 @dataclass
 class ApplicationConfig:
     port: int
@@ -22,6 +29,7 @@ class ApplicationConfig:
 class Config:
     app: ApplicationConfig
     db: DBConfig
+    mq: MQConfig
 
 
 def parseConfig(filename: str) -> Config:
@@ -29,5 +37,6 @@ def parseConfig(filename: str) -> Config:
         data = yaml.safe_load(f)
         return Config(
             ApplicationConfig(**data['app']),
-            DBConfig(**data['db'])
+            DBConfig(**data['db']),
+            MQConfig(**data['mq'])
         )

Add the config values to web/config.yml:

@@ -7,4 +7,8 @@ db:
   port: 3306
   user: "test_user"
   password: "test_pass"
-  database: "lr_event_book"
\ No newline at end of file
+  database: "lr_event_book"
+mq:
+  brokers:
+    - localhost:9092
+  topic: "lr-book-searches"

Tune web/application/wire_helper.py:

@@ -1,16 +1,22 @@
 from ..domain.gateway import BookManager
 from ..infrastructure.config import Config
 from ..infrastructure.database import MySQLPersistence
+from ..infrastructure.mq import KafkaQueue, MQHelper
 
 
 class WireHelper:
-    def __init__(self, sqlPersistence: MySQLPersistence):
+    def __init__(self, sqlPersistence: MySQLPersistence, mq: KafkaQueue):
         self.sqlPersistence = sqlPersistence
+        self.mq = mq
 
     @classmethod
     def new(cls, c: Config):
         db = MySQLPersistence(c.db, c.app.page_size)
-        return cls(db)
+        mq = KafkaQueue(c.mq.brokers, c.mq.topic)
+        return cls(db, mq)
 
     def book_manager(self) -> BookManager:
         return self.sqlPersistence
+
+    def message_queue_helper(self) -> MQHelper:
+        return self.mq
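
The entry point web/main.py isn't shown in this section, and it doesn't need to change; the wiring presumably still looks roughly like the sketch below (file paths, the templates directory, and the parseConfig re-export are assumptions drawn from the diffs above):

from fastapi import FastAPI

from .adapter.router import make_router
from .application.wire_helper import WireHelper
from .infrastructure.config import parseConfig  # assumes the package re-exports parseConfig

config = parseConfig("web/config.yml")
wire_helper = WireHelper.new(config)  # now also builds the KafkaQueue from config.mq

app = FastAPI()
make_router(app, "web/adapter/templates", wire_helper)  # templates dir is a placeholder

The point is that nothing at this level changes: the Kafka producer is created inside WireHelper.new, so only the factory and the operator constructors have to be touched.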

Tune web/application/executor/book_operator.py:

@@ -1,15 +1,19 @@
+from dataclasses import asdict
 from datetime import datetime
-from typing import List
+import json
+from typing import Dict, List
 
 from .. import dto
 from ...domain.model import Book
 from ...domain.gateway import BookManager
+from ...infrastructure.mq import MQHelper
 
 
 class BookOperator():
 
-    def __init__(self, book_manager: BookManager):
+    def __init__(self, book_manager: BookManager, mq_helper: MQHelper):
         self.book_manager = book_manager
+        self.mq_helper = mq_helper
 
     def create_book(self, b: dto.Book) -> Book:
         book = Book(id=0, created_at=datetime.now(), **b.__dict__)
@@ -18,4 +22,16 @@ class BookOperator():
         return book
 
     def get_books(self, offset: int, query: str) -> List[Book]:
-        return self.book_manager.get_books(offset, query)
+        books = self.book_manager.get_books(offset, query)
+        # Send search query and its results
+        if query:
+            json_data = json.dumps([_convert(b)
+                                   for b in books]).encode('utf-8')
+            self.mq_helper.send_event(query, json_data)
+        return books
+
+
+def _convert(b: Book) -> Dict:
+    d = asdict(b)
+    d['created_at'] = d['created_at'].strftime('%Y-%m-%d %H:%M:%S')
+    return d
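
The _convert helper exists because datetime objects are not JSON serializable, so created_at is rendered as a formatted string before the list is dumped. With that, every non-empty search emits one event whose key is the raw query string and whose value is a UTF-8 JSON array of the matching books, roughly like this (only id and created_at are certain from the code above; the remaining fields come from your Book model):

key:   "kafka"
value: [{"id": 1, ..., "created_at": "2024-04-10 07:01:51"}, ...]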

Tune web/adapter/router.py:

@@ -33,7 +33,8 @@ class RestHandler:
 def make_router(app: FastAPI, templates_dir: str, wire_helper: WireHelper):
     rest_handler = RestHandler(
         logging.getLogger("lr-event"),
-        BookOperator(wire_helper.book_manager())
+        BookOperator(wire_helper.book_manager(),
+                     wire_helper.message_queue_helper())
     )
 
     templates = Jinja2Templates(directory=templates_dir)

Restart your server and try some searches:

uvicorn web.main:app --reload

You'll see some Kafka logs similar to this:

[2024-04-10 07:01:51,147] INFO Sent auto-creation request for Set(lr-book-searches) to the active controller. (kafka.server.DefaultAutoTopicCreationManager)
[2024-04-10 07:01:51,164] INFO [QuorumController id=0] CreateTopics result(s): CreatableTopic(name='lr-book-searches', numPartitions=1, replicationFactor=1, assignments=[], configs=[]): SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-10 07:01:51,165] INFO [QuorumController id=0] Replayed TopicRecord for topic lr-book-searches with topic ID cyQQePRVSjOHCbT5zKthQw. (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-10 07:01:51,165] INFO [QuorumController id=0] Replayed PartitionRecord for new partition lr-book-searches-0 with topic ID cyQQePRVSjOHCbT5zKthQw and PartitionRegistration(replicas=[0], directories=[JNQSQwG010NmYpIsyAYTzw], isr=[0], removingReplicas=[], addingReplicas=[], elr=[], lastKnownElr=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=0, partitionEpoch=0). (org.apache.kafka.controller.ReplicationControlManager)
[2024-04-10 07:01:51,196] INFO [Broker id=0] Transitioning 1 partition(s) to local leaders. (state.change.logger)
[2024-04-10 07:01:51,198] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(lr-book-searches-0) (kafka.server.ReplicaFetcherManager)
[2024-04-10 07:01:51,200] INFO [Broker id=0] Creating new partition lr-book-searches-0 with topic id cyQQePRVSjOHCbT5zKthQw. (state.change.logger)
[2024-04-10 07:01:51,217] INFO [LogLoader partition=lr-book-searches-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.UnifiedLog$)
[2024-04-10 07:01:51,220] INFO Created log for partition lr-book-searches-0 in /bitnami/kafka/data/lr-book-searches-0 with properties {} (kafka.log.LogManager)
[2024-04-10 07:01:51,222] INFO [Partition lr-book-searches-0 broker=0] No checkpointed highwatermark is found for partition lr-book-searches-0 (kafka.cluster.Partition)
[2024-04-10 07:01:51,224] INFO [Partition lr-book-searches-0 broker=0] Log loaded for partition lr-book-searches-0 with initial high watermark 0 (kafka.cluster.Partition)
[2024-04-10 07:01:51,227] INFO [Broker id=0] Leader lr-book-searches-0 with topic id Some(cyQQePRVSjOHCbT5zKthQw) starts at leader epoch 0 from offset 0 with partition epoch 0, high watermark 0, ISR [0], adding replicas [] and removing replicas [] . Previous leader None and previous leader epoch was -1. (state.change.logger)
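
To see the events themselves rather than just the broker logs, you can attach a console consumer to the topic in another terminal (run it via docker exec as shown earlier if Kafka is in a container):

bin/kafka-console-consumer.sh --topic lr-book-searches --from-beginning --bootstrap-server localhost:9092

Each line printed is the JSON value of one search event.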