» Python: Building Event-Driven Microservices with Kafka » 4. Consumer: Recommendation Service » 4.2 Event Consumer

Event Consumer

Since the recommendation service‘s consumer has so many things in common with the trend service‘s counterpart, let‘s try to reuse them by moving those common parts into the higher level folders.

Restructure files

  • Tune service/trend/domain/gateway/trend_manager.py:
@@ -1,10 +1,8 @@
 from abc import ABC, abstractmethod
-from typing import Callable, List
+from typing import List
 
 from ....domain.model import Trend
 
-ConsumeCallback = Callable[[bytes, bytes], None]
-
 
 class TrendManager(ABC):
     @abstractmethod
@@ -14,13 +12,3 @@ class TrendManager(ABC):
     @abstractmethod
     def top_trends(self, page_size: int) -> List[Trend]:
         pass
-
-
-class TrendEventConsumer(ABC):
-    @abstractmethod
-    def consume_events(self, callback: ConsumeCallback):
-        pass
-
-    @abstractmethod
-    def stop(self):
-        pass

Move the common TrendEventConsumer and ConsumeCallback types out of this file.

  • Put those types in service/domain/gateway/event_consumer.py:
from abc import ABC, abstractmethod
from typing import Callable


ConsumeCallback = Callable[[bytes, bytes], None]


class TrendEventConsumer(ABC):
    @abstractmethod
    def consume_events(self, callback: ConsumeCallback):
        pass

    @abstractmethod
    def stop(self):
        pass
  • Move service/trend/infrastructure/mq/kafka.py to service/infrastructure/mq/kafka_consumer.py.
class KafkaConsumer(TrendEventConsumer):
    def __init__(self, brokers: List[str], topic: str, group_id: str):

Add a new parameter named group_id for the consumer group.

  • Add config items in service/trend/infrastructure/config/config.py:
@@ -8,6 +8,7 @@ import yaml
 class MQConfig:
     brokers: List[str]
     topic: str
+    group_id: str
 
 
 @dataclass
  • Put values in service/trend/config.yml:
@@ -7,3 +7,4 @@ mq:
   brokers:
     - localhost:9094
   topic: lr-book-searches
+  group_id: trend-svr
  • Tune import pathes in service/trend/application/consumer/trend.py:
@@ -1,7 +1,8 @@
 import json
 
 from ....domain.model import Trend
-from ...domain.gateway import TrendManager, TrendEventConsumer
+from ...domain.gateway import TrendManager
+from ....domain.gateway import TrendEventConsumer
 
 
 class TrendConsumer():
  • Tune import paths in service/trend/application/wire_helper.py:
@@ -1,7 +1,8 @@
-from ..domain.gateway import TrendManager, TrendEventConsumer
+from ..domain.gateway import TrendManager
+from ...domain.gateway import TrendEventConsumer
 from ..infrastructure.config import Config
 from ..infrastructure.cache import RedisCache
-from ..infrastructure.mq import KafkaConsumer
+from ...infrastructure.mq import KafkaConsumer
 
 
 class WireHelper:
  • Remember to create or update __init__.py files:
    • service/domain/gateway/__init__.py
    • service/infrastructure/mq/__init__.py
    • service/trend/domain/gateway/__init__.py

Tune the producer‘s events in the web service

Add the user_id parameter in service/web/application/executor/book_operator.py:

@@ -23,13 +23,14 @@ class BookOperator():
         book.id = id
         return book
 
-    def get_books(self, offset: int, query: str) -> List[Book]:
+    def get_books(self, offset: int, user_id: str, query: str) -> List[Book]:
         books = self.book_manager.get_books(offset, query)
-        # Send search query and its results
+        # Send a user's search query and its results
         if query:
+            k = query + ':' + user_id
             json_data = json.dumps([_convert(b)
                                    for b in books]).encode('utf-8')
-            self.mq_helper.send_event(query, json_data)
+            self.mq_helper.send_event(k, json_data)
         return books
 
     def get_trends(self, trend_url: str) -> List[Trend]:

Read the cookie value and pass in as the user_id in service/web/adapter/router.py:

@@ -1,4 +1,7 @@
 import logging
+import random
+import string
+import time
 
 from fastapi import FastAPI, HTTPException, Request
 from fastapi.responses import HTMLResponse
@@ -10,6 +13,16 @@ from ...domain.model import Book
 from ..infrastructure.config.config import RemoteServiceConfig
 
 
+FIELD_UID = "uid"
+
+
+def random_string(length):
+    random.seed(time.time())  # Using time as seed
+    charset = string.ascii_uppercase + "0123456789"
+    result = [random.choice(charset) for _ in range(length)]
+    return ''.join(result)
+
+
 class RestHandler:
     def __init__(self, logger: logging.Logger, remote: RemoteServiceConfig, book_operator: BookOperator):
         self._logger = logger
