» Python: Building Event-Driven Microservices with Kafka » 3. Consumer: Trend Service » 3.2 FastAPI API Server

FastAPI API Server

Create service/domain/model/trend.py:

from datetime import datetime
from dataclasses import dataclass
from typing import List

from .book import Book


@dataclass
class Trend:
    """A query string together with the books associated with it."""
    # Query text this trend is keyed by.
    query: str
    # Books attached to the query; may be an empty list.
    books: List[Book]
    # Optional timestamp; None when no creation time is available.
    created_at: datetime | None

Create service/trend/domain/gateway/trend_manager.py:

from abc import ABC, abstractmethod
from typing import List

from ....domain.model import Trend


class TrendManager(ABC):
    """Abstract gateway through which trends are recorded and queried."""

    @abstractmethod
    def create_trend(self, t: Trend) -> int:
        """Record the trend ``t`` and return an integer result for it."""

    @abstractmethod
    def top_trends(self, offset: int) -> List[Trend]:
        """Return the leading trends, bounded by ``offset``."""

In Redis, a ZSET (sorted set) is a data structure that combines the features of both sets and sorted lists. It is similar to a regular set, but it also maintains a sorting order based on a score associated with each member. This score allows members to be sorted in ascending or descending order.

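Trends fit this model naturally: each query is a ZSET member and its score is the number of times that query has been recorded, so we can use a ZSET to store the trends.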

Install the redis dependency:

pip3 install redis

Remember to update requirements.txt:

annotated-types==0.6.0
anyio==4.3.0
click==8.1.7
confluent-kafka==2.3.0
fastapi==0.110.1
h11==0.14.0
httptools==0.6.1
idna==3.6
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
pydantic==2.6.4
pydantic_core==2.16.3
python-dotenv==1.0.1
PyYAML==6.0.1
redis==5.0.3
sniffio==1.3.1
starlette==0.37.2
typing_extensions==4.11.0
uvicorn==0.29.0
uvloop==0.19.0
watchfiles==0.21.0
websockets==12.0

Create service/trend/infrastructure/cache/redis.py:

import json
from typing import Any, List

import redis

from ...domain.gateway import TrendManager
from ....domain.model import Trend

# Redis key of the sorted set whose members are queries and whose scores
# are incremented each time a trend is created for that query.
trends_key = "trends"
# Prefix of the per-query string keys ("q-<query>") holding the JSON-encoded
# books for that query.
query_key_prefix = "q-"


class RedisCache(TrendManager):
    """Redis-backed implementation of TrendManager.

    Queries are ranked in the sorted set ``trends_key`` (member = query,
    score = number of times the query was recorded). The books for each
    query are stored as a JSON string under ``query_key_prefix + query``.
    """

    def __init__(self, host: str, port: int, password: str, db: int):
        """Create the Redis client for the given server and database."""
        self.c = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            # Return str instead of bytes so values can be used directly
            # as queries and JSON text.
            decode_responses=True
        )

    def create_trend(self, t: Trend) -> int:
        """Bump the score of ``t.query`` by one and store its books.

        Args:
            t: The trend to record. ``t.books`` is passed to ``json.dumps``
                as-is, so it must already be JSON-serializable
                (NOTE(review): presumably a list of dicts - confirm against
                the caller; ``json.dumps`` raises TypeError otherwise).

        Returns:
            The new score of the query in the sorted set.
        """
        score: Any = self.c.zincrby(trends_key, 1, t.query)

        # The latest books for a query overwrite whatever was stored before.
        self.c.set(query_key_prefix + t.query, json.dumps(t.books))

        # ZINCRBY reports the score as a float; the declared contract is int.
        return int(score)

    def top_trends(self, offset: int) -> List[Trend]:
        """Return the highest-scored trends, best first.

        Args:
            offset: Inclusive stop index handed to ZREVRANGE, so ``0``
                yields only the top trend and ``n`` yields up to ``n + 1``.

        Returns:
            One Trend per ranked query; ``books`` is empty when nothing is
            stored for the query, and ``created_at`` is always None.
        """
        # Scores are not part of the result, so only the members are fetched.
        queries: Any = self.c.zrevrange(trends_key, 0, offset)
        if not queries:
            # MGET rejects an empty key list, so stop here.
            return []

        # A single MGET replaces one GET round trip per trend; values come
        # back in key order with None for missing keys.
        keys = [query_key_prefix + query for query in queries]
        values: Any = self.c.mget(keys)

        trends = []
        for query, value in zip(queries, values):
            books = json.loads(value) if value is not None else []
            trends.append(Trend(query=query, books=books, created_at=None))
        return trends

Create service/trend/infrastructure/config/config.py:

from dataclasses import dataclass

import yaml


@dataclass
class CacheConfig:
    """Connection settings for the cache server."""
    # Host name or IP address of the server.
    host: str
    # TCP port of the server.
    port: int
    # Password used to authenticate.
    password: str
    # Index of the database to select.
    db: int


@dataclass
class Config:
    """Top-level service configuration."""
    # Settings from the "cache" section of the config file.
    cache: CacheConfig


def parseConfig(filename: str) -> Config:
    """Load the YAML file at ``filename`` and build a Config from it.

    The document must contain a top-level ``cache`` mapping whose keys
    match the CacheConfig fields.

    Args:
        filename: Path of the YAML configuration file.

    Returns:
        The parsed configuration.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If the ``cache`` section is missing.
        TypeError: If ``cache`` has missing or unexpected keys.
    """
    # Pin the encoding: without it open() decodes with the locale default,
    # which varies between platforms.
    with open(filename, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    return Config(
        CacheConfig(**data['cache'])
    )

Create service/trend/config.yml:

# Connection settings for the Redis server; the keys must match the
# CacheConfig fields because they are passed to it as keyword arguments.
cache:
  host: 127.0.0.1
  port: 6379
  # NOTE(review): example credential only - do not commit real secrets.
  password: test_pass
  db: 0

Create service/trend/application/wire_helper.py:

from ..domain.gateway import TrendManager
from ..infrastructure.config import Config
from ..infrastructure.cache import RedisCache


class WireHelper:
    """Holds the infrastructure objects and hands them out by role."""

    def __init__(self, kvStore: RedisCache):
        """Keep a reference to the already-built cache."""
        self.kvStore = kvStore

    @classmethod
    def new(cls, c: Config):
        """Build a WireHelper whose cache is configured from ``c.cache``."""
        cache_cfg = c.cache
        return cls(
            RedisCache(
                cache_cfg.host,
                cache_cfg.port,
                cache_cfg.password,
                cache_cfg.db,
            )
        )

    def trend_manager(self) -> TrendManager:
        """Return the cache in its role as the TrendManager."""
        return self.kvStore

Create service/trend/application/executor/trend_operator.py:

from typing import List

from ....domain.model import Trend
from ...domain.gateway import TrendManager


class TrendOperator:
    """Application-level facade that forwards trend calls to a TrendManager."""

    def __init__(self, trend_manager: TrendManager):
        """Store the gateway every operation is delegated to."""
        self.trend_manager = trend_manager

    def create_trend(self, t: Trend) -> int:
        """Record ``t`` through the gateway and return its result."""
        result = self.trend_manager.create_trend(t)
        return result

    def top_trends(self, offset: int) -> List[Trend]:
        """Fetch the top trends from the gateway for the given ``offset``."""
        trends = self.trend_manager.top_trends(offset)
        return trends

Create service/trend/adapter/router.py:

import logging

from fastapi import FastAPI, HTTPException

from ..application.executor import TrendOperator
from ..application.wire_helper import WireHelper


class RestHandler:
    """HTTP-facing handler that turns operator failures into HTTP errors."""

    def __init__(self, logger: logging.Logger, trend_operator: TrendOperator):
        """Store the logger and the operator used to serve requests."""
        self._logger = logger
        self.trend_operator = trend_operator

    def get_trends(self, offset: int):
        """Return the top trends for ``offset``.

        Any exception raised by the operator is logged and replaced by an
        HTTPException with status 404.
        """
        try:
            trends = self.trend_operator.top_trends(offset)
        except Exception as e:
            self._logger.error(f"Failed to get trends: {e}")
            raise HTTPException(status_code=404, detail="Failed to get trends")
        return trends


def make_router(app: FastAPI, wire_helper: WireHelper):
    """Register the trend routes on ``app``.

    The handler is built once, from the "lr-event" logger and a
    TrendOperator backed by ``wire_helper.trend_manager()``.
    """
    logger = logging.getLogger("lr-event")
    operator = TrendOperator(wire_helper.trend_manager())
    rest_handler = RestHandler(logger, operator)

    @app.get("/trends")
    async def get_trends(o: int = 0):
        # ``o`` is the query-string parameter forwarded as the offset.
        return rest_handler.get_trends(o)

Finally, add service/trend/main.py:

from fastapi import FastAPI

from .adapter.router import make_router
from .application import WireHelper
from .infrastructure.config import parseConfig

# Relative path: it is resolved against the working directory the server
# is started from.
CONFIG_FILENAME = "service/trend/config.yml"

# Wiring happens at import time so that ``app`` is ready as soon as the
# module is loaded.
c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)
app = FastAPI()
# Attach the HTTP routes to the application.
make_router(app, wire_helper)

Run the server:

 uvicorn service.trend.main:app --port 8001

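Try hitting the URL http://localhost:8001/trends with curl. The optional `o` query parameter (default 0) is passed to `top_trends` as the offset, e.g. http://localhost:8001/trends?o=2.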