Event Consumer
Tune service/domain/model/trend.py:
@@ -9,4 +9,3 @@ from .book import Book
class Trend:
query: str
books: List[Book]
- created_at: datetime | None
We'll use the timestamp from the Kafka event for time tracking, so let's remove this field.
Tune service/trend/domain/gateway/trend_manager.py:
@@ -1,8 +1,10 @@
from abc import ABC, abstractmethod
-from typing import List
+from typing import Callable, List
from ....domain.model import Trend
+ConsumeCallback = Callable[[bytes, bytes], None]
+
class TrendManager(ABC):
@abstractmethod
@@ -10,5 +12,15 @@ class TrendManager(ABC):
pass
@abstractmethod
- def top_trends(self, offset: int) -> List[Trend]:
+ def top_trends(self, page_size: int) -> List[Trend]:
+ pass
+
+
+class TrendEventConsumer(ABC):
+ @abstractmethod
+ def consume_events(self, callback: ConsumeCallback):
+ pass
+
+ @abstractmethod
+ def stop(self):
pass
Implement TrendEventConsumer
in service/trend/infrastructure/mq/kafka.py:
import sys
from typing import List

from confluent_kafka import Consumer, KafkaError, KafkaException

from ...domain.gateway import TrendEventConsumer, ConsumeCallback
GROUP_ID = "trend-svr"
class KafkaConsumer(TrendEventConsumer):
    """Kafka-backed implementation of ``TrendEventConsumer``.

    Subscribes to a single topic as part of the ``GROUP_ID`` consumer group
    and hands the raw key/value of every received message to a callback.
    """

    def __init__(self, brokers: List[str], topic: str):
        """Create the underlying consumer without subscribing yet.

        Args:
            brokers: Broker addresses; joined with commas into
                ``bootstrap.servers``.
            topic: Name of the topic to subscribe to in ``consume_events``.
        """
        self.consumer = Consumer(
            {'bootstrap.servers': ','.join(brokers),
             'group.id': GROUP_ID,
             # 'smallest' is accepted by librdkafka as an alias of 'earliest'.
             'auto.offset.reset': 'smallest'})
        self.topic = topic
        self.running = False

    def consume_events(self, callback: ConsumeCallback):
        """Poll the topic until ``stop`` is called, invoking ``callback`` per message.

        Blocks the calling thread. The consumer is always closed on exit,
        whether the loop ends normally or an exception propagates.

        Args:
            callback: Called as ``callback(key, value)`` for each message.

        Raises:
            KafkaException: For any message error other than end-of-partition.
        """
        try:
            self.consumer.subscribe([self.topic])
            self.running = True
            while self.running:
                # A short timeout keeps the loop responsive to stop().
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue

                err = msg.error()
                if err is None:
                    # Process message
                    callback(msg.key(), msg.value())
                    continue

                # The error codes live on KafkaError, not KafkaException.
                if err.code() != KafkaError._PARTITION_EOF:
                    raise KafkaException(err)

                # End of partition is informational only; keep polling.
                sys.stderr.write('% {} [{}] reached end at offset {}\n'.format(
                    msg.topic(), msg.partition(), msg.offset()))
        finally:
            self.consumer.close()

    def stop(self):
        """Ask the polling loop in ``consume_events`` to exit after its current poll."""
        self.running = False
Tune TrendManager
implementations in service/trend/infrastructure/cache/redis.py:
@@ -29,13 +29,13 @@ class RedisCache(TrendManager):
self.c.set(k, results)
return score
- def top_trends(self, offset: int) -> List[Trend]:
+ def top_trends(self, page_size: int) -> List[Trend]:
top_items: Any = self.c.zrevrange(
- trends_key, 0, offset, withscores=True)
+ trends_key, 0, page_size - 1, withscores=True)
trends = []
for item in top_items:
query = item[0]
- t = Trend(query=query, books=[], created_at=None)
+ t = Trend(query=query, books=[])
k = query_key_prefix + query
value: Any = self.c.get(k)
if value is not None:
Tune service/trend/application/wire_helper.py:
@@ -1,17 +1,23 @@
-from ..domain.gateway import TrendManager
+from ..domain.gateway import TrendManager, TrendEventConsumer
from ..infrastructure.config import Config
from ..infrastructure.cache import RedisCache
+from ..infrastructure.mq import KafkaConsumer
class WireHelper:
- def __init__(self, kvStore: RedisCache):
+ def __init__(self, kvStore: RedisCache, consumer: KafkaConsumer):
self.kvStore = kvStore
+ self.consumer = consumer
@classmethod
def new(cls, c: Config):
kv = RedisCache(c.cache.host, c.cache.port,
c.cache.password, c.cache.db)
- return cls(kv)
+ consumer = KafkaConsumer(c.mq.brokers, c.mq.topic)
+ return cls(kv, consumer)
def trend_manager(self) -> TrendManager:
return self.kvStore
+
+ def trend_event_consumer(self) -> TrendEventConsumer:
+ return self.consumer
Add config items in service/trend/infrastructure/config/config.py:
@@ -1,8 +1,15 @@
from dataclasses import dataclass
+from typing import List
import yaml
+@dataclass
+class MQConfig:
+ brokers: List[str]
+ topic: str
+
+
@dataclass
class CacheConfig:
host: str
@@ -14,11 +21,14 @@ class CacheConfig:
@dataclass
class Config:
cache: CacheConfig
+ mq: MQConfig
def parseConfig(filename: str) -> Config:
with open(filename, 'r') as f:
data = yaml.safe_load(f)
return Config(
- CacheConfig(**data['cache'])
+ CacheConfig(**data['cache']),
+ MQConfig(**data['mq'])
+
)
Put in config values in service/trend/config.yml:
@@ -3,3 +3,7 @@ cache:
port: 6379
password: test_pass
db: 0
+mq:
+ brokers:
+ - localhost:9094
+ topic: lr-book-searches
We're using port 9094 for external access to the Kafka broker running in a Docker container.
See the “Accessing Apache Kafka with internal and external clients” configuration in bitnami/kafka.
Add service/trend/application/consumer/trend.py:
import json
from ....domain.model import Trend
from ...domain.gateway import TrendManager, TrendEventConsumer
class TrendConsumer():
    """Application service that turns consumed events into stored trends."""

    def __init__(self, trend_manager: TrendManager, event_consumer: TrendEventConsumer):
        """Keep references to the trend store and the event source."""
        self.trend_manager = trend_manager
        self.event_consumer = event_consumer

    def _handle_event(self, key: bytes, data: bytes):
        """Build a ``Trend`` from one event and pass it to the trend manager.

        The key is decoded as UTF-8 and used as the query; the payload is
        parsed as JSON and used as the books value.
        """
        query = key.decode('utf-8')
        books = json.loads(data)
        self.trend_manager.create_trend(Trend(query=query, books=books))

    def start(self):
        """Start consuming events, handling each one with ``_handle_event``."""
        self.event_consumer.consume_events(self._handle_event)

    def get_event_consumer(self) -> TrendEventConsumer:
        """Return the event consumer this instance was constructed with."""
        return self.event_consumer
Tune service/trend/application/executor/trend_operator.py:
@@ -9,8 +9,5 @@ class TrendOperator():
def __init__(self, trend_manager: TrendManager):
self.trend_manager = trend_manager
- def create_trend(self, t: Trend) -> int:
- return self.trend_manager.create_trend(t)
-
- def top_trends(self, offset: int) -> List[Trend]:
- return self.trend_manager.top_trends(offset)
+ def top_trends(self, page_size: int) -> List[Trend]:
+ return self.trend_manager.top_trends(page_size)
Tune service/trend/adapter/router.py:
@@ -11,9 +11,9 @@ class RestHandler:
self._logger = logger
self.trend_operator = trend_operator
- def get_trends(self, offset: int):
+ def get_trends(self, page_size: int):
try:
- return self.trend_operator.top_trends(offset)
+ return self.trend_operator.top_trends(page_size)
except Exception as e:
self._logger.error(f"Failed to get trends: {e}")
raise HTTPException(status_code=404, detail="Failed to get trends")
@@ -26,5 +26,5 @@ def make_router(app: FastAPI, wire_helper: WireHelper):
)
@app.get("/trends")
- async def get_trends(o: int = 0):
- return rest_handler.get_trends(o)
+ async def get_trends(ps: int = 10):
+ return rest_handler.get_trends(ps)
Modify main.py to allow both the consumer and the router to stop gracefully:
@@ -1,12 +1,35 @@
+from contextlib import asynccontextmanager
+import threading
+
from fastapi import FastAPI
from .adapter.router import make_router
from .application import WireHelper
+from .application.consumer import TrendConsumer
from .infrastructure.config import parseConfig
CONFIG_FILENAME = "service/trend/config.yml"
c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)
-app = FastAPI()
+
+# Run the consumer
+tc = TrendConsumer(wire_helper.trend_manager(),
+ wire_helper.trend_event_consumer())
+event_consumer = tc.get_event_consumer()
+consumer_thread = threading.Thread(target=tc.start)
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ # Run at startup
+ consumer_thread.start()
+ yield
+ # Run on shutdown
+ event_consumer.stop()
+ consumer_thread.join()
+ print("Event consumer is closed! Bye!")
+
+# Run the FastAPI server
+app = FastAPI(lifespan=lifespan)
make_router(app, wire_helper)
The Kafka consumer and the FastAPI server run as two threads in the same process. We use FastAPI's lifespan
function to stop both of them gracefully.
Tip: Try using the lifespan function to solve this issue when shutting down the web service:
“Producer terminating with 1 message (248 bytes) still in queue or transit: use flush() to wait for outstanding message delivery”
Remember to create or modify the __init__.py files to re-export the symbols:
- service/trend/domain/gateway/__init__.py
- service/trend/application/consumer/__init__.py
- service/trend/infrastructure/mq/__init__.py
Start both the web service and the trend service. Try searching for keywords such as “love”, “war” and “wealth” on the web service's index page, e.g. http://localhost:8000/?q=wealth.
Then you should be able to see some results like this from the trend service (http://localhost:8001/trends):
[
{
"query": "love",
"books": [
{
"id": 4,
"title": "Pride and Prejudice",
"author": "Jane Austen",
"published_at": "1813-01-28",
"description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
"created_at": "2024-04-02T21:02:59.314+08:00"
},
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
}
]
},
{
"query": "war",
"books": [
{
"id": 10,
"title": "War and Peace",
"author": "Leo Tolstoy",
"published_at": "1869-01-01",
"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
"created_at": "2024-04-02T21:02:59.42+08:00"
},
{
"id": 12,
"title": "The Odyssey",
"author": "Homer",
"published_at": "8th Century BC",
"description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War.",
"created_at": "2024-04-02T21:02:59.455+08:00"
}
]
},
{
"query": "wealth",
"books": [
{
"id": 1,
"title": "The Great Gatsby",
"author": "F. Scott Fitzgerald",
"published_at": "1925-04-10",
"description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
"created_at": "2024-04-02T20:36:01.015+08:00"
}
]
}
]