@@ -23,9 +36,9 @@ class RestHandler:
             self._logger.error(f"Failed to create: {e}")
             raise HTTPException(status_code=400, detail="Failed to create")
 
-    def get_books(self, offset: int, query: str):
+    def get_books(self, offset: int, user_id: str, query: str):
         try:
-            books = self.book_operator.get_books(offset, query)
+            books = self.book_operator.get_books(offset, user_id, query)
             return books
         except Exception as e:
             self._logger.error(f"Failed to get books: {e}")
@@ -44,10 +57,17 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
 
     @app.get("/", response_class=HTMLResponse)
     async def index_page(request: Request, q: str = ""):
-        books = rest_handler.book_operator.get_books(0, q)
-        trends = rest_handler.book_operator.get_trends(
-            rest_handler.remote.trend_url)
-        return templates.TemplateResponse(
+        user_id = request.cookies.get(FIELD_UID)
+        if not user_id:
+            user_id = random_string(5)
+        books = rest_handler.book_operator.get_books(0, user_id, q)
+        try:
+            trends = rest_handler.book_operator.get_trends(
+                rest_handler.remote.trend_url)
+        except Exception as e:
+            rest_handler._logger.warn(f"Failed to get trends: {e}")
+            trends = []
+        resp = templates.TemplateResponse(
             name="index.html", context={
                 "request": request,
                 "title": "LiteRank Book Store",
@@ -56,6 +76,8 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
                 "q": q,
             }
         )
+        resp.set_cookie(FIELD_UID, user_id, 3600*24*30)
+        return resp
 
     @app.post("/api/books", response_model=Book)
     async def create_book(b: dto.Book):
@@ -63,4 +85,4 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
 
     @app.get("/api/books")
     async def get_books(o: int = 0, q: str = ""):
-        return rest_handler.get_books(o, q)
+        return rest_handler.get_books(o, "", q)

Tune the consumer in the trend service

Since the key‘s format has changed, we need to adapt to that in the trend service.

Update service/trend/application/consumer/trend.py:

@@ -13,7 +13,9 @@ class TrendConsumer():
 
     def start(self):
         def process_event(key: bytes, data: bytes):
-            t = Trend(query=key.decode('utf-8'), books=json.loads(data))
+            parts = key.decode('utf-8').split(':')
+            query = parts[0]
+            t = Trend(query=query, books=json.loads(data))
             self.trend_manager.create_trend(t)
 
         self.event_consumer.consume_events(process_event)

Add group_id in service/trend/application/wire_helper.py:

@@ -14,7 +14,7 @@ class WireHelper:
     def new(cls, c: Config):
         kv = RedisCache(c.cache.host, c.cache.port,
                         c.cache.password, c.cache.db)
-        consumer = KafkaConsumer(c.mq.brokers, c.mq.topic)
+        consumer = KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.group_id)
         return cls(kv, consumer)
 
     def trend_manager(self) -> TrendManager:

Recommendation Service‘s consumer

Add service/domain/model/interest.py:

from dataclasses import dataclass


@dataclass
class Interest:
    user_id: str
    title: str
    author: str
    score: float

Add service/recommendation/domain/gateway/interest_manager.py:

from abc import ABC, abstractmethod
from typing import List

from ....domain.model import Interest


class InterestManager(ABC):
    @abstractmethod
    def increase_interest(self, i: Interest):
        pass

    @abstractmethod
    def list_interests(self, user_id: str) -> List[Interest]:
        pass

Install mongo dependencies:

pip3 install pymongo

Prepare the mongoDB database:

  • Install mongoDB on your machine and start it.
  • Create a database named lr_event_rec.
  • Create necessary indexes for your collections.

Implement the InterestManager in service/recommendation/infrastructure/database/mongo.py:

from typing import List

from pymongo import MongoClient, DESCENDING

from ...domain.gateway import InterestManager
from ....domain.model import Interest

COLL_REVIEW = "interests"


class MongoPersistence(InterestManager):
    def __init__(self, uri: str, db_name: str, page_size: int):
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.coll = self.db[COLL_REVIEW]
        self.page_size = page_size

    def increase_interest(self, i: Interest):
        filter_query = {
            "user_id": i.user_id,
            "title": i.title,
            "author": i.author,
        }
        update_query = {
            "$inc": {"score": 1}
        }
        self.coll.update_one(filter_query, update_query, upsert=True)

    def list_interests(self, user_id: str) -> List[Interest]:
        filter_query = {"user_id": user_id}
        cursor = self.coll.find(filter_query).sort(
            "score", DESCENDING).limit(self.page_size)
        return list(cursor)

Add service/recommendation/infrastructure/config/config.py:

from dataclasses import dataclass
from typing import List
import yaml


@dataclass
class DBConfig:
    mongo_uri: str
    mongo_db_name: str


@dataclass
class MQConfig:
    brokers: List[str]
    topic: str
    group_id: str


