FastAPI API Server
Create service/domain/model/trend.py:
from datetime import datetime
from dataclasses import dataclass
from typing import List
from .book import Book
@dataclass
class Trend:
query: str
books: List[Book]
created_at: datetime | None
Create service/trend/domain/gateway/trend_manager.py:
from abc import ABC, abstractmethod
from typing import List
from ....domain.model import Trend
class TrendManager(ABC):
@abstractmethod
def create_trend(self, t: Trend) -> int:
pass
@abstractmethod
def top_trends(self, offset: int) -> List[Trend]:
pass
In Redis, a ZSET (sorted set) is a data structure that combines the features of both sets and sorted lists. It is similar to a regular set, but it also maintains a sorting order based on a score associated with each member. This score allows members to be sorted in ascending or descending order.
So, we can use a ZSET to store the trends.
Install redis dependency:
pip3 install redis
Remember to update requirements.txt:
annotated-types==0.6.0
anyio==4.3.0
click==8.1.7
confluent-kafka==2.3.0
fastapi==0.110.1
h11==0.14.0
httptools==0.6.1
idna==3.6
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
pydantic==2.6.4
pydantic_core==2.16.3
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.3
sniffio==1.3.1
starlette==0.37.2
typing_extensions==4.11.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0
Create service/trend/infrastructure/cache/redis.py:
import json
from typing import Any, List
import redis
from ...domain.gateway import TrendManager
from ....domain.model import Trend
trends_key = "trends"
query_key_prefix = "q-"
class RedisCache(TrendManager):
def __init__(self, host: str, port: int, password: str, db: int):
self.c = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=True
)
def create_trend(self, t: Trend) -> int:
member = t.query
score: Any = self.c.zincrby(trends_key, 1, member)
k = query_key_prefix + t.query
results = json.dumps(t.books)
self.c.set(k, results)
return score
def top_trends(self, offset: int) -> List[Trend]:
top_items: Any = self.c.zrevrange(
trends_key, 0, offset, withscores=True)
trends = []
for item in top_items:
query = item[0]
t = Trend(query=query, books=[], created_at=None)
k = query_key_prefix + query
value: Any = self.c.get(k)
if value is not None:
t.books = json.loads(value)
trends.append(t)
return trends
Create service/trend/infrastructure/config/config.py:
from dataclasses import dataclass
import yaml
@dataclass
class CacheConfig:
host: str
port: int
password: str
db: int
@dataclass
class Config:
cache: CacheConfig
def parseConfig(filename: str) -> Config:
with open(filename, 'r') as f:
data = yaml.safe_load(f)
return Config(
CacheConfig(**data['cache'])
)
Create service/trend/config.yml:
cache:
host: 127.0.0.1
port: 6379
password: test_pass
db: 0
Create service/trend/application/wire_helper.py:
from ..domain.gateway import TrendManager
from ..infrastructure.config import Config
from ..infrastructure.cache import RedisCache
class WireHelper:
def __init__(self, kvStore: RedisCache):
self.kvStore = kvStore
@classmethod
def new(cls, c: Config):
kv = RedisCache(c.cache.host, c.cache.port,
c.cache.password, c.cache.db)
return cls(kv)
def trend_manager(self) -> TrendManager:
return self.kvStore
Create service/trend/application/executor/trend_operator.py:
from typing import List
from ....domain.model import Trend
from ...domain.gateway import TrendManager
class TrendOperator():
def __init__(self, trend_manager: TrendManager):
self.trend_manager = trend_manager
def create_trend(self, t: Trend) -> int:
return self.trend_manager.create_trend(t)
def top_trends(self, offset: int) -> List[Trend]:
return self.trend_manager.top_trends(offset)
Create service/trend/adapter/router.py:
import logging
from fastapi import FastAPI, HTTPException
from ..application.executor import TrendOperator
from ..application.wire_helper import WireHelper
class RestHandler:
def __init__(self, logger: logging.Logger, trend_operator: TrendOperator):
self._logger = logger
self.trend_operator = trend_operator
def get_trends(self, offset: int):
try:
return self.trend_operator.top_trends(offset)
except Exception as e:
self._logger.error(f"Failed to get trends: {e}")
raise HTTPException(status_code=404, detail="Failed to get trends")
def make_router(app: FastAPI, wire_helper: WireHelper):
rest_handler = RestHandler(
logging.getLogger("lr-event"),
TrendOperator(wire_helper.trend_manager())
)
@app.get("/trends")
async def get_trends(o: int = 0):
return rest_handler.get_trends(o)
Finally, add service/trend/main.py:
from fastapi import FastAPI
from .adapter.router import make_router
from .application import WireHelper
from .infrastructure.config import parseConfig
CONFIG_FILENAME = "service/trend/config.yml"
c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)
app = FastAPI()
make_router(app, wire_helper)
Run the server:
uvicorn service.trend.main:app --port 8001
Try to hit the URL http://localhost:8001/trends with curl.