» Python: Building Event-Driven Microservices with Kafka » 3. Consumer: Trend Service » 3.3 Event Consumer

Event Consumer

Tune service/domain/model/trend.py:

@@ -9,4 +9,3 @@ from .book import Book
 class Trend:
     query: str
     books: List[Book]
-    created_at: datetime | None

We‘ll use the timestamp from the kafka event for time tracking, so let‘s remove this field.

Tune service/trend/domain/gateway/trend_manager.py:

@@ -1,8 +1,10 @@
 from abc import ABC, abstractmethod
-from typing import List
+from typing import Callable, List
 
 from ....domain.model import Trend
 
+ConsumeCallback = Callable[[bytes, bytes], None]
+
 
 class TrendManager(ABC):
     @abstractmethod
@@ -10,5 +12,15 @@ class TrendManager(ABC):
         pass
 
     @abstractmethod
-    def top_trends(self, offset: int) -> List[Trend]:
+    def top_trends(self, page_size: int) -> List[Trend]:
+        pass
+
+
+class TrendEventConsumer(ABC):
+    @abstractmethod
+    def consume_events(self, callback: ConsumeCallback):
+        pass
+
+    @abstractmethod
+    def stop(self):
         pass

Implement TrendEventConsumer in service/trend/infrastructure/mq/kafka.py:

import sys
from typing import List

from confluent_kafka import Consumer, KafkaException

from ...domain.gateway import TrendEventConsumer, ConsumeCallback


GROUP_ID = "trend-svr"


class KafkaConsumer(TrendEventConsumer):
    def __init__(self, brokers: List[str], topic: str):
        self.consumer = Consumer(
            {'bootstrap.servers': ','.join(brokers),
             'group.id': GROUP_ID,
             'auto.offset.reset': 'smallest'})
        self.topic = topic
        self.running = False

    def consume_events(self, callback: ConsumeCallback):
        try:
            self.consumer.subscribe([self.topic])
            self.running = True
            while self.running:
                msg = self.consumer.poll(timeout=1.0)  # Poll for messages
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaException._PARTITION_EOF:
                        # End of partition
                        sys.stderr.write('%% {} [{}] reached end at offset {} - {}\n'.format(
                            msg.topic(), msg.partition(), msg.offset()))
                    elif msg.error():
                        raise KafkaException(msg.error())
                else:
                    # Process message
                    callback(msg.key(), msg.value())
        finally:
            self.consumer.close()

    def stop(self):
        self.running = False

Tune TrendManager implementations in service/trend/infrastructure/cache/redis.py:

@@ -29,13 +29,13 @@ class RedisCache(TrendManager):
         self.c.set(k, results)
         return score
 
-    def top_trends(self, offset: int) -> List[Trend]:
+    def top_trends(self, page_size: int) -> List[Trend]:
         top_items: Any = self.c.zrevrange(
-            trends_key, 0, offset, withscores=True)
+            trends_key, 0, page_size - 1, withscores=True)
         trends = []
         for item in top_items:
             query = item[0]
-            t = Trend(query=query, books=[], created_at=None)
+            t = Trend(query=query, books=[])
             k = query_key_prefix + query
             value: Any = self.c.get(k)
             if value is not None:

Tune service/trend/application/wire_helper.py:

@@ -1,17 +1,23 @@
-from ..domain.gateway import TrendManager
+from ..domain.gateway import TrendManager, TrendEventConsumer
 from ..infrastructure.config import Config
 from ..infrastructure.cache import RedisCache
+from ..infrastructure.mq import KafkaConsumer
 
 
 class WireHelper:
-    def __init__(self, kvStore: RedisCache):
+    def __init__(self, kvStore: RedisCache, consumer: KafkaConsumer):
         self.kvStore = kvStore
+        self.consumer = consumer
 
     @classmethod
     def new(cls, c: Config):
         kv = RedisCache(c.cache.host, c.cache.port,
                         c.cache.password, c.cache.db)
-        return cls(kv)
+        consumer = KafkaConsumer(c.mq.brokers, c.mq.topic)
+        return cls(kv, consumer)
 
     def trend_manager(self) -> TrendManager:
         return self.kvStore
+
+    def trend_event_consumer(self) -> TrendEventConsumer:
+        return self.consumer

Add config items in service/trend/infrastructure/config/config.py:

@@ -1,8 +1,15 @@
 from dataclasses import dataclass
+from typing import List
 
 import yaml
 
 
+@dataclass
+class MQConfig:
+    brokers: List[str]
+    topic: str
+
+
 @dataclass
 class CacheConfig:
     host: str
@@ -14,11 +21,14 @@ class CacheConfig:
 @dataclass
 class Config:
     cache: CacheConfig
+    mq: MQConfig
 
 
 def parseConfig(filename: str) -> Config:
     with open(filename, 'r') as f:
         data = yaml.safe_load(f)
         return Config(
-            CacheConfig(**data['cache'])
+            CacheConfig(**data['cache']),
+            MQConfig(**data['mq'])
+
         )

Put in config values in service/web/config.yml:

@@ -3,3 +3,7 @@ cache:
   port: 6379
   password: test_pass
   db: 0
+mq:
+  brokers:
+    - localhost:9094
+  topic: lr-book-searches