@dataclass
class ApplicationConfig:
    page_size: int


@dataclass
class Config:
    app: ApplicationConfig
    db: DBConfig
    mq: MQConfig


def parseConfig(filename: str) -> Config:
    with open(filename, 'r') as f:
        data = yaml.safe_load(f)
        return Config(
            ApplicationConfig(**data['app']),
            DBConfig(**data['db']),
            MQConfig(**data['mq'])
        )

Add service/recommendation/config.yml:

app:
  page_size: 10
db:
  mongo_uri: "mongodb://localhost:27017"
  mongo_db_name: "lr_event_rec"
mq:
  brokers:
    - localhost:9094
  topic: "lr-book-searches"
  group_id: "rec-svr"

Add service/recommendation/application/consumer/interest.py:

import json
from typing import Dict, List

from ....domain.model import Interest
from ...domain.gateway import InterestManager
from ....domain.gateway import TrendEventConsumer


class InterestConsumer():

    def __init__(self, interest_manager: InterestManager, event_consumer: TrendEventConsumer):
        self.interest_manager = interest_manager
        self.event_consumer = event_consumer

    def start(self):
        def process_event(key: bytes, data: bytes):
            parts = key.decode('utf-8').split(':')
            if len(parts) == 1:
                # no user_id, ignore it
                return
            books: List[Dict] = json.loads(data)
            user_id = parts[1]
            for b in books:
                self.interest_manager.increase_interest(Interest(
                    user_id=user_id,
                    title=b['title'],
                    author=b['author'],
                    score=0
                ))
        self.event_consumer.consume_events(process_event)

    def get_event_consumer(self) -> TrendEventConsumer:
        return self.event_consumer

Add service/recommendation/application/wire_helper.py:

from ..domain.gateway import InterestManager
from ...domain.gateway import TrendEventConsumer
from ..infrastructure.config import Config
from ..infrastructure.database import MongoPersistence
from ...infrastructure.mq import KafkaConsumer


class WireHelper:
    def __init__(self, noSQLPersistence: MongoPersistence, consumer: KafkaConsumer):
        self.noSQLPersistence = noSQLPersistence
        self.consumer = consumer

    @classmethod
    def new(cls, c: Config):
        mdb = MongoPersistence(
            c.db.mongo_uri, c.db.mongo_db_name, c.app.page_size)
        consumer = KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.group_id)
        return cls(mdb, consumer)

    def interest_manager(self) -> InterestManager:
        return self.noSQLPersistence

    def trend_event_consumer(self) -> TrendEventConsumer:
        return self.consumer

Add service/recommendation/main.py:

import signal

from .application import WireHelper
from .application.consumer import InterestConsumer
from .infrastructure.config import parseConfig

CONFIG_FILENAME = "service/recommendation/config.yml"

c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)

# Run the consumer
tc = InterestConsumer(wire_helper.interest_manager(),
                      wire_helper.trend_event_consumer())
event_consumer = tc.get_event_consumer()


def sigterm_handler(signal, frame):
    event_consumer.stop()
    print("Consumer stopped. Exiting gracefully...")


signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGINT, sigterm_handler)

print("Started to consume events...")
tc.start()

Remember to create or update __init__.py files:

  • service/domain/model/__init__.py
  • service/recommendation/application/__init__.py
  • service/recommendation/application/consumer/__init__.py
  • service/recommendation/domain/gateway/__init__.py
  • service/recommendation/infrastructure/config/__init__.py
  • service/recommendation/infrastructure/database/__init__.py

Latest requirements.txt:

annotated-types==0.6.0
anyio==4.3.0
click==8.1.7
confluent-kafka==2.3.0
dnspython==2.6.1
fastapi==0.110.1
h11==0.14.0
httptools==0.6.1
idna==3.6
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
pydantic==2.6.4
pydantic_core==2.16.3
pymongo==4.6.3
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.3
sniffio==1.3.1
starlette==0.37.2
typing_extensions==4.11.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0

Start the recommendation consumer:

python3 -m service.recommendation.main

You should see something similar to this:

Started to consume events...

From another terminal, restart your web service and try searching with terms like “love“ and “war“ a couple of times.

Then, you will have records like below in your mongoDB:

lr_event_rec> db.interests.find();
[
  {
    _id: ObjectId('6617c3df9f5e3692de429083'),
    title: 'The Catcher in the Rye',
    author: 'J.D. Salinger',
    user_id: 'C32WB',
    score: 1
  },
  {
    _id: ObjectId('6617c3ef9f5e3692de42908e'),
    title: 'Pride and Prejudice',
    author: 'Jane Austen',
    user_id: 'C32WB',
    score: 1
  },
  {
    _id: ObjectId('6617c3ef9f5e3692de429090'),
    title: 'War and Peace',
    author: 'Leo Tolstoy',
    user_id: 'C32WB',
    score: 2
  }
]

That means your consumer is doing well.