Authentication
Authentication in an API server is typically needed when you want to control access to certain resources or functionalities.
In other words, if you want to restrict access to certain API endpoints or data to users with special roles (e.g., administrator, moderator, or premium user), authentication is required.
Traditional User Flow
Update code
Add User
domain entity in domain/model/user.py:
from dataclasses import dataclass
from datetime import datetime
@dataclass
class User:
id: int
email: str
password: str
salt: str
is_admin: bool
created_at: datetime
updated_at: datetime
Declare its business capabilities in domain/gateway/user_manager.py:
from abc import ABC, abstractmethod
from typing import Optional
from ..model import User
class UserManager(ABC):
@abstractmethod
def create_user(self, u: User) -> int:
pass
@abstractmethod
def get_user_by_email(self, email: str) -> Optional[User]:
pass
Add implementations in infrastructure/database/mysql.py:
@@ -3,11 +3,11 @@ from typing import Any, List, Optional
from books.infrastructure.config import DBConfig
-from ...domain.gateway import BookManager
-from ...domain.model import Book
+from ...domain.gateway import BookManager, UserManager
+from ...domain.model import Book, User
-class MySQLPersistence(BookManager):
+class MySQLPersistence(BookManager, UserManager):
def __init__(self, c: DBConfig, page_size: int):
self.page_size = page_size
self.conn = mysql.connector.connect(
@@ -73,3 +73,18 @@ class MySQLPersistence(BookManager):
self.cursor.execute(query, tuple(params))
results: List[Any] = self.cursor.fetchall()
return [Book(**result) for result in results]
+
+ def create_user(self, u: User) -> int:
+ self.cursor.execute('''
+ INSERT INTO users (email, password, salt, is_admin, created_at, updated_at) VALUES (%s, %s, %s, %s, %s, %s)
+ ''', (u.email, u.password, u.salt, u.is_admin, u.created_at, u.updated_at))
+ return self.cursor.lastrowid or 0
+
+ def get_user_by_email(self, email: str) -> Optional[User]:
+ self.cursor.execute('''
+ SELECT * FROM users WHERE email=%s
+ ''', (email,))
+ result: Any = self.cursor.fetchone()
+ if result is None:
+ return None
+ return User(**result)
Do the sign-up and sign-in preparation work in application/executor/user_operator.py:
from datetime import datetime
import hashlib
import random
import time
from typing import Optional
from books.application.dto import UserCredential, User
from books.domain.gateway import UserManager
from books.domain.model import User as DomainUser
SALT_LEN = 4
ERR_EMPTY_EMAIL = "empty email"
ERR_EMPTY_PASSWORD = "empty password"
class UserOperator:
def __init__(self, user_manager: UserManager):
self.user_manager = user_manager
def create_user(self, uc: UserCredential) -> Optional[User]:
if not uc.email:
raise ValueError(ERR_EMPTY_EMAIL)
if not uc.password:
raise ValueError(ERR_EMPTY_PASSWORD)
salt = random_string(SALT_LEN)
password_hash = sha1_hash(uc.password + salt)
now = datetime.now()
user = DomainUser(id=0, email=uc.email,
password=password_hash, salt=salt, is_admin=False,
created_at=now, updated_at=now)
uid = self.user_manager.create_user(user)
return User(id=uid, email=uc.email)
def sign_in(self, email: str, password: str) -> Optional[User]:
if not email:
raise ValueError(ERR_EMPTY_EMAIL)
if not password:
raise ValueError(ERR_EMPTY_PASSWORD)
user = self.user_manager.get_user_by_email(email)
if not user:
return None
password_hash = sha1_hash(password + user.salt)
if user.password != password_hash:
raise ValueError("wrong password")
return User(id=user.id, email=user.email)
def random_string(length: int) -> str:
source = random.Random(time.time())
charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
result = ''.join(source.choice(charset) for _ in range(length))
return result
def sha1_hash(input_str: str) -> str:
hash_object = hashlib.sha1(input_str.encode())
hash_hex = hash_object.hexdigest()
return hash_hex
func random_string
and sha1_hash
are helper functions for salt generation and password hashing.
Why do you need a salt?
Without a salt, an attacker could precompute hashes for common passwords and store them in a table (known as a rainbow table). If the database is compromised, the attacker can then compare the hashed passwords against the precomputed hashes to quickly identify passwords. Adding a salt to each password ensures that even if two users have the same password, their hashed passwords will be different due to the unique salt.
Even if a user chooses a weak password, such as a common dictionary word, the salt ensures that the resulting hash is unique. Without a salt, all instances of the same password would hash to the same value, making them vulnerable to attacks.
Add DTOs in application/dto/user.py:
from dataclasses import dataclass
@dataclass
class UserCredential:
email: str
password: str
@dataclass
class User:
id: int
email: str
DTO stands for Data Transfer Object.
Often, the internal representation of data within your application might differ from what you want to expose through your API. DTOs allow you to transform your internal data structures into a format that's more suitable for the API consumers. This helps in decoupling the internal representation from the external one, allowing for better flexibility and abstraction.
Wire in new dependencies in application/wire_helper.py:
@@ -1,4 +1,4 @@
-from books.domain.gateway import BookManager, ReviewManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager
from books.infrastructure.cache import RedisCache, CacheHelper
from ..infrastructure.config import Config
from ..infrastructure.database import MySQLPersistence, MongoPersistence
@@ -25,3 +25,6 @@ class WireHelper:
def cache_helper(self) -> CacheHelper:
return self.kvStore
+
+ def user_manager(self) -> UserManager:
+ return self.sqlPersistence
Finnally, add routes in adapter/router.py:
@@ -1,17 +1,19 @@
import logging
from flask import Flask, request, jsonify
-from ..application.executor import BookOperator, ReviewOperator
+from books.application.dto import UserCredential
+from ..application.executor import BookOperator, ReviewOperator, UserOperator
from ..application import WireHelper
from ..domain.model import Book, Review
from .util import dataclass_from_dict
class RestHandler:
- def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator):
+ def __init__(self, logger: logging.Logger, book_operator: BookOperator, review_operator: ReviewOperator, user_operator: UserOperator):
self._logger = logger
self.book_operator = book_operator
self.review_operator = review_operator
+ self.user_operator = user_operator
def get_books(self):
try:
@@ -104,6 +106,26 @@ class RestHandler:
self._logger.error(f"Failed to delete: {e}")
return jsonify({"error": "Failed to delete"}), 404
+ def user_sign_up(self):
+ try:
+ u = dataclass_from_dict(UserCredential, request.json)
+ user = self.user_operator.create_user(u)
+ return jsonify(user), 201
+ except Exception as e:
+ self._logger.error(f"Failed to create: {e}")
+ return jsonify({"error": "Failed to create"}), 400
+
+ def user_sign_in(self):
+ try:
+ u = dataclass_from_dict(UserCredential, request.json)
+ user = self.user_operator.sign_in(u.email, u.password)
+ if user is None:
+ return jsonify({"error": f"No user with email {u.email}"}), 404
+ return jsonify(user), 200
+ except Exception as e:
+ self._logger.error(f"Failed to sign in: {e}")
+ return jsonify({"error": f"Failed to sign in: {e}"}), 400
+
def health():
return jsonify({"status": "ok"})
@@ -115,7 +137,8 @@ def make_router(app: Flask, wire_helper: WireHelper):
BookOperator(
wire_helper.book_manager(),
wire_helper.cache_helper()),
- ReviewOperator(wire_helper.review_manager()))
+ ReviewOperator(wire_helper.review_manager()),
+ UserOperator(wire_helper.user_manager()))
app.add_url_rule('/', view_func=health)
app.add_url_rule('/books', view_func=rest_handler.get_books)
app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
@@ -134,3 +157,7 @@ def make_router(app: Flask, wire_helper: WireHelper):
methods=['PUT'])
app.add_url_rule('/reviews/<id>', view_func=rest_handler.delete_review,
methods=['DELETE'])
+ app.add_url_rule('/users', view_func=rest_handler.user_sign_up,
+ methods=['POST'])
+ app.add_url_rule('/users/sign-in', view_func=rest_handler.user_sign_in,
+ methods=['POST'])
Try with curl
User sign up
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users
Result:
{
"email": "test-user@example.com",
"id": 2
}
If you check out the records in the database, you will find something like this:
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
| id | email | password | salt | is_admin | created_at | updated_at |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
| 2 | test-user@example.com | acced10162fdabc812d830f2f1edf74a63b57a38 | 3qV4 | 0 | 2024-03-08 15:49:35.989 | 2024-03-08 15:49:35.989 |
+----+-----------------------+------------------------------------------+------+----------+-------------------------+-------------------------+
User sign in successfully
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in
Result:
{
"email": "test-user@example.com",
"id": 2
}
User sign in with an incorrect password
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "wrong-pass"}' http://localhost:5000/users/sign-in
Result:
{
"error": "Failed to sign in: wrong password"
}
Note:
It's good practice to use a uniform JSON response format for both successful and failed responses.
e.g.{"code":0, "message":"ok", "data": {"id":2,"email":"test-user@example.com"}} {"code":1001, "message":"wrong password", "data": null}
Use JWT (JSON Web Token)
JWT, or JSON Web Token, is a compact, URL-safe means of representing claims to be transferred between two parties. It's often used for authentication and authorization in web applications and APIs.
Install JWT dependency
pip3 install PyJWT
Update requirements.txt:
pip3 freeze > requirements.txt
The file should be something to this at this moment:
async-timeout==4.0.3
blinker==1.7.0
click==8.1.7
dnspython==2.6.1
Flask==3.0.2
importlib-metadata==7.0.1
itsdangerous==2.1.2
Jinja2==3.1.3
MarkupSafe==2.1.5
mysql-connector-python==8.3.0
PyJWT==2.8.0
pymongo==4.6.2
PyYAML==6.0.1
redis==5.0.2
Werkzeug==3.0.1
zipp==3.17.0
Update code
Add UserPermission
enums in domain/model/user.py:
@@ -1,5 +1,6 @@
from dataclasses import dataclass
from datetime import datetime
+from enum import IntEnum
@dataclass
@@ -11,3 +12,10 @@ class User:
is_admin: bool
created_at: datetime
updated_at: datetime
+
+
+class UserPermission(IntEnum):
+ PermNone = 0
+ PermUser = 1
+ PermAuthor = 2
+ PermAdmin = 3
Declare its business capabilities in domain/gateway/user_manager.py:
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Optional
-from ..model import User
+from ..model import User, UserPermission
class UserManager(ABC):
@@ -12,3 +12,13 @@ class UserManager(ABC):
@abstractmethod
def get_user_by_email(self, email: str) -> Optional[User]:
pass
+
+
+class PermissionManager(ABC):
+ @abstractmethod
+ def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
+ pass
+
+ @abstractmethod
+ def has_permission(self, token: str, perm: UserPermission) -> bool:
+ pass
Implement these 2 methods in infrastructure/token/jwt.py:
import jwt
import time
from typing import Optional
from books.domain.gateway import PermissionManager
from ...domain.model import UserPermission
ERR_INVALID_TOKEN = "invalid token"
ERR_FAIL_TO_DECODE = "failed to decode token"
class TokenKeeper(PermissionManager):
def __init__(self, secret_key: str, expire_in_hours: int):
self.secret_key = secret_key.encode()
self.expire_hours = expire_in_hours
def generate_token(self, user_id: int, email: str, perm: UserPermission) -> str:
expiration_time = int(time.time()) + self.expire_hours * 3600
claims = {
"user_id": user_id,
"user_name": email,
"permission": perm,
"exp": expiration_time
}
token_result = jwt.encode(claims, self.secret_key, algorithm='HS256')
return token_result
def extract_token(self, token_result: str) -> Optional[dict]:
try:
claims = jwt.decode(
token_result, self.secret_key, algorithms=['HS256'])
return claims
except jwt.ExpiredSignatureError:
raise ValueError(ERR_INVALID_TOKEN)
except jwt.DecodeError:
raise ValueError(ERR_FAIL_TO_DECODE)
def has_permission(self, token_result: str, perm: UserPermission) -> bool:
claims = self.extract_token(token_result)
if not claims:
return False
return claims.get("permission", UserPermission.PermNone) >= perm
Add config items for Tokens in infrastructure/config/config.py:
@@ -26,6 +26,8 @@ class CacheConfig:
class ApplicationConfig:
port: int
page_size: int
+ token_secret: str
+ token_hours: int
@dataclass
Put in setting values in config.yml:
@@ -1,6 +1,8 @@
app:
port: 5000
page_size: 5
+ token_secret: "I_Love_LiteRank"
+ token_hours: 24
db:
file_name: "test.db"
host: "127.0.0.1"
Generate tokens and return them in application/executor/user_operator.py:
@@ -4,9 +4,9 @@ import random
import time
from typing import Optional
-from books.application.dto import UserCredential, User
-from books.domain.gateway import UserManager
-from books.domain.model import User as DomainUser
+from books.application.dto import UserCredential, User, UserToken
+from books.domain.gateway import UserManager, PermissionManager
+from books.domain.model import User as DomainUser, UserPermission
SALT_LEN = 4
ERR_EMPTY_EMAIL = "empty email"
@@ -14,8 +14,9 @@ ERR_EMPTY_PASSWORD = "empty password"
class UserOperator:
- def __init__(self, user_manager: UserManager):
+ def __init__(self, user_manager: UserManager, perm_manager: PermissionManager):
self.user_manager = user_manager
+ self.perm_manager = perm_manager
def create_user(self, uc: UserCredential) -> Optional[User]:
if not uc.email:
@@ -33,7 +34,7 @@ class UserOperator:
uid = self.user_manager.create_user(user)
return User(id=uid, email=uc.email)
- def sign_in(self, email: str, password: str) -> Optional[User]:
+ def sign_in(self, email: str, password: str) -> Optional[UserToken]:
if not email:
raise ValueError(ERR_EMPTY_EMAIL)
if not password:
@@ -45,7 +46,12 @@ class UserOperator:
password_hash = sha1_hash(password + user.salt)
if user.password != password_hash:
raise ValueError("wrong password")
- return User(id=user.id, email=user.email)
+ token = self.perm_manager.generate_token(
+ user.id, user.email,
+ UserPermission.PermAdmin if user.is_admin else UserPermission.PermUser)
+ return UserToken(
+ User(id=user.id, email=user.email),
+ token)
def random_string(length: int) -> str:
Add DTOs in application/dto/user.py:
@@ -11,3 +11,9 @@ class UserCredential:
class User:
id: int
email: str
+
+
+@dataclass
+class UserToken:
+ user: User
+ token: str
Tune application/wire_helper.py:
@@ -1,25 +1,35 @@
-from books.domain.gateway import BookManager, ReviewManager, UserManager
+from books.domain.gateway import BookManager, ReviewManager, UserManager, PermissionManager
from books.infrastructure.cache import RedisCache, CacheHelper
+from books.infrastructure.token import TokenKeeper
from ..infrastructure.config import Config
from ..infrastructure.database import MySQLPersistence, MongoPersistence
class WireHelper:
- def __init__(self, sqlPersistence: MySQLPersistence, noSQLPersistence: MongoPersistence, kvStore: RedisCache):
+ def __init__(self,
+ sqlPersistence: MySQLPersistence,
+ noSQLPersistence: MongoPersistence,
+ kvStore: RedisCache,
+ tokenKeeper: TokenKeeper):
self.sqlPersistence = sqlPersistence
self.noSQLPersistence = noSQLPersistence
self.kvStore = kvStore
+ self.tokenKeeper = tokenKeeper
@classmethod
def new(cls, c: Config):
db = MySQLPersistence(c.db, c.app.page_size)
mdb = MongoPersistence(c.db.mongo_uri, c.db.mongo_db_name)
kv = RedisCache(c.cache)
- return cls(db, mdb, kv)
+ tk = TokenKeeper(c.app.token_secret, c.app.token_hours)
+ return cls(db, mdb, kv, tk)
def book_manager(self) -> BookManager:
return self.sqlPersistence
+ def perm_manager(self) -> PermissionManager:
+ return self.tokenKeeper
+
def review_manager(self) -> ReviewManager:
return self.noSQLPersistence
Add perm_check
middleware into routes for Create/Update/Delete operations in adapter/router.py:
@@ -1,7 +1,9 @@
import logging
from flask import Flask, request, jsonify
+from books.adapter.middleware import perm_check
from books.application.dto import UserCredential
+from books.domain.model.user import UserPermission
from ..application.executor import BookOperator, ReviewOperator, UserOperator
from ..application import WireHelper
from ..domain.model import Book, Review
@@ -138,15 +140,15 @@ def make_router(app: Flask, wire_helper: WireHelper):
wire_helper.book_manager(),
wire_helper.cache_helper()),
ReviewOperator(wire_helper.review_manager()),
- UserOperator(wire_helper.user_manager()))
+ UserOperator(wire_helper.user_manager(), wire_helper.perm_manager()))
app.add_url_rule('/', view_func=health)
app.add_url_rule('/books', view_func=rest_handler.get_books)
app.add_url_rule('/books/<int:id>', view_func=rest_handler.get_book)
- app.add_url_rule('/books', view_func=rest_handler.create_book,
+ app.add_url_rule('/books', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.create_book),
methods=['POST'])
- app.add_url_rule('/books/<int:id>', view_func=rest_handler.update_book,
+ app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.update_book),
methods=['PUT'])
- app.add_url_rule('/books/<int:id>', view_func=rest_handler.delete_book,
+ app.add_url_rule('/books/<int:id>', view_func=perm_check(wire_helper.perm_manager(), UserPermission.PermAuthor)(rest_handler.delete_book),
methods=['DELETE'])
app.add_url_rule('/books/<int:book_id>/reviews',
view_func=rest_handler.get_reviews_of_book)
perm_check
is implemented in adapter/middleware.py:
from functools import wraps
from flask import request, jsonify
from books.domain.gateway import PermissionManager
from books.domain.model.user import UserPermission
TOKEN_PREFIX = "Bearer "
def perm_check(perm_manager: PermissionManager, allow_perm: UserPermission):
def middleware(view_func):
@wraps(view_func)
def wrapped_view(*args, **kwargs):
auth_header = request.headers.get("Authorization")
if not auth_header:
return jsonify({"error": "Token is required"}), 401
token = auth_header.replace(TOKEN_PREFIX, "", 1)
try:
has_perm = perm_manager.has_permission(token, allow_perm)
except Exception as e:
return jsonify({"error": f"{e}"}), 401
if not has_perm:
return jsonify({"error": "Unauthorized"}), 401
return view_func(*args, **kwargs)
return wrapped_view
return middleware
Those are all the changes you need to utilize JWT in your api server. Let's try it out again!
Try with curl
User sign in and get a Token
curl -X POST -H "Content-Type: application/json" -d '{"email": "test-user@example.com", "password": "test-pass"}' http://localhost:5000/users/sign-in
Result:
{
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk",
"user": {
"email": "test-user@example.com",
"id": 2
}
}
Put the token into Debugger on https://jwt.io/, then you can check the payload and verify the signature of your token.
Create a new book with a valid and proper Token
curl -X POST \
http://localhost:5000/books \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoyLCJ1c2VyX25hbWUiOiJ0ZXN0LXVzZXJAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjozLCJleHAiOjE3MDk5ODk1NTB9.aUTu4YI2_9aUWEvffe7pRQbAuivIq73NsPy6IXYypCk' \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
Successful result:
{
"author": "John Doe",
"created_at": null,
"description": "A sample book description",
"id": 18,
"isbn": "1234567890",
"published_at": "2003-01-01",
"title": "Test Book",
"total_pages": 100,
"updated_at": null
}
Create a new book with a Token, but it doesn't have proper permissions
curl -X POST \
http://localhost:5000/books \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo2LCJ1c2VyX25hbWUiOiJ0ZXN0LW5vZGVAZXhhbXBsZS5jb20iLCJwZXJtaXNzaW9uIjoxLCJleHAiOjE3MDk5ODk5MjN9.KhNCNRFYVAe67pyrFPH_bCFMbn83y55AsCtlDa3v0Sk' \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
Result:
{
"error": "Unauthorized"
}
Create a new book without a Token
curl -X POST \
http://localhost:5000/books \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
Result:
{
"error": "Token is required"
}
Create a new book with a fake Token
curl -X POST \
http://localhost:5000/books \
-H 'Authorization: Bearer FAKE_TOKEN' \
-H 'Content-Type: application/json' \
-d '{
"title": "Test Book",
"author": "John Doe",
"published_at": "2003-01-01",
"description": "A sample book description",
"isbn": "1234567890",
"total_pages": 100
}'
Result:
{
"error": "failed to decode token"
}
Perfect! 🎉
Now, some of your api endpoints are protected by the authentication mechanism.