We‘re using port 9094 for external access to the kafka running in a docker container.
See “Accessing Apache Kafka with internal and external clients“ configuration in bitnami/kafka.

Add service/trend/application/consumer/trend.py:

import json

from ....domain.model import Trend
from ...domain.gateway import TrendManager, TrendEventConsumer


class TrendConsumer():

    def __init__(self, trend_manager: TrendManager, event_consumer: TrendEventConsumer):
        self.trend_manager = trend_manager
        self.event_consumer = event_consumer

    def start(self):
        def process_event(key: bytes, data: bytes):
            t = Trend(query=key.decode('utf-8'), books=json.loads(data))
            self.trend_manager.create_trend(t)

        self.event_consumer.consume_events(process_event)

    def get_event_consumer(self) -> TrendEventConsumer:
        return self.event_consumer

Tune service/trend/application/executor/trend_operator.py:

@@ -9,8 +9,5 @@ class TrendOperator():
     def __init__(self, trend_manager: TrendManager):
         self.trend_manager = trend_manager
 
-    def create_trend(self, t: Trend) -> int:
-        return self.trend_manager.create_trend(t)
-
-    def top_trends(self, offset: int) -> List[Trend]:
-        return self.trend_manager.top_trends(offset)
+    def top_trends(self, page_size: int) -> List[Trend]:
+        return self.trend_manager.top_trends(page_size)

Tune service/trend/adapter/router.py:

@@ -11,9 +11,9 @@ class RestHandler:
         self._logger = logger
         self.trend_operator = trend_operator
 
-    def get_trends(self, offset: int):
+    def get_trends(self, page_size: int):
         try:
-            return self.trend_operator.top_trends(offset)
+            return self.trend_operator.top_trends(page_size)
         except Exception as e:
             self._logger.error(f"Failed to get trends: {e}")
             raise HTTPException(status_code=404, detail="Failed to get trends")
@@ -26,5 +26,5 @@ def make_router(app: FastAPI, wire_helper: WireHelper):
     )
 
     @app.get("/trends")
-    async def get_trends(o: int = 0):
-        return rest_handler.get_trends(o)
+    async def get_trends(ps: int = 10):
+        return rest_handler.get_trends(ps)

Modify main.py to allow both the consumer and the router stop gracefully:

@@ -1,12 +1,35 @@
+from contextlib import asynccontextmanager
+import threading
+
 from fastapi import FastAPI
 
 from .adapter.router import make_router
 from .application import WireHelper
+from .application.consumer import TrendConsumer
 from .infrastructure.config import parseConfig
 
 CONFIG_FILENAME = "service/trend/config.yml"
 
 c = parseConfig(CONFIG_FILENAME)
 wire_helper = WireHelper.new(c)
-app = FastAPI()
+
+# Run the consumer
+tc = TrendConsumer(wire_helper.trend_manager(),
+                   wire_helper.trend_event_consumer())
+event_consumer = tc.get_event_consumer()
+consumer_thread = threading.Thread(target=tc.start)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # Run at startup
+    consumer_thread.start()
+    yield
+    # Run on shutdown
+    event_consumer.stop()
+    consumer_thread.join()
+    print("Event consumer is closed! Bye!")
+
+# Run the FastAPI server
+app = FastAPI(lifespan=lifespan)
 make_router(app, wire_helper)

The kafka consumer and the FastAPI server are 2 threads running in the same process. We use FastAPI‘s lifespan function to manually stop both of them gracefully.

Tip: Try to use lifespan function to solve this issue when shutdowning the web service:
“Producer terminating with 1 message (248 bytes) still in queue or transit: use flush() to wait for outstanding message delivery“

Remember to create or modify __.init__.py files to re-export the symbols.

  • service/trend/domain/gateway/__init__.py
  • service/trend/application/consumer/__init__.py
  • service/trend/infrastructure/mq/__init__.py

Start both the web service and the trend service. Try searching keywords such as “love“, “war“ and “wealth“ in the web service‘s index page http://localhost:8000/?q=wealth.

Then you should be able to see some results like this from the trend service (http://localhost:8001/trends):

[
  {
    "query": "love",
    "books": [
      {
        "id": 4,
        "title": "Pride and Prejudice",
        "author": "Jane Austen",
        "published_at": "1813-01-28",
        "description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
        "created_at": "2024-04-02T21:02:59.314+08:00"
      },
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      }
    ]
  },
  {
    "query": "war",
    "books": [
      {
        "id": 10,
        "title": "War and Peace",
        "author": "Leo Tolstoy",
        "published_at": "1869-01-01",
        "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
        "created_at": "2024-04-02T21:02:59.42+08:00"
      },
      {
        "id": 12,
        "title": "The Odyssey",
        "author": "Homer",
        "published_at": "8th Century BC",
        "description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
        "created_at": "2024-04-02T21:02:59.455+08:00"
      }
    ]
  },
  {
    "query": "wealth",
    "books": [
      {
        "id": 1,
        "title": "The Great Gatsby",
        "author": "F. Scott Fitzgerald",
        "published_at": "1925-04-10",
        "description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
        "created_at": "2024-04-02T20:36:01.015+08:00"
      }
    ]
  }
]