» Python: Building Event-Driven Microservices with Kafka » 2. Producer: Web Service » 2.4 Create and Search Books

Create and Search Books

Let's add the APIs for creating books and searching for books, then we can input some test data for later use.

Create API

Move domain/model to web/domain/model.

Create web/domain/gateway/book_manager.py:

from abc import ABC, abstractmethod

from ..model import Book


class BookManager(ABC):
    @abstractmethod
    def create_book(self, b: Book) -> int:
        pass

Python does not have a built-in construct called "interface" like some other programming languages. However, Python allows you to achieve similar functionality using Abstract Base Classes (ABCs) from the abc module.

Again, we‘re using 4 Layers Architecture Pattern. It will pay off when your project grows bigger and bigger. Read more here.

Add web/domain/gateway/__init__.py:

from .book_manager import BookManager

Add mysql dependency:

pip3 install mysql-connector-python

It’s the official MySQL driver written in Python.

Update requirements.txt:

pip3 freeze > requirements.txt

Prepare the MySQL database:

  • Install MySQL on your machine and start it.
  • Create a database named lr_event_book.
CREATE DATABASE lr_event_book CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
  • Create the user test_user:
CREATE USER 'test_user'@'localhost' IDENTIFIED BY 'test_pass';
GRANT ALL PRIVILEGES ON lr_event_book.* TO 'test_user'@'localhost';
FLUSH PRIVILEGES;

Create web/infrastructure/database/mysql.py:

import mysql.connector

from ...domain.gateway import BookManager
from ...domain.model import Book
from ..config import DBConfig


