Skip to the content.

FastAPI App

Installation

motor

See MongoDB motor - Python Cookbook

ODM (Object-Document Model): beanie, pyodmongo

pipenv install beanie
pipenv install pyodmongo
# pyproject.toml

dependencies = [
    "beanie",
    "pyodmongo",
]

App

.env

# .env

APP_NAME="FastAPI App"
APP_VERSION="v0.0.1"
APP_DESCRIPTION="FastAPI app description."
DEBUG=true
MONGODB_URL="mongodb://localhost:27019/?replicaSet=xxx&maxPoolSize=4096&connectTimeoutMS=3000&socketTimeoutMS=3500&serverSelectionTimeoutMS=2000"
MONGODB_DB_NAME="xxx"
REDIS_URL="redis://:foobared@localhost:6379/0"
CACHE_MAX_CONNS=4096
CACHE_CONN_TIMEOUT=3.0
CACHE_TIMEOUT=3.5
CACHE_PREFIX="python-cookbook"
MQTT_TOPIC_PREFIX="python-cookbook"

settings

"""Settings."""

from functools import lru_cache

from pydantic import MongoDsn, RedisDsn
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings loaded by pydantic-settings.

    Values come from the process environment and the ``.env`` file
    configured below. Fields declared without a default must be supplied.
    """

    # Read `.env` as UTF-8; extra='allow' accepts keys that are not declared
    # as fields on this class.
    model_config = SettingsConfigDict(
        env_file='.env', env_file_encoding='utf-8', extra='allow'
    )

    # General application metadata.
    app_name: str
    app_version: str = '0.1.0'
    app_doc_url: str = '/docs'
    app_description: str = ''
    debug: bool = False

    # MongoDB
    # Connection URL (validated as a MongoDB DSN) and the database name.
    mongodb_url: MongoDsn
    mongodb_db_name: str

    # Cache: Redis
    # Connection URL (validated as a Redis DSN), pool size, timeouts and the
    # key prefix. A timeout of None is accepted by the type.
    redis_url: RedisDsn
    cache_max_conns: int = 4096
    cache_conn_timeout: float | None = 3.0
    cache_timeout: float | None = 3.5
    cache_prefix: str

    # MQTT
    # Broker address, optional credentials, timeout, QoS level and the
    # topic prefix.
    mqtt_host: str = "localhost"
    mqtt_port: int = 1883
    mqtt_username: str | None = None
    mqtt_password: str | None = None
    mqtt_timeout: float | None = 3.5
    mqtt_qos: int = 2
    mqtt_topic_prefix: str


@lru_cache
def get_settings() -> Settings:
    """Return the shared ``Settings`` instance.

    The instance is built on the first call and served from the cache on
    every later call.
    """
    instance = Settings()  # pyright: ignore[reportCallIssue]
    return instance

App

"""FastAPI App."""

import asyncio
import json
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import TypedDict

import aiomqtt
import uvicorn
from fastapi import FastAPI, Request
from fastapi.openapi.docs import (
    get_redoc_html,
    get_swagger_ui_html,
    get_swagger_ui_oauth2_redirect_html,
)
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from motor.motor_asyncio import AsyncIOMotorClient
from redis.asyncio import Redis

from examples.web.fastapi.routers import router
from examples.web.fastapi.settings import get_settings
from examples.web.fastapi.workers import handle_bytes

# Application settings, resolved once at import time.
settings = get_settings()

# Directory containing the static API-doc assets, and the URL path built from
# the docs URL plus that directory (used for the static mount and asset URLs).
API_DOC_STATIC_DIR = 'examples/web/fastapi/static'
API_DOC_STATIC_PATH = f'{settings.app_doc_url}/{API_DOC_STATIC_DIR}'

# Module-level MongoDB client plus handles to the configured database and
# its 'examples' collection.
MONGODB_CLIENT = AsyncIOMotorClient(str(settings.mongodb_url))
DB_XXX = MONGODB_CLIENT[settings.mongodb_db_name]
TB_XXX = DB_XXX['examples']


class State(TypedDict):
    """Shape of the state dict yielded by the lifespan context."""

    # Async Redis client.
    redis_client: Redis
    # Connected MQTT client.
    mqtt_client: aiomqtt.Client


async def mqtt_listen(client: aiomqtt.Client) -> None:
    """Dispatch incoming MQTT messages to ``handle_bytes``.

    Each message whose payload is ``bytes`` is handled in its own task
    inside a task group, named after the message id. Messages with any
    other payload type are skipped.
    """
    async with asyncio.TaskGroup() as group:
        async for incoming in client.messages:
            payload = incoming.payload
            if not isinstance(payload, bytes):
                continue
            group.create_task(handle_bytes(payload), name=str(incoming.mid))


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncGenerator[State, None]:
    """Manage the Redis, MQTT and MongoDB resources for the app's lifetime.

    On startup, opens the Redis and MQTT clients, subscribes to every topic
    under the configured prefix and starts the background listener task.
    The yielded dict is the lifespan state exposed to request handlers.

    On shutdown, cancels and awaits the listener, lets the Redis and MQTT
    context managers exit, and closes the module-level MongoDB client.
    Teardown sits in ``finally`` blocks so it also runs when an exception
    is thrown into the generator at the ``yield``.
    """

    try:
        async with (
            Redis.from_url(
                url=str(settings.redis_url),
                encoding='utf-8',
                decode_responses=True,
                max_connections=settings.cache_max_conns,
                socket_connect_timeout=settings.cache_conn_timeout,
                socket_timeout=settings.cache_timeout,
            ) as redis_client,
            aiomqtt.Client(
                settings.mqtt_host,
                settings.mqtt_port,
                username=settings.mqtt_username,
                password=settings.mqtt_password,
                timeout=settings.mqtt_timeout,
            ) as mqtt_client,
        ):
            # Subscribe MQTT
            await mqtt_client.subscribe(f'{settings.mqtt_topic_prefix}/#')
            # create_task schedules on the running loop; inside a coroutine
            # this is preferred over asyncio.get_event_loop().
            task = asyncio.create_task(mqtt_listen(mqtt_client))

            try:
                yield {'redis_client': redis_client, 'mqtt_client': mqtt_client}
            finally:
                # Stop the listener before the MQTT client is closed.
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
    finally:
        # Close MongoDB even if startup or shutdown above failed.
        MONGODB_CLIENT.close()


# Create the application.
#
# docs_url is None so that FastAPI does not register its built-in Swagger UI
# and OAuth2-redirect routes. Those built-ins are added during construction
# and would take precedence over the custom handlers that this module
# registers on `settings.app_doc_url` and `app.swagger_ui_oauth2_redirect_url`,
# so the locally hosted Swagger assets would never be served.
# The OpenAPI schema is still published under the docs URL.
app: FastAPI = FastAPI(
    title=settings.app_name,
    docs_url=None,
    debug=settings.debug,
    openapi_url=f'{settings.app_doc_url}/openapi.json',
    description=settings.app_description,
    version=settings.app_version,
    lifespan=lifespan,
)

# Serve the static API-doc assets under the docs URL.
app.mount(API_DOC_STATIC_PATH, StaticFiles(directory=API_DOC_STATIC_DIR), name='static')
# Narrow the Optional[str] attribute for type checkers before it is used as a
# route path.
assert isinstance(app.swagger_ui_oauth2_redirect_url, str)


@app.head(settings.app_doc_url, include_in_schema=False)
@app.get(settings.app_doc_url, include_in_schema=False)
async def custom_swagger_ui_html() -> HTMLResponse:
    """Render the Swagger UI page using the statically mounted JS/CSS."""
    schema_url = app.openapi_url
    assert isinstance(schema_url, str)
    page_title = f'{app.title} - Swagger UI'
    bundle_js = f'{API_DOC_STATIC_PATH}/swagger-ui-bundle.js'
    stylesheet = f'{API_DOC_STATIC_PATH}/swagger-ui.css'
    return get_swagger_ui_html(
        openapi_url=schema_url,
        title=page_title,
        oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
        swagger_js_url=bundle_js,
        swagger_css_url=stylesheet,
    )


@app.get(app.swagger_ui_oauth2_redirect_url, include_in_schema=False)
async def swagger_ui_redirect() -> HTMLResponse:
    """Return the OAuth2 redirect page used by Swagger UI."""
    redirect_page = get_swagger_ui_oauth2_redirect_html()
    return redirect_page


@app.get(f'{settings.app_doc_url}/redoc', include_in_schema=False)
async def redoc_html() -> HTMLResponse:
    """Render the ReDoc page using the statically mounted script."""
    schema_url = app.openapi_url
    assert isinstance(schema_url, str)
    page_title = f'{app.title} - ReDoc'
    script_url = f'{API_DOC_STATIC_PATH}/redoc.standalone.js'
    return get_redoc_html(
        openapi_url=schema_url,
        title=page_title,
        redoc_js_url=script_url,
    )


@app.get('/api')
async def root(request: Request) -> dict[str, dict | str | None]:
    """Example endpoint touching MongoDB, Redis and MQTT.

    Looks up the document whose ``name`` equals the app name, reads the
    ``<cache_prefix>:examples`` key from Redis, and publishes a greeting to
    ``<mqtt_topic_prefix>/example``.

    Returns:
        ``{'db': <document or None>, 'cache': <cached value or None>}``.
    """
    db_doc = await TB_XXX.find_one({'name': settings.app_name})
    if db_doc is not None and '_id' in db_doc:
        # The ObjectId in `_id` cannot be JSON-encoded; return it as a string
        # so a found document does not fail response serialisation.
        db_doc = {**db_doc, '_id': str(db_doc['_id'])}
    cache_val = await request.state.redis_client.get(
        f'{settings.cache_prefix}:examples'
    )
    await request.state.mqtt_client.publish(
        f'{settings.mqtt_topic_prefix}/example',
        payload=json.dumps({'msg': 'hello'}, ensure_ascii=False),
        qos=settings.mqtt_qos,
    )
    return {'db': db_doc, 'cache': cache_val}


# Attach the imported router under the /api/router prefix.
app.include_router(router, prefix='/api/router', tags=['router'])


# Only for the development environment: start uvicorn with auto-reload.
# NOTE(review): 'main:app' assumes this module is importable as `main` —
# confirm against the actual file name and working directory.
if __name__ == '__main__':
    uvicorn.run(app='main:app', host='', reload=True)

More

References