
Authentication

Authentication is needed in an API server when you want to control access to certain resources or functionality.

In other words, if certain API endpoints or data should only be available to users with particular roles (e.g., administrator, moderator, or premium user), the server must first authenticate the caller (establish who they are) before it can decide what they are allowed to do.

Traditional User Flow

Update code

Add User domain entity in domain/model/user.py:

from dataclasses import dataclass
from datetime import datetime


@dataclass
class User:
    id: int
    email: str
    password: str
    salt: str
    is_admin: bool
    created_at: datetime
    updated_at: datetime

Declare its business capabilities in domain/gateway/user_manager.py:

from abc import ABC, abstractmethod
from typing import Optional

from ..model import User


class UserManager(ABC):
    @abstractmethod
    def create_user(self, u: User) -> int:
        pass

    @abstractmethod
    def get_user_by_email(self, email: str) -> Optional[User]:
        pass
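
Because UserManager is an abstract interface, the rest of the application does not need to know which database sits behind it. As a rough illustration, the sketch below implements the same two methods with a plain dictionary, which can be handy in unit tests. InMemoryUserManager is a hypothetical name and not part of the tutorial's code base; User and UserManager are repeated here only so the snippet runs on its own.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Dict, Optional


@dataclass
class User:  # same fields as domain/model/user.py
    id: int
    email: str
    password: str
    salt: str
    is_admin: bool
    created_at: datetime
    updated_at: datetime


class UserManager(ABC):  # same interface as domain/gateway/user_manager.py
    @abstractmethod
    def create_user(self, u: User) -> int:
        pass

    @abstractmethod
    def get_user_by_email(self, email: str) -> Optional[User]:
        pass


class InMemoryUserManager(UserManager):
    """Hypothetical test double: keeps users in a dict keyed by email."""

    def __init__(self) -> None:
        self._users: Dict[str, User] = {}
        self._next_id = 1

    def create_user(self, u: User) -> int:
        uid = self._next_id
        self._next_id += 1
        # Store a copy that carries the assigned id, as a database would.
        self._users[u.email] = replace(u, id=uid)
        return uid

    def get_user_by_email(self, email: str) -> Optional[User]:
        return self._users.get(email)


now = datetime.now()
manager = InMemoryUserManager()
uid = manager.create_user(
    User(0, "a@example.com", "hash", "salt", False, now, now))
found = manager.get_user_by_email("a@example.com")
missing = manager.get_user_by_email("nobody@example.com")
```

Any class that satisfies the interface can be handed to the code that depends on UserManager, which is what makes the MySQL implementation swappable.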

Add implementations in infrastructure/database/mysql.py:

@@ -3,11 +3,11 @@ from typing import Any, List, Optional
 
 from books.infrastructure.config import DBConfig
 
-from ...domain.gateway import BookManager
-from ...domain.model import Book
+from ...domain.gateway import BookManager, UserManager
+from ...domain.model import Book, User
 
 
-class MySQLPersistence(BookManager):
+class MySQLPersistence(BookManager, UserManager):
     def __init__(self, c: DBConfig, page_size: int):
         self.page_size = page_size
         self.conn = mysql.connector.connect(
@@ -73,3 +73,18 @@ class MySQLPersistence(BookManager):
         self.cursor.execute(query, tuple(params))
         results: List[Any] = self.cursor.fetchall()
         return [Book(**result) for result in results]
+
+    def create_user(self, u: User) -> int:
+        self.cursor.execute('''
+            INSERT INTO users (email, password, salt, is_admin, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s)
+        ''', (u.email, u.password, u.salt, u.is_admin, u.created_at, u.updated_at))
+        return self.cursor.lastrowid or 0
+
+    def get_user_by_email(self, email: str) -> Optional[User]:
+        self.cursor.execute('''
+            SELECT * FROM users WHERE email=%s
+        ''', (email,))
+        result: Any = self.cursor.fetchone()
+        if result is None:
+            return None
+        return User(**result)

Do the sign-up and sign-in preparation work in application/executor/user_operator.py:

from datetime import datetime
import hashlib
import random
import time
from typing import Optional

from books.application.dto import UserCredential, User
from books.domain.gateway import UserManager
from books.domain.model import User as DomainUser

SALT_LEN = 4
ERR_EMPTY_EMAIL = "empty email"
ERR_EMPTY_PASSWORD = "empty password"


class UserOperator:
    def __init__(self, user_manager: UserManager):
        self.user_manager = user_manager

    def create_user(self, uc: UserCredential) -> Optional[User]:
        if not uc.email:
            raise ValueError(ERR_EMPTY_EMAIL)
        if not uc.password:
            raise ValueError(ERR_EMPTY_PASSWORD)

        salt = random_string(SALT_LEN)
        password_hash = sha1_hash(uc.password + salt)

        now = datetime.now()
        user = DomainUser(id=0, email=uc.email,
                          password=password_hash, salt=salt, is_admin=False,
                          created_at=now, updated_at=now)
        uid = self.user_manager.create_user(user)
        return User(id=uid, email=uc.email)

    def sign_in(self, email: str, password: str) -> Optional[User]:
        if not email:
            raise ValueError(ERR_EMPTY_EMAIL)
        if not password:
            raise ValueError(ERR_EMPTY_PASSWORD)

        user = self.user_manager.get_user_by_email(email)
        if not user:
            return None
        password_hash = sha1_hash(password + user.salt)
        if user.password != password_hash:
            raise ValueError("wrong password")
        return User(id=user.id, email=user.email)


def random_string(length: int) -> str:
    source = random.Random(time.time())
    charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
    result = ''.join(source.choice(charset) for _ in range(length))
    return result


def sha1_hash(input_str: str) -> str:
    hash_object = hashlib.sha1(input_str.encode())
    hash_hex = hash_object.hexdigest()
    return hash_hex

random_string and sha1_hash are helper functions for salt generation and password hashing.

Why do you need a salt?

Without a salt, an attacker could precompute hashes for common passwords and store them in a table (known as a rainbow table). If the database is compromised, the attacker can then compare the hashed passwords against the precomputed hashes to quickly identify passwords. Adding a salt to each password ensures that even if two users have the same password, their hashed passwords will be different due to the unique salt.

A salt does not make a weak password strong: an attacker who steals the database also gets the salts and can still guess common passwords one user at a time. What the salt prevents is the reuse of work. Without it, every instance of the same password hashes to the same value, so a single precomputed table cracks all of them at once.
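
The helpers in user_operator.py favor brevity: SHA-1 is a fast hash, the salt is only four characters long, and random seeded with the current time is not a cryptographically secure source. A hedged sketch of a sturdier variant that uses only the standard library follows. The names new_salt, hash_password, verify_password and the iteration count are assumptions for illustration, not part of the tutorial's code.

```python
import hashlib
import hmac
import secrets

# Assumed work factor for this sketch; tune it for your own hardware.
PBKDF2_ITERATIONS = 200_000


def new_salt(num_bytes: int = 16) -> str:
    # secrets draws from the operating system's secure random source.
    return secrets.token_hex(num_bytes)


def hash_password(password: str, salt: str) -> str:
    # PBKDF2 repeats the HMAC many times so each guess costs the attacker more.
    derived = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), PBKDF2_ITERATIONS)
    return derived.hex()


def verify_password(password: str, salt: str, expected_hash: str) -> bool:
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(hash_password(password, salt), expected_hash)


# Two users with the same password end up with different stored hashes.
salt_a, salt_b = new_salt(), new_salt()
hash_a = hash_password("test-pass", salt_a)
hash_b = hash_password("test-pass", salt_b)
```

With this shape, sign-in would call verify_password(password, user.salt, user.password) instead of comparing SHA-1 digests with !=.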

Add DTOs in application/dto/user.py:

from dataclasses import dataclass


@dataclass
class UserCredential:
    email: str
    password: str


@dataclass
class User:
    id: int
    email: str

DTO stands for Data Transfer Object.

Often, the internal representation of data within your application might differ from what you want to expose through your API. DTOs allow you to transform your internal data structures into a format that's more suitable for the API consumers. This helps in decoupling the internal representation from the external one, allowing for better flexibility and abstraction.
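
As a small illustration of that decoupling, the sketch below maps a domain user to the DTO so that the password hash and salt never reach the JSON response. The helper to_dto is a hypothetical name; in the tutorial the conversion happens inline inside UserOperator.

```python
from dataclasses import asdict, dataclass
from datetime import datetime


@dataclass
class DomainUser:  # mirrors domain/model/user.py
    id: int
    email: str
    password: str
    salt: str
    is_admin: bool
    created_at: datetime
    updated_at: datetime


@dataclass
class User:  # mirrors the DTO in application/dto/user.py
    id: int
    email: str


def to_dto(u: DomainUser) -> User:
    # Copy only the fields that API consumers are meant to see.
    return User(id=u.id, email=u.email)


now = datetime.now()
domain_user = DomainUser(2, "test-user@example.com", "some-hash", "3qV4",
                         False, now, now)
payload = asdict(to_dto(domain_user))
```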

Wire in new dependencies in application/wire_helper.py:

@@ -1,4 +1,4 @@
-from books.domain.gateway import BookManager, ReviewManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager
 from books.infrastructure.cache import RedisCache, CacheHelper
 from ..infrastructure.config import Config
 from ..infrastructure.database import MySQLPersistence, MongoPersistence
@@ -25,3 +25,6 @@ class WireHelper:
 
     def cache_helper(self) -> CacheHelper:
         return self.kvStore
+
+    def user_manager(self) -> UserManager:
+        return self.sqlPersistence

Finally, add routes in adapter/router.py:

@@ -1,17 +1,19 @@
 import logging
 from flask import Flask, request, jsonify
 
-from ..application.executor import BookOperator, ReviewOperator
+from books.application.dto import UserCredential
+from ..application.executor import BookOperator, ReviewOperator, UserOperator
 from ..application import WireHelper
 from ..domain.model import Book, Review
 from .util import dataclass_from_dict
 
 
 class RestHandler:
-    def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator):
+    def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator, user_operator: UserOperator):
         self._logger = logger
         self.book_operator = book_operator
         self.review_operator = review_operator
+        self.user_operator = user_operator
 
     def get_books(self):
         try:
@@ -104,6 +106,26 @@ class RestHandler:
             self._logger.error(f"Failed to delete: {e}")
             return jsonify({"error": "Failed to delete"}), 404
 
+    def user_sign_up(self):
+        try:
+            u = dataclass_from_dict(UserCredential, request.json)
+            user = self.user_operator.create_user(u)
+            return jsonify(user), 201
+        except Exception as e:
+            self._logger.error(f"Failed to create: {e}")
+            return jsonify({"error": "Failed to create"}), 400
+
+    def user_sign_in(self):
+        try:
+            u = dataclass_from_dict(UserCredential, request.json)
+            user = self.user_operator.sign_in(u.email, u.password)
+            if user is None:
+                return jsonify({"error": f"No user with email {u.email}"}), 404
+            return jsonify(user), 200
+        except Exception as e:
+            self._logger.error(f"Failed to sign in: {e}")
+            return jsonify({"error": f"Failed to sign in: {e}"}), 400
+
 
 def health():
     return jsonify({"status": "ok"})
@@ -115,7 +137,8 @@ def make_router(app: Flask, wire_helper: WireHelper):
         BookOperator(
             wire_helper.book_manager(),
             wire_helper.cache_helper()),
-        ReviewOperator(wire_helper.review_manager()))
+        ReviewOperator(wire_helper.review_manager()),
+        UserOperator(wire_helper.user_manager()))
     app.add_url_rule('/', view_func=health)
     app.add_url_rule('/books', view_func=rest_handler.get_books)
     app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
@@ -134,3 +157,7 @@ def make_router(app: Flask, wire_helper: WireHelper):
                      methods=['PUT'])
     app.add_url_rule('/reviews/<id>', view_func=rest_handler.delete_review,
                      methods=['DELETE'])
+    app.add_url_rule('/users', view_func=rest_handler.user_sign_up,
+                     methods=['POST'])
+    app.add_url_rule('/users/sign-in', view_func=rest_handler.user_sign_in,
+                     methods=['POST'])

Try with curl

User sign up

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users

Result:

{
  "email": "test-user@example.com",
  "id": 2
}

If you check out the records in the database, you will find something like this:

+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
| id | email                 | password                                 | salt | is_admin | created_at              | updated_at              |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
|  2 | test-user@example.com | acced10162fdabc812d830f2f1edf74a63b57a38 | 3qV4 |        0 | 2024-03-08 15:49:35.989 | 2024-03-08 15:49:35.989 |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+

User sign in successfully

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in

Result:

{
  "email": "test-user@example.com",
  "id": 2
}

User sign in with an incorrect password

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "wrong-pass"}' http://localhost:5000/users/sign-in

Result:

{
  "error": "Failed to sign in: wrong password"
}

Note:
It's good practice to use a uniform JSON response format for both successful and failed responses.
For example:

{"code":0, "message":"ok", "data": {"id":2,"email":"test-user@example.com"}}
{"code":1001, "message":"wrong password", "data": null}
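
One possible way to produce such an envelope is a pair of small helpers that every handler calls before jsonify. This is only a sketch: make_response, ok and fail are hypothetical names, and the code values simply follow the two sample lines in the note.

```python
import json
from typing import Any, Optional


def make_response(code: int, message: str, data: Optional[Any] = None) -> dict:
    # Every response carries the same three keys, whether it succeeded or not.
    return {"code": code, "message": message, "data": data}


def ok(data: Any) -> dict:
    return make_response(0, "ok", data)


def fail(code: int, message: str) -> dict:
    return make_response(code, message)


success_body = json.dumps(ok({"id": 2, "email": "test-user@example.com"}))
failure_body = json.dumps(fail(1001, "wrong password"))
```

Clients can then branch on code alone instead of guessing whether a body contains an error key.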

Use JWT (JSON Web Token)

JWT, or JSON Web Token, is a compact, URL-safe means of representing claims to be transferred between two parties. It's often used for authentication and authorization in web applications and APIs.
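
A JWT is three base64url-encoded segments joined by dots: header, payload (the claims), and signature. The sketch below builds and checks an HS256 token by hand with the standard library, purely to show what is inside one. sign_hs256 and verify_hs256 are hypothetical helper names, and the secret is a placeholder; real code should rely on a maintained library such as PyJWT, which also validates claims like exp.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 with the trailing '=' padding removed.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def sign_hs256(claims: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, claims)
    )
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: bytes) -> bool:
    try:
        signing_input, signature = token.rsplit(".", 1)
    except ValueError:
        return False  # no dot at all, so this is not a JWT
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return hmac.compare_digest(b64url(expected).encode(), signature.encode())


token = sign_hs256({"user_id": 2, "permission": 1}, b"demo-secret")
```

The payload is only encoded, not encrypted, so anyone holding the token can read the claims. The signature is what stops them from changing those claims without knowing the secret.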

Install JWT dependency

pip3 install PyJWT

Update requirements.txt:

pip3 freeze > requirements.txt 

The file should now look something like this:

async-timeout==4.0.3
blinker==1.7.0
click==8.1.7
dnspython==2.6.1
Flask==3.0.2
importlib-metadata==7.0.1
itsdangerous==2.1.2
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
PyJWT==2.8.0
pymongo==4.6.2
PyYAML==6.0.1
redis==5.0.2
Werkzeug==3.0.1
zipp==3.17.0

Update code

Add UserPermission enums in domain/model/user.py:

@@ -1,5 +1,6 @@
 from dataclasses import dataclass
 from datetime import datetime
+from enum import IntEnum
 
 
 @dataclass
@@ -11,3 +12,10 @@ class User:
     is_admin: bool
     created_at: datetime
     updated_at: datetime
+
+
+class UserPermission(IntEnum):
+    PermNone = 0
+    PermUser = 1
+    PermAuthor = 2
+    PermAdmin = 3

Declare its business capabilities in domain/gateway/user_manager.py:

@@ -1,7 +1,7 @@
 from abc import ABC, abstractmethod
 from typing import Optional
 
-from ..model import User
+from ..model import User, UserPermission
 
 
 class UserManager(ABC):
@@ -12,3 +12,13 @@ class UserManager(ABC):
     @abstractmethod
     def get_user_by_email(self, email: str) -> Optional[User]:
         pass
+
+
+class PermissionManager(ABC):
+    @abstractmethod
+    def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
+        pass
+
+    @abstractmethod
+    def has_permission(self, token: str, perm: UserPermission) -> bool:
+        pass

Implement these 2 methods in infrastructure/token/jwt.py:

import jwt
import time
from typing import Optional

from books.domain.gateway import PermissionManager
from ...domain.model import UserPermission

ERR_INVALID_TOKEN = "invalid token"
ERR_FAIL_TO_DECODE = "failed to decode token"


class TokenKeeper(PermissionManager):
    def __init__(self, secret_key: str, expire_in_hours: int):
        self.secret_key = secret_key.encode()
        self.expire_hours = expire_in_hours

    def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
        expiration_time = int(time.time()) + self.expire_hours * 3600
        claims = {
            "user_id": user_id,
            "user_name": email,
            "permission": perm,
            "exp": expiration_time
        }
        token_result = jwt.encode(claims, self.secret_key, algorithm='HS256')
        return token_result

    def extract_token(self, token_result: str) -> Optional[dict]:
        try:
            claims = jwt.decode(
                token_result, self.secret_key, algorithms=['HS256'])
            return claims
        except jwt.ExpiredSignatureError:
            raise ValueError(ERR_INVALID_TOKEN)
        except jwt.DecodeError:
            raise ValueError(ERR_FAIL_TO_DECODE)

    def has_permission(self, token_result: str, perm: UserPermission) -> bool:
        claims = self.extract_token(token_result)
        if not claims:
            return False
        return claims.get("permission", UserPermission.PermNone) >= perm
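
The >= comparison in has_permission works because UserPermission is an IntEnum, so its members are ordered integers, and a plain int read back from the decoded claims compares against them directly. A minimal sketch of that check in isolation (is_allowed is a hypothetical helper name):

```python
from enum import IntEnum


class UserPermission(IntEnum):  # same values as domain/model/user.py
    PermNone = 0
    PermUser = 1
    PermAuthor = 2
    PermAdmin = 3


def is_allowed(claims: dict, required: UserPermission) -> bool:
    # A missing claim falls back to PermNone, the lowest level.
    granted = claims.get("permission", UserPermission.PermNone)
    return granted >= required


admin_can_write = is_allowed({"permission": 3}, UserPermission.PermAuthor)
user_can_write = is_allowed({"permission": 1}, UserPermission.PermAuthor)
anonymous_can_write = is_allowed({}, UserPermission.PermAuthor)
```

A higher level therefore implies every lower one: an admin token passes a PermAuthor check, while a regular user token does not.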

Add config items for Tokens in infrastructure/config/config.py:

@@ -26,6 +26,8 @@ class CacheConfig:
 class ApplicationConfig:
     port: int
     page_size: int
+    token_secret: str
+    token_hours: int
 
 
 @dataclass

Set the values in config.yml:

@@ -1,6 +1,8 @@
 app:
   port: 5000
   page_size: 5
+  token_secret: "I_Love_LiteRank"
+  token_hours: 24
 db:
   file_name: "test.db"
   host: "127.0.0.1"

Generate tokens and return them in application/executor/user_operator.py:

@@ -4,9 +4,9 @@ import random
 import time
 from typing import Optional
 
-from books.application.dto import UserCredential, User
-from books.domain.gateway import UserManager
-from books.domain.model import User as DomainUser
+from books.application.dto import UserCredential, User, UserToken
+from books.domain.gateway import UserManager, PermissionManager
+from books.domain.model import User as DomainUser, UserPermission
 
 SALT_LEN = 4
 ERR_EMPTY_EMAIL = "empty email"
@@ -14,8 +14,9 @@ ERR_EMPTY_PASSWORD = "empty password"
 
 
 class UserOperator:
-    def __init__(self, user_manager: UserManager):
+    def __init__(self, user_manager: UserManager, perm_manager: PermissionManager):
         self.user_manager = user_manager
+        self.perm_manager = perm_manager
 
     def create_user(self, uc: UserCredential) -> Optional[User]:
         if not uc.email:
@@ -33,7 +34,7 @@ class UserOperator:
         uid = self.user_manager.create_user(user)
         return User(id=uid, email=uc.email)
 
-    def sign_in(self, email: str, password: str) -> Optional[User]:
+    def sign_in(self, email: str, password: str) -> Optional[UserToken]:
         if not email:
             raise ValueError(ERR_EMPTY_EMAIL)
         if not password:
@@ -45,7 +46,12 @@ class UserOperator:
         password_hash = sha1_hash(password + user.salt)
         if user.password != password_hash:
             raise ValueError("wrong password")
-        return User(id=user.id, email=user.email)
+        token = self.perm_manager.generate_token(
+            user.id, user.email,
+            UserPermission.PermAdmin if user.is_admin else UserPermission.PermUser)
+        return UserToken(
+            User(id=user.id, email=user.email),
+            token)
 
 
 def random_string(length: int) -> str:

Add DTOs in application/dto/user.py:

@@ -11,3 +11,9 @@ class UserCredential:
 class User:
     id: int
     email: str
+
+
+@dataclass
+class UserToken:
+    user: User
+    token: str

Update application/wire_helper.py:

@@ -1,25 +1,35 @@
-from books.domain.gateway import BookManager, ReviewManager, UserManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager, PermissionManager
 from books.infrastructure.cache import RedisCache, CacheHelper
+from books.infrastructure.token import TokenKeeper
 from ..infrastructure.config import Config
 from ..infrastructure.database import MySQLPersistence, MongoPersistence
 
 
 class WireHelper:
-    def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence, kvStore: RedisCache):
+    def __init__(self,
+                 sqlPersistence: MySQLPersistence,
+                 noSQLPersistence: MongoPersistence,
+                 kvStore: RedisCache,
+                 tokenKeeper: TokenKeeper):
         self.sqlPersistence = sqlPersistence
         self.noSQLPersistence = noSQLPersistence
         self.kvStore = kvStore
+        self.tokenKeeper = tokenKeeper
 
     @classmethod
     def new(cls, c: Config):
         db = MySQLPersistence(c.db, c.app.page_size)
         mdb = MongoPersistence(c.db.mongo_uri, c.db.mongo_db_name)
         kv = RedisCache(c.cache)
-        return cls(db, mdb, kv)
+        tk = TokenKeeper(c.app.token_secret, c.app.token_hours)
+        return cls(db, mdb, kv, tk)
 
     def book_manager(self) -> BookManager:
         return self.sqlPersistence
 
+    def perm_manager(self) -> PermissionManager:
+        return self.tokenKeeper
+
     def review_manager(self) -> ReviewManager:
         return self.noSQLPersistence

Add perm_check middleware into routes for Create/Update/Delete operations in adapter/router.py:

@@ -1,7 +1,9 @@
 import logging
 from flask import Flask, request, jsonify
+from books.adapter.middleware import perm_check
 
 from books.application.dto import UserCredential
+from books.domain.model.user import UserPermission
 from ..application.executor import BookOperator, ReviewOperator, UserOperator
 from ..application import WireHelper
 from ..domain.model import Book, Review
@@ -138,15 +140,15 @@ def make_router(app: Flask, wire_helper: WireHelper):
             wire_helper.book_manager(),
             wire_helper.cache_helper()),
         ReviewOperator(wire_helper.review_manager()),
-        UserOperator(wire_helper.user_manager()))
+        UserOperator(wire_helper.user_manager(), wire_helper.perm_manager()))
     app.add_url_rule('/', view_func=health)
     app.add_url_rule('/books', view_func=rest_handler.get_books)
     app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
-    app.add_url_rule('/books', view_func=rest_handler.create_book,
+    app.add_url_rule('/books', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.create_book),
                      methods=['POST'])
-    app.add_url_rule('/books/<int:id>', view_func=rest_handler.update_book,
+    app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.update_book),
                      methods=['PUT'])
-    app.add_url_rule('/books/<int:id>', view_func=rest_handler.delete_book,
+    app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.delete_book),
                      methods=['DELETE'])
     app.add_url_rule('/books/<int:book_id>/reviews',
                      view_func=rest_handler.get_reviews_of_book)

perm_check is implemented in adapter/middleware.py:

from functools import wraps
from flask import request, jsonify
from books.domain.gateway import PermissionManager
from books.domain.model.user import UserPermission

TOKEN_PREFIX = "Bearer "


def perm_check(perm_manager: PermissionManager, allow_perm: UserPermission):
    def middleware(view_func):
        @wraps(view_func)
        def wrapped_view(*args, **kwargs):
            auth_header = request.headers.get("Authorization")
            if not auth_header:
                return jsonify({"error": "Token is required"}), 401

            token = auth_header.replace(TOKEN_PREFIX, "", 1)
            try:
                has_perm = perm_manager.has_permission(token, allow_perm)
            except Exception as e:
                return jsonify({"error": f"{e}"}), 401
            if not has_perm:
                return jsonify({"error": "Unauthorized"}), 401
            return view_func(*args, **kwargs)
        return wrapped_view
    return middleware
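
Note that auth_header.replace(TOKEN_PREFIX, "", 1) also accepts a header that has no Bearer prefix at all, and would strip the prefix text from the middle of a value. If stricter parsing is wanted, a helper along these lines could be used inside wrapped_view. parse_bearer_token is a hypothetical name and this is only a sketch of one option.

```python
from typing import Optional

TOKEN_PREFIX = "Bearer "


def parse_bearer_token(auth_header: Optional[str]) -> Optional[str]:
    # Accept only headers of the form "Bearer <token>".
    if not auth_header or not auth_header.startswith(TOKEN_PREFIX):
        return None
    token = auth_header[len(TOKEN_PREFIX):].strip()
    return token or None


good = parse_bearer_token("Bearer abc.def.ghi")
no_prefix = parse_bearer_token("abc.def.ghi")
other_scheme = parse_bearer_token("Basic dXNlcjpwYXNz")
empty = parse_bearer_token("Bearer ")
```

Separately, the middleware answers 401 both for a missing token and for a valid token with insufficient permission; some APIs prefer 403 Forbidden for the latter.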

Those are all the changes you need to use JWT in your API server. Let's try it out again!

Try with curl

User sign in and get a Token

curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in

Result:

{
  "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk",
  "user": {
    "email": "test-user@example.com",
    "id": 2
  }
}

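Paste the token into the Debugger at https://jwt.io/ to inspect the payload and verify the signature of your token.

Note that this sample token carries "permission": 3 (PermAdmin), which sign_in only issues when the user's is_admin flag is set. A freshly signed-up user has is_admin = 0 and receives PermUser, which is below the PermAuthor level that the book routes require.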
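
The payload can also be inspected locally, without sending the token to a website. The sketch below decodes the first two segments of the sample token from the sign-in response using only the standard library. It does not verify the signature, so it is suitable for debugging only; decode_segment is a hypothetical helper name.

```python
import base64
import json


def decode_segment(segment: str) -> dict:
    # Restore the '=' padding that JWT strips before base64url-decoding.
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# Sample token copied from the sign-in response shown earlier.
token = (
    "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
    "eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9."
    "aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk"
)

header_b64, payload_b64, _signature = token.split(".")
header = decode_segment(header_b64)
claims = decode_segment(payload_b64)
print(header)
print(claims)
```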


Create a new book with a valid and proper Token

curl -X POST \
  http://localhost:5000/books \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

Successful result:

{
  "author": "John Doe",
  "created_at": null,
  "description": "A sample book description",
  "id": 18,
  "isbn": "1234567890",
  "published_at": "2003-01-01",
  "title": "Test Book",
  "total_pages": 100,
  "updated_at": null
}

Create a new book with a Token that lacks the required permission

curl -X POST \
  http://localhost:5000/books \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2LCJ1c2VyX25hbWUiOiJ0ZXN0LW5vZGVAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjoxLCJleHAiOjE3MDk5ODk5MjN9.KhNCNRFYVAe67pyrFPH_bCFMbn83y55AsCtlDa3v0Sk' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

Result:

{
  "error": "Unauthorized"
}

Create a new book without a Token

curl -X POST \
  http://localhost:5000/books \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

Result:

{
  "error": "Token is required"
}

Create a new book with a fake Token

curl -X POST \
  http://localhost:5000/books \
  -H 'Authorization: Bearer FAKE_TOKEN' \
  -H 'Content-Type: application/json' \
  -d '{
    "title": "Test Book",
    "author": "John Doe",
    "published_at": "2003-01-01",
    "description": "A sample book description",
    "isbn": "1234567890",
    "total_pages": 100
}'

Result:

{
  "error": "failed to decode token"
}

Perfect! 🎉

Now, some of your API endpoints are protected by the authentication mechanism.