class MySQLPersistence(BookManager):
    def __init__(self, c: DBConfig):
        self.conn = mysql.connector.connect(
            host=c.host,
            port=c.port,
            user=c.user,
            password=c.password,
            database=c.database,
            autocommit=True
        )
        self.cursor = self.conn.cursor(dictionary=True)
        self._create_table()

    def _create_table(self):
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS books (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(255) NOT NULL,
            author VARCHAR(255) NOT NULL,
            published_at DATE NOT NULL,
            description TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        ''')

    def create_book(self, b: Book) -> int:
        self.cursor.execute('''
            INSERT INTO books (title, author, published_at, description) VALUES (%s, %s, %s, %s)
        ''', (b.title, b.author, b.published_at, b.description))
        return self.cursor.lastrowid or 0

Add web/infrastructure/database/__init__.py:

from .mysql import MySQLPersistence

Add yaml dependency:

pip3 install pyyaml

Update requirements.txt:

pip3 freeze > requirements.txt 

Create web/infrastructure/config/config.py:

from dataclasses import dataclass
import yaml


@dataclass
class DBConfig:
    host: str
    port: int
    user: str
    password: str
    database: str


@dataclass
class ApplicationConfig:
    port: int
    page_size: int
    templates_dir: str


@dataclass
class Config:
    app: ApplicationConfig
    db: DBConfig


def parseConfig(filename: str) -> Config:
    with open(filename, 'r') as f:
        data = yaml.safe_load(f)
        return Config(
            ApplicationConfig(**data['app']),
            DBConfig(**data['db'])
        )

Add web/infrastructure/config/__init__.py:

from .config import Config, DBConfig, parseConfig

Create web/config.yml:

app:
  port: 8000
  page_size: 5
  templates_dir: "web/adapter/templates/"
db:
  host: 127.0.0.1
  port: 3306
  user: "test_user"
  password: "test_pass"
  database: "lr_event_book"

Caution: Do not directly track your config.yml file with Git, as this could potentially leak sensitive data. If necessary, only track the template of the configuration file.
e.g.

app:
  port: 8000
db:
  host: ""
  ...

Create web/application/executor/book_operator.py:

from datetime import datetime

from .. import dto
from ...domain.model import Book
from ...domain.gateway import BookManager


class BookOperator():

    def __init__(self, book_manager: BookManager):
        self.book_manager = book_manager

    def create_book(self, b: dto.Book) -> Book:
        book = Book(id=0, created_at=datetime.now(), **b.__dict__)
        id = self.book_manager.create_book(book)
        book.id = id
        return book

Remember to add the __init__.py for each sub-package, which helps to re-export your symbols.

Create web/application/wire_helper.py:

from ..domain.gateway import BookManager
from ..infrastructure.config import Config
from ..infrastructure.database import MySQLPersistence


class WireHelper:
    def __init__(self, sqlPersistence: MySQLPersistence):
        self.sqlPersistence = sqlPersistence

    @classmethod
    def new(cls, c: Config):
        db = MySQLPersistence(c.db)
        return cls(db)

    def book_manager(self) -> BookManager:
        return self.sqlPersistence

Create web/application/dto/book.py:

from dataclasses import dataclass


@dataclass
class Book:
    title: str
    author: str
    published_at: str
    description: str

DTO stands for Data Transfer Object. Often, the internal representation of data within your application might differ from what you want to expose through your API. DTOs allow you to transform your internal data structures into a format that's more suitable for the API consumers.

Create web/application/__init__.py:

from .wire_helper import WireHelper
from . import dto

Create web/adapter/router.py:

import logging

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates

from ..application.executor import BookOperator
from ..application import WireHelper, dto
from ..domain.model import Book


class RestHandler:
    def __init__(self, logger: logging.Logger, book_operator: BookOperator):
        self._logger = logger
        self.book_operator = book_operator

    def create_book(self, b: dto.Book):
        try:
            return self.book_operator.create_book(b)
        except Exception as e:
            self._logger.error(f"Failed to create: {e}")
            raise HTTPException(status_code=400, detail="Failed to create")


def make_router(app: FastAPI, templates_dir: str, wire_helper: WireHelper):
    rest_handler = RestHandler(
        logging.getLogger("lr-event"),
        BookOperator(wire_helper.book_manager())
    )

    templates = Jinja2Templates(directory=templates_dir)

    @app.get("/", response_class=HTMLResponse)
    async def index_page(request: Request):
        return templates.TemplateResponse(
            name="index.html", context={"request": request, "title": "LiteRank Book Store"}
        )

    @app.post("/api/books", response_model=Book)
    async def create_book(b: dto.Book):
        return rest_handler.create_book(b)

Move templates to web/adapter/templates.

Move main.py to web/main.py and replace its content with the following code:

from fastapi import FastAPI

from .adapter.router import make_router
from .application import WireHelper
from .infrastructure.config import parseConfig

CONFIG_FILENAME = "web/config.yml"

c = parseConfig(CONFIG_FILENAME)
wire_helper = WireHelper.new(c)
app = FastAPI()
make_router(app, c.app.templates_dir, wire_helper)

Run the server and try it with curl:

uvicorn web.main:app --reload
curl --request POST \
  --url http://localhost:8000/api/books \
  --header 'Content-Type: application/json' \
  --data '{
	"title": "The Great Gatsby",
	"author": "F. Scott Fitzgerald",
	"published_at": "1925-04-10",
	"description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age."
}'

Example response:

{
  "id": 1,
  "title": "The Great Gatsby",
  "author": "F. Scott Fitzgerald",
  "published_at": "1925-04-10",
  "description": "A novel depicting the opulent lives of wealthy Long Island residents during the Jazz Age.",
  "created_at": "2024-04-10T01:20:52.505836"
}

Put in some data for test

curl -X POST -H "Content-Type: application/json" -d '{"title": "To Kill a Mockingbird", "author": "Harper Lee", "published_at": "1960-07-11", "description": "A novel set in the American South during the 1930s, dealing with themes of racial injustice and moral growth."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "1984", "author": "George Orwell", "published_at": "1949-06-08", "description": "A dystopian novel depicting a totalitarian regime, surveillance, and propaganda."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "Pride and Prejudice", "author": "Jane Austen", "published_at": "1813-01-28", "description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "The Catcher in the Rye", "author": "J.D. Salinger", "published_at": "1951-07-16", "description": "A novel narrated by a disaffected teenager, exploring themes of alienation and identity."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "The Lord of the Rings", "author": "J.R.R. Tolkien", "published_at": "1954-07-29", "description": "A high fantasy epic following the quest to destroy the One Ring and defeat the Dark Lord Sauron."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "Moby-Dick", "author": "Herman Melville", "published_at": "1851-10-18", "description": "A novel exploring themes of obsession, revenge, and the nature of good and evil."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "The Hobbit", "author": "J.R.R. Tolkien", "published_at": "1937-09-21", "description": "A fantasy novel set in Middle-earth, following the adventure of Bilbo Baggins and the quest for treasure."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "The Adventures of Huckleberry Finn", "author": "Mark Twain", "published_at": "1884-12-10", "description": "A novel depicting the journey of a young boy and an escaped slave along the Mississippi River."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "War and Peace", "author": "Leo Tolstoy", "published_at": "1869-01-01", "description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "Alice’s Adventures in Wonderland", "author": "Lewis Carroll", "published_at": "1865-11-26", "description": "A children’s novel featuring a young girl named Alice who falls into a fantastical world populated by peculiar creatures."}' http://localhost:8000/api/books
curl -X POST -H "Content-Type: application/json" -d '{"title": "The Odyssey", "author": "Homer", "published_at": "8th Century BC", "description": "An ancient Greek epic poem attributed to Homer, detailing the journey of Odysseus after the Trojan War."}' http://localhost:8000/api/books

Search API

Update web/domain/gateway/book_manager.py:

@@ -1,4 +1,5 @@
 from abc import ABC, abstractmethod
+from typing import List
 
 from ..model import Book
 
@@ -7,3 +8,7 @@ class BookManager(ABC):
     @abstractmethod
     def create_book(self, b: Book) -> int:
         pass
+
+    @abstractmethod
+    def get_books(self, offset: int, keyword: str) -> List[Book]:
+        pass

Update web/infrastructure/database/mysql.py:

@@ -1,3 +1,5 @@
+from typing import Any, List
+
 import mysql.connector
 
 from ...domain.gateway import BookManager
@@ -6,7 +8,8 @@ from ..config import DBConfig
 
 
 class MySQLPersistence(BookManager):
-    def __init__(self, c: DBConfig):
+    def __init__(self, c: DBConfig, page_size: int):
+        self.page_size = page_size
         self.conn = mysql.connector.connect(
             host=c.host,
             port=c.port,
@@ -35,3 +38,16 @@ class MySQLPersistence(BookManager):
             INSERT INTO books (title, author, published_at, description) VALUES (%s, %s, %s, %s)
         ''', (b.title, b.author, b.published_at, b.description))
         return self.cursor.lastrowid or 0
