Event Consumer
Since the recommendation service's consumer has so much in common with the trend service's counterpart, let's reuse that code by moving the common parts into higher-level folders.
Restructure files
- Tune service/trend/domain/gateway/trend_manager.py:
@@ -1,10 +1,8 @@
from abc import ABC, abstractmethod
-from typing import Callable, List
+from typing import List
from ....domain.model import Trend
-ConsumeCallback = Callable[[bytes, bytes], None]
-
class TrendManager(ABC):
@abstractmethod
@@ -14,13 +12,3 @@ class TrendManager(ABC):
@abstractmethod
def top_trends(self, page_size: int) -> List[Trend]:
pass
-
-
-class TrendEventConsumer(ABC):
- @abstractmethod
- def consume_events(self, callback: ConsumeCallback):
- pass
-
- @abstractmethod
- def stop(self):
- pass
Move the common TrendEventConsumer
and ConsumeCallback
types out of this file.
- Put those types in service/domain/gateway/event_consumer.py:
from abc import ABC, abstractmethod
from typing import Callable
# Signature of the handler invoked for each consumed event: (key, data), both raw bytes.
ConsumeCallback = Callable[[bytes, bytes], None]


class TrendEventConsumer(ABC):
    """Abstract interface for an event consumer shared by multiple services."""

    @abstractmethod
    def consume_events(self, callback: ConsumeCallback):
        """Consume events and hand each one to ``callback`` as ``(key, data)`` bytes."""
        pass

    @abstractmethod
    def stop(self):
        """Stop consuming events."""
        pass
- Move service/trend/infrastructure/mq/kafka.py to service/infrastructure/mq/kafka_consumer.py.
class KafkaConsumer(TrendEventConsumer):
def __init__(self, brokers: List[str], topic: str, group_id: str):
Add a new parameter named group_id
for the consumer group.
- Add config items in service/trend/infrastructure/config/config.py:
@@ -8,6 +8,7 @@ import yaml
class MQConfig:
brokers: List[str]
topic: str
+ group_id: str
@dataclass
- Put values in service/trend/config.yml:
@@ -7,3 +7,4 @@ mq:
brokers:
- localhost:9094
topic: lr-book-searches
+ group_id: trend-svr
- Tune import paths in service/trend/application/consumer/trend.py:
@@ -1,7 +1,8 @@
import json
from ....domain.model import Trend
-from ...domain.gateway import TrendManager, TrendEventConsumer
+from ...domain.gateway import TrendManager
+from ....domain.gateway import TrendEventConsumer
class TrendConsumer():
- Tune import paths in service/trend/application/wire_helper.py:
@@ -1,7 +1,8 @@
-from ..domain.gateway import TrendManager, TrendEventConsumer
+from ..domain.gateway import TrendManager
+from ...domain.gateway import TrendEventConsumer
from ..infrastructure.config import Config
from ..infrastructure.cache import RedisCache
-from ..infrastructure.mq import KafkaConsumer
+from ...infrastructure.mq import KafkaConsumer
class WireHelper:
- Remember to create or update __init__.py files:
- service/domain/gateway/__init__.py
- service/infrastructure/mq/__init__.py
- service/trend/domain/gateway/__init__.py
Tune the producer's events in the web service
Add the user_id
parameter in service/web/application/executor/book_operator.py:
@@ -23,13 +23,14 @@ class BookOperator():
book.id = id
return book
- def get_books(self, offset: int, query: str) -> List[Book]:
+ def get_books(self, offset: int, user_id: str, query: str) -> List[Book]:
books = self.book_manager.get_books(offset, query)
- # Send search query and its results
+ # Send a user's search query and its results
if query:
+ k = query + ':' + user_id
json_data = json.dumps([_convert(b)
for b in books]).encode('utf-8')
- self.mq_helper.send_event(query, json_data)
+ self.mq_helper.send_event(k, json_data)
return books
def get_trends(self, trend_url: str) -> List[Trend]:
Read the cookie value and pass in as the user_id
in service/web/adapter/router.py:
@@ -1,4 +1,7 @@
import logging
+import random
+import string
+import time
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
@@ -10,6 +13,16 @@ from ...domain.model import Book
from ..infrastructure.config.config import RemoteServiceConfig
+FIELD_UID = "uid"
+
+
+def random_string(length):
+ random.seed(time.time()) # Using time as seed
+ charset = string.ascii_uppercase + "0123456789"
+ result = [random.choice(charset) for _ in range(length)]
+ return ''.join(result)
+
+
class RestHandler:
def __init__(self, logger: logging.Logger, remote: RemoteServiceConfig, book_operator: BookOperator):
self._logger = logger
@@ -23,9 +36,9 @@ class RestHandler:
self._logger.error(f"Failed to create: {e}")
raise HTTPException(status_code=400, detail="Failed to create")
- def get_books(self, offset: int, query: str):
+ def get_books(self, offset: int, user_id: str, query: str):
try:
- books = self.book_operator.get_books(offset, query)
+ books = self.book_operator.get_books(offset, user_id, query)
return books
except Exception as e:
self._logger.error(f"Failed to get books: {e}")
@@ -44,10 +57,17 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
@app.get("/", response_class=HTMLResponse)
async def index_page(request: Request, q: str = ""):
- books = rest_handler.book_operator.get_books(0, q)
- trends = rest_handler.book_operator.get_trends(
- rest_handler.remote.trend_url)
- return templates.TemplateResponse(
+ user_id = request.cookies.get(FIELD_UID)
+ if not user_id:
+ user_id = random_string(5)
+ books = rest_handler.book_operator.get_books(0, user_id, q)
+ try:
+ trends = rest_handler.book_operator.get_trends(
+ rest_handler.remote.trend_url)
+ except Exception as e:
+ rest_handler._logger.warn(f"Failed to get trends: {e}")
+ trends = []
+ resp = templates.TemplateResponse(
name="index.html", context={
"request": request,
"title": "LiteRank Book Store",
@@ -56,6 +76,8 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
"q": q,
}
)
+ resp.set_cookie(FIELD_UID, user_id, 3600*24*30)
+ return resp
@app.post("/api/books", response_model=Book)
async def create_book(b: dto.Book):
@@ -63,4 +85,4 @@ def make_router(app: FastAPI, templates_dir: str, remote: RemoteServiceConfig, w
@app.get("/api/books")
async def get_books(o: int = 0, q: str = ""):
- return rest_handler.get_books(o, q)
+ return rest_handler.get_books(o, "", q)
Tune the consumer in the trend service
Since the key's format has changed, we need to adapt to that in the trend service.
Update service/trend/application/consumer/trend.py:
@@ -13,7 +13,9 @@ class TrendConsumer():
def start(self):
def process_event(key: bytes, data: bytes):
- t = Trend(query=key.decode('utf-8'), books=json.loads(data))
+ parts = key.decode('utf-8').split(':')
+ query = parts[0]
+ t = Trend(query=query, books=json.loads(data))
self.trend_manager.create_trend(t)
self.event_consumer.consume_events(process_event)
Add group_id
in service/trend/application/wire_helper.py:
@@ -14,7 +14,7 @@ class WireHelper:
def new(cls, c: Config):
kv = RedisCache(c.cache.host, c.cache.port,
c.cache.password, c.cache.db)
- consumer = KafkaConsumer(c.mq.brokers, c.mq.topic)
+ consumer = KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.group_id)
return cls(kv, consumer)
def trend_manager(self) -> TrendManager:
Recommendation Service's consumer
Add service/domain/model/interest.py:
from dataclasses import dataclass
@dataclass
class Interest:
    """A user's interest in a single book, identified by title and author."""

    user_id: str
    title: str
    author: str
    score: float
Add service/recommendation/domain/gateway/interest_manager.py:
from abc import ABC, abstractmethod
from typing import List
from ....domain.model import Interest
class InterestManager(ABC):
    """Abstract gateway for storing and querying user interests."""

    @abstractmethod
    def increase_interest(self, i: Interest):
        """Record one more occurrence of the given user/book interest."""
        pass

    @abstractmethod
    def list_interests(self, user_id: str) -> List[Interest]:
        """Return the interests recorded for ``user_id``."""
        pass
Install mongo
dependencies:
pip3 install pymongo
Prepare the mongoDB database:
- Install mongoDB on your machine and start it.
- Create a database named lr_event_rec.
- Create necessary indexes for your collections.
Implement the InterestManager
in service/recommendation/infrastructure/database/mongo.py:
from typing import List
from pymongo import MongoClient, DESCENDING
from ...domain.gateway import InterestManager
from ....domain.model import Interest
COLL_REVIEW = "interests"
class MongoPersistence(InterestManager):
    """MongoDB-backed implementation of ``InterestManager``."""

    def __init__(self, uri: str, db_name: str, page_size: int):
        """Connect to MongoDB.

        Args:
            uri: MongoDB connection URI.
            db_name: Name of the database holding the interests collection.
            page_size: Maximum number of interests returned by ``list_interests``.
        """
        self.client = MongoClient(uri)
        self.db = self.client[db_name]
        self.coll = self.db[COLL_REVIEW]
        self.page_size = page_size

    def increase_interest(self, i: Interest):
        """Increment the score of the (user_id, title, author) record by one.

        ``i.score`` is not used; the increment is always 1. With
        ``upsert=True`` a missing record is created by the same call.
        """
        filter_query = {
            "user_id": i.user_id,
            "title": i.title,
            "author": i.author,
        }
        update_query = {
            "$inc": {"score": 1}
        }
        self.coll.update_one(filter_query, update_query, upsert=True)

    def list_interests(self, user_id: str) -> List[Interest]:
        """Return up to ``page_size`` interests of a user, highest score first."""
        filter_query = {"user_id": user_id}
        cursor = self.coll.find(filter_query).sort(
            "score", DESCENDING).limit(self.page_size)
        # Map raw documents to the declared return type; this also drops
        # Mongo's internal ``_id`` field, which is not part of ``Interest``.
        return [
            Interest(
                user_id=doc["user_id"],
                title=doc["title"],
                author=doc["author"],
                score=doc["score"],
            )
            for doc in cursor
        ]
Add service/recommendation/infrastructure/config/config.py:
from dataclasses import dataclass
from typing import List
import yaml
@dataclass
class DBConfig:
    """MongoDB settings: connection URI and database name."""

    mongo_uri: str
    mongo_db_name: str
@dataclass
class MQConfig:
    """Message-queue settings: broker addresses, topic, and consumer group id."""

    brokers: List[str]
    topic: str
    group_id: str
@dataclass
class ApplicationConfig:
    """Application-level settings."""

    page_size: int
@dataclass
class Config:
    """Top-level configuration grouping the app, database, and MQ sections."""

    app: ApplicationConfig
    db: DBConfig
    mq: MQConfig
def parseConfig(filename: str) -> Config:
    """Load the YAML file at ``filename`` and build a ``Config`` from it.

    The file must contain top-level ``app``, ``db`` and ``mq`` mappings whose
    keys match the fields of the corresponding config dataclasses.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if one of the ``app``/``db``/``mq`` sections is missing.
        TypeError: if a section has missing or unexpected keys.
    """
    # Read as UTF-8 explicitly so parsing does not depend on the platform's
    # default locale encoding.
    with open(filename, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    return Config(
        app=ApplicationConfig(**data['app']),
        db=DBConfig(**data['db']),
        mq=MQConfig(**data['mq']),
    )
Add service/recommendation/config.yml:
app:
  page_size: 10
db:
  mongo_uri: "mongodb://localhost:27017"
  mongo_db_name: "lr_event_rec"
mq:
  brokers:
    - localhost:9094
  topic: "lr-book-searches"
  group_id: "rec-svr"
Add service/recommendation/application/consumer/interest.py:
import json
from typing import Dict, List
from ....domain.model import Interest
from ...domain.gateway import InterestManager
from ....domain.gateway import TrendEventConsumer
class InterestConsumer():
    """Turn consumed search events into per-user book interests."""

    def __init__(self, interest_manager: InterestManager, event_consumer: TrendEventConsumer):
        """Store the interest gateway and the event consumer to read from."""
        self.interest_manager = interest_manager
        self.event_consumer = event_consumer

    def start(self):
        """Start consuming events, recording an interest for every book in each event."""
        def process_event(key: bytes, data: bytes):
            # The key is "<query>:<user_id>". Split on the LAST colon so a
            # query that itself contains ':' cannot be mistaken for the user id.
            _, sep, user_id = key.decode('utf-8').rpartition(':')
            if not sep or not user_id:
                # no user_id (missing separator or empty value), ignore it
                return
            books: List[Dict] = json.loads(data)
            for b in books:
                self.interest_manager.increase_interest(Interest(
                    user_id=user_id,
                    title=b['title'],
                    author=b['author'],
                    score=0
                ))

        self.event_consumer.consume_events(process_event)

    def get_event_consumer(self) -> TrendEventConsumer:
        """Return the underlying event consumer (e.g. so the caller can stop it)."""
        return self.event_consumer
Add service/recommendation/application/wire_helper.py:
from ..domain.gateway import InterestManager
from ...domain.gateway import TrendEventConsumer
from ..infrastructure.config import Config
from ..infrastructure.database import MongoPersistence
from ...infrastructure.mq import KafkaConsumer
class WireHelper:
    """Build the concrete infrastructure objects and expose them via their interfaces."""

    def __init__(self, noSQLPersistence: MongoPersistence, consumer: KafkaConsumer):
        """Hold the already-constructed persistence and consumer instances."""
        self.noSQLPersistence = noSQLPersistence
        self.consumer = consumer

    @classmethod
    def new(cls, c: Config):
        """Create a ``WireHelper`` from the parsed configuration ``c``."""
        mdb = MongoPersistence(
            c.db.mongo_uri, c.db.mongo_db_name, c.app.page_size)
        consumer = KafkaConsumer(c.mq.brokers, c.mq.topic, c.mq.group_id)
        return cls(mdb, consumer)

    def interest_manager(self) -> InterestManager:
        """Return the persistence layer as an ``InterestManager``."""
        return self.noSQLPersistence

    def trend_event_consumer(self) -> TrendEventConsumer:
        """Return the message-queue consumer as a ``TrendEventConsumer``."""
        return self.consumer
Add service/recommendation/main.py:
import signal
from .application import WireHelper
from .application.consumer import InterestConsumer
from .infrastructure.config import parseConfig
# Path is relative to the directory the module is launched from.
CONFIG_FILENAME = "service/recommendation/config.yml"

c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)

# Run the consumer
tc = InterestConsumer(wire_helper.interest_manager(),
                      wire_helper.trend_event_consumer())
event_consumer = tc.get_event_consumer()


def sigterm_handler(signum, frame):
    """Stop the event consumer when SIGTERM or SIGINT is received."""
    # The first parameter is named ``signum`` so it does not shadow the
    # imported ``signal`` module inside the handler.
    event_consumer.stop()
    print("Consumer stopped. Exiting gracefully...")


signal.signal(signal.SIGTERM, sigterm_handler)
signal.signal(signal.SIGINT, sigterm_handler)

print("Started to consume events...")
tc.start()
Remember to create or update __init__.py files:
- service/domain/model/__init__.py
- service/recommendation/application/__init__.py
- service/recommendation/application/consumer/__init__.py
- service/recommendation/domain/gateway/__init__.py
- service/recommendation/infrastructure/config/__init__.py
- service/recommendation/infrastructure/database/__init__.py
Latest requirements.txt:
annotated-types==0.6.0
anyio==4.3.0
click==8.1.7
confluent-kafka==2.3.0
dnspython==2.6.1
fastapi==0.110.1
h11==0.14.0
httptools==0.6.1
idna==3.6
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
pydantic==2.6.4
pydantic_core==2.16.3
pymongo==4.6.3
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.3
sniffio==1.3.1
starlette==0.37.2
typing_extensions==4.11.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0
Start the recommendation consumer:
python3 -m service.recommendation.main
You should see something similar to this:
Started to consume events...
From another terminal, restart your web service and try searching with terms like "love" and "war" a couple of times.
Then, you will have records like below in your mongoDB:
lr_event_rec> db.interests.find();
[
{
_id: ObjectId('6617c3df9f5e3692de429083'),
title: 'The Catcher in the Rye',
author: 'J.D. Salinger',
user_id: 'C32WB',
score: 1
},
{
_id: ObjectId('6617c3ef9f5e3692de42908e'),
title: 'Pride and Prejudice',
author: 'Jane Austen',
user_id: 'C32WB',
score: 1
},
{
_id: ObjectId('6617c3ef9f5e3692de429090'),
title: 'War and Peace',
author: 'Leo Tolstoy',
user_id: 'C32WB',
score: 2
}
]
That means your consumer is doing well.