» Python: Building Event-Driven Microservices with Kafka » 4. Consumer: Recommendation Service » 4.3 FastAPI API Server
FastAPI API Server
Add service/recommendation/application/executor/interest_operator.py:
from typing import List
from ....domain.model import Interest
from ...domain.gateway import InterestManager
class InterestOperator():
def __init__(self, interest_manager: InterestManager):
self.interest_manager = interest_manager
def interests_for_user(self, user_id: str) -> List[Interest]:
return self.interest_manager.list_interests(user_id)
Add service/recommendation/application/executor/__init__.py:
from .interest_operator import InterestOperator
Add service/recommendation/adapter/router.py:
import logging
from fastapi import FastAPI, HTTPException
from ..application.executor import InterestOperator
from ..application.wire_helper import WireHelper
class RestHandler:
def __init__(self, logger: logging.Logger, interest_operator: InterestOperator):
self._logger = logger
self.interest_operator = interest_operator
def get_interests(self, user_id: str):
try:
return self.interest_operator.interests_for_user(user_id)
except Exception as e:
self._logger.error(f"Failed to get interests for {user_id}: {e}")
raise HTTPException(
status_code=404, detail=f"Failed to get interests for {user_id}")
def make_router(app: FastAPI, wire_helper: WireHelper):
rest_handler = RestHandler(
logging.getLogger("lr-event"),
InterestOperator(wire_helper.interest_manager())
)
@app.get("/recommendations")
async def get_interests(uid: str = ""):
return rest_handler.get_interests(uid)
Start FastAPI server as well in service/recommendation/main.py:
@@ -1,5 +1,9 @@
-import signal
+from contextlib import asynccontextmanager
+import threading
+from fastapi import FastAPI
+
+from .adapter.router import make_router
from .application import WireHelper
from .application.consumer import InterestConsumer
from .infrastructure.config import parseConfig
@@ -13,15 +17,20 @@ wire_helper = WireHelper.new(c)
tc = InterestConsumer(wire_helper.interest_manager(),
wire_helper.trend_event_consumer())
event_consumer = tc.get_event_consumer()
+consumer_thread = threading.Thread(target=tc.start)
-def sigterm_handler(signal, frame):
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ # Run at startup
+ print("Started to consume events...")
+ consumer_thread.start()
+ yield
+ # Run on shutdown
event_consumer.stop()
+ consumer_thread.join()
print("Consumer stopped. Exiting gracefully...")
-
-signal.signal(signal.SIGTERM, sigterm_handler)
-signal.signal(signal.SIGINT, sigterm_handler)
-
-print("Started to consume events...")
-tc.start()
+# Run the FastAPI server
+app = FastAPI(lifespan=lifespan)
+make_router(app, wire_helper)
Again, we use the lifespan
function to start/stop the consumer gracefully.
Fix the error in service/recommendation/infrastructure/database/mongo.py:
@@ -28,6 +28,8 @@ class MongoPersistence(InterestManager):
def list_interests(self, user_id: str) -> List[Interest]:
filter_query = {"user_id": user_id}
- cursor = self.coll.find(filter_query).sort(
+ # Exclude the _id field from the result
+ projection = {"_id": 0}
+ cursor = self.coll.find(filter_query, projection).sort(
"score", DESCENDING).limit(self.page_size)
return list(cursor)
Run the server:
uvicorn service.recommendation.main:app --port 8002
Try to hit the URL http://localhost:8002/recommendations?uid=C32WB with curl.
“C32WB“ is my cookie
uid
value. Replace it with yours in the URL.
The result looks like this:
[
{
"title": "War and Peace",
"author": "Leo Tolstoy",
"user_id": "C32WB",
"score": 2
},
{
"title": "The Catcher in the Rye",
"author": "J.D. Salinger",
"user_id": "C32WB",
"score": 1
},
{
"title": "Pride and Prejudice",
"author": "Jane Austen",
"user_id": "C32WB",
"score": 1
}
]
Loading...
> code result goes here