+
+    def get_books(self, offset: int, keyword: str) -> List[Book]:
+        query = "SELECT * FROM books"
+        params: List[Any] = []
+        if keyword:
+            query += " WHERE title LIKE %s OR author LIKE %s OR description LIKE %s"
+            params = [f"%{keyword}%", f"%{keyword}%", f"%{keyword}%"]
+        query += " LIMIT %s, %s"
+        params.extend([offset, self.page_size])
+
+        self.cursor.execute(query, tuple(params))
+        results: List[Any] = self.cursor.fetchall()
+        return [Book(**result) for result in results]

Update web/application/executor/book_operator.py:

@@ -1,4 +1,5 @@
 from datetime import datetime
+from typing import List
 
 from .. import dto
 from ...domain.model import Book
@@ -15,3 +16,6 @@ class BookOperator():
         id = self.book_manager.create_book(book)
         book.id = id
         return book
+
+    def get_books(self, offset: int, query: str) -> List[Book]:
+        return self.book_manager.get_books(offset, query)

Update web/adapter/router.py:

@@ -21,6 +21,14 @@ class RestHandler:
             self._logger.error(f"Failed to create: {e}")
             raise HTTPException(status_code=400, detail="Failed to create")
 
+    def get_books(self, offset: int, query: str):
+        try:
+            books = self.book_operator.get_books(offset, query)
+            return books
+        except Exception as e:
+            self._logger.error(f"Failed to get books: {e}")
+            raise HTTPException(status_code=404, detail="Failed to get books")
+
 
 def make_router(app: FastAPI, templates_dir: str, wire_helper: WireHelper):
     rest_handler = RestHandler(
@@ -31,11 +39,21 @@ def make_router(app: FastAPI, templates_dir: str, wire_helper: WireHelper):
     templates = Jinja2Templates(directory=templates_dir)
 
     @app.get("/", response_class=HTMLResponse)
-    async def index_page(request: Request):
+    async def index_page(request: Request, q: str = ""):
+        books = rest_handler.book_operator.get_books(0, q)
         return templates.TemplateResponse(
-            name="index.html", context={"request": request, "title": "LiteRank Book Store"}
+            name="index.html", context={
+                "request": request,
+                "title": "LiteRank Book Store",
+                "books": books,
+                "q": q,
+            }
         )
 
     @app.post("/api/books", response_model=Book)
     async def create_book(b: dto.Book):
         return rest_handler.create_book(b)
+
+    @app.get("/api/books")
+    async def get_books(o: int = 0, q: str = ""):
+        return rest_handler.get_books(o, q)

Run the server again and try it with curl:

curl --request GET --url 'http://localhost:8000/api/books?q=love'

Example response:

[
	{
		"id": 4,
		"title": "Pride and Prejudice",
		"author": "Jane Austen",
		"published_at": "1813-01-28",
		"description": "A classic novel exploring the themes of love, reputation, and social class in Georgian England.",
		"created_at": "2024-04-02T21:02:59.314+08:00"
	},
	{
		"id": 10,
		"title": "War and Peace",
		"author": "Leo Tolstoy",
		"published_at": "1869-01-01",
		"description": "A novel depicting the Napoleonic era in Russia, exploring themes of love, war, and historical determinism.",
		"created_at": "2024-04-02T21:02:59.42+08:00"
	}
]

Show books on index page

As shown in last section‘s changes:

@@ -31,11 +39,21 @@ def make_router(app: FastAPI, templates_dir: str, wire_helper: WireHelper):
     templates = Jinja2Templates(directory=templates_dir)
 
     @app.get("/", response_class=HTMLResponse)
-    async def index_page(request: Request):
+    async def index_page(request: Request, q: str = ""):
+        books = rest_handler.book_operator.get_books(0, q)
         return templates.TemplateResponse(
-            name="index.html", context={"request": request, "title": "LiteRank Book Store"}
+            name="index.html", context={
+                "request": request,
+                "title": "LiteRank Book Store",
+                "books": books,
+                "q": q,
+            }
         )

Update web/adapter/templates/index.html:

@@ -10,12 +10,29 @@
 </head>
 <body class="bg-gray-100 p-2">
     <div class="container mx-auto py-8">
-        <h1 class="text-4xl font-bold">{{ title }}</h1>
+        <h1 class="text-4xl font-bold"><a href="/">{{ title }}</a></h1>
 
         <!-- Search Bar Section -->
         <div class="mb-8">
             <h2 class="text-2xl font-bold mb-4 mt-6">Search</h2>
-            <input type="text" placeholder="Search for books..." class="w-full px-4 py-2 rounded-md border-gray-300 focus:outline-none focus:border-blue-500">
+            <form class="flex">
+                <input type="text" name="q" value="{{q}}" placeholder="Search for books..." class="flex-grow px-4 py-2 rounded-l-md border-gray-300 focus:outline-none focus:border-blue-500">
+                <button type="submit" class="px-4 py-2 bg-blue-500 text-white rounded-r-md">Search</button>
+            </form>
+        </div>
+
+        <!-- Books Section -->
+        <div class="mb-8">
+            <h2 class="text-2xl font-bold mb-4">{%if q%}Keyword: “{{q}}“{%else%}Books{%endif%}</h2>
+            <div class="grid grid-cols-4 gap-2">
+                {% for book in books %}
+                    <div class="bg-white p-4 rounded-md border-gray-300 shadow mt-2">
+                        <div><b>{{book.title}}</b></div>
+                        <div class="text-gray-500 text-sm">{{book.published_at}}</div>
+                        <div class="italic text-sm">{{book.author}}</div>
+                    </div>
+                {% endfor %}
+            </div>
         </div>
 
         <!-- Trends Section -->

Restart the server and refresh the index page in your browser, you‘ll see the book list.

Search books on index page

As shown in last section‘s changes, web/adapter/router.py:

-    async def index_page(request: Request):
+    async def index_page(request: Request, q: str = ""):
+        books = rest_handler.book_operator.get_books(0, q)
         return templates.TemplateResponse(
             name="index.html", context={
                 "request": request,
                 "title": "LiteRank Book Store",
                 "books": books,
+                "q": q,
             }
         )

As shown in previous sections‘ changes, web/dapter/templates/index.html:

@@ -15,12 +15,15 @@
         <!-- Search Bar Section -->
         <div class="mb-8">
             <h2 class="text-2xl font-bold mb-4 mt-6">Search</h2>
-            <input type="text" placeholder="Search for books..." class="w-full px-4 py-2 rounded-md border-gray-300 focus:outline-none focus:border-blue-500">
+            <form class="flex">
+                <input type="text" name="q" value="{{q}}" placeholder="Search for books..." class="flex-grow px-4 py-2 rounded-l-md border-gray-300 focus:outline-none focus:border-blue-500">
+                <button type="submit" class="px-4 py-2 bg-blue-500 text-white rounded-r-md">Search</button>
+            </form>
         </div>
 
         <!-- Books Section -->
         <div class="mb-8">
-            <h2 class="text-2xl font-bold mb-4">Books</h2>
+            <h2 class="text-2xl font-bold mb-4">{%if q%}Keyword: “{{q}}“{%else%}Books{%endif%}</h2>
             <div class="grid grid-cols-4 gap-2">
                 {{range .books}}
                     <div class="bg-white p-4 rounded-md border-gray-300 shadow mt-2">

Restart the server and refresh the index page in your browser.

The search results look like this:

search results

PrevNext