Skip to the content.

FastAPI App

Installation

uv add beanie
uv add pyodmongo

App

.env

# .env

APP_NAME="FastAPI App"
APP_VERSION="v0.0.1"
APP_DESCRIPTION="FastAPI app description."
DEBUG=true
MONGODB_URL="mongodb://localhost:27019/?replicaSet=xxx&maxPoolSize=4096&connectTimeoutMS=3000&socketTimeoutMS=3500&serverSelectionTimeoutMS=2000"
MONGODB_DB_NAME="xxx"
MQTT_TOPIC_PREFIX="python-cookbook"

settings

"""Settings."""

from functools import lru_cache

from pydantic import MongoDsn, SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file='.env', env_file_encoding='utf-8', extra='allow'
    )

    app_name: str
    app_version: str = '0.1.0'
    app_doc_url: str = '/docs'
    app_description: str = ''
    debug: bool = False

    # MongoDB
    mongodb_url: MongoDsn
    mongodb_db_name: str

    # MQTT
    mqtt_host: str = "localhost"
    mqtt_port: int = 1883
    mqtt_username: str = 'admin'
    mqtt_password: SecretStr = SecretStr('public')
    mqtt_timeout: float | None = 3.5
    mqtt_qos: int = 2
    mqtt_topic_prefix: str


@lru_cache()
def get_settings() -> Settings:
    return Settings()  # pyright: ignore[reportCallIssue]

App

"""FastAPI App."""

import asyncio
import json
import logging
import os
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import TypedDict

import aiomqtt
from fastapi import FastAPI, Request
from fastapi.openapi.docs import (
    get_redoc_html,
    get_swagger_ui_html,
    get_swagger_ui_oauth2_redirect_html,
)
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from pymongo import AsyncMongoClient

from examples.web.fastapi.routers import router
from examples.web.fastapi.settings import get_settings
from examples.web.fastapi.workers import handle_bytes

settings = get_settings()

API_DOC_STATIC_DIR = 'examples/web/fastapi/static'
API_DOC_STATIC_PATH = f'{settings.app_doc_url}/{API_DOC_STATIC_DIR}'

LOGGER = logging.getLogger('uvicorn')

mongodb_client: AsyncMongoClient = AsyncMongoClient(str(settings.mongodb_url))
db_xxx = mongodb_client[settings.mongodb_db_name]
tb_xxx = db_xxx['examples']


class State(TypedDict):
    mqtt_client: aiomqtt.Client


async def mqtt_listen(client: aiomqtt.Client) -> None:
    async with asyncio.TaskGroup() as tg:
        async for message in client.messages:
            msg = message.payload
            if isinstance(msg, bytes):
                tg.create_task(handle_bytes(msg), name=str(message.mid))


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[State, Any]:

    loop = asyncio.get_event_loop()

    async with (
        aiomqtt.Client(
            settings.mqtt_host,
            settings.mqtt_port,
            username=settings.mqtt_username,
            password=settings.mqtt_password.get_secret_value(),
            timeout=settings.mqtt_timeout,
            identifier=f'python-cookbook-{os.getpid()}',
        ) as mqtt_client,
    ):
        # Subscribe MQTT
        await mqtt_client.subscribe(f'{settings.mqtt_topic_prefix}/#')
        task = loop.create_task(mqtt_listen(mqtt_client))

        yield {'mqtt_client': mqtt_client}

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    LOGGER.debug(f'MQTT client [python-cookbook-{os.getpid()}] disconected')
    await mongodb_client.close()


app: FastAPI = FastAPI(
    title=settings.app_name,
    docs_url=settings.app_doc_url,
    debug=settings.debug,
    openapi_url=f'{settings.app_doc_url}/openapi.json',
    description=settings.app_description,
    version=settings.app_version,
    lifespan=lifespan,
)

app.mount(API_DOC_STATIC_PATH, StaticFiles(directory=API_DOC_STATIC_DIR), name='static')
assert isinstance(app.swagger_ui_oauth2_redirect_url, str)


@app.head(settings.app_doc_url, include_in_schema=False)
@app.get(settings.app_doc_url, include_in_schema=False)
async def custom_swagger_ui_html() -> HTMLResponse:
    """Custom Swagger UI"""
    assert isinstance(app.openapi_url, str)
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,
        title=app.title + ' - Swagger UI',
        oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
        swagger_js_url=f'{API_DOC_STATIC_PATH}/swagger-ui-bundle.js',
        swagger_css_url=f'{API_DOC_STATIC_PATH}/swagger-ui.css',
    )


@app.get(app.swagger_ui_oauth2_redirect_url, include_in_schema=False)
async def swagger_ui_redirect() -> HTMLResponse:
    """Swagger UI redirect"""
    return get_swagger_ui_oauth2_redirect_html()


@app.get(f'{settings.app_doc_url}/redoc', include_in_schema=False)
async def redoc_html() -> HTMLResponse:
    """Custom redoc html."""
    assert isinstance(app.openapi_url, str)
    return get_redoc_html(
        openapi_url=app.openapi_url,
        title=app.title + ' - ReDoc',
        redoc_js_url=f'{API_DOC_STATIC_PATH}/redoc.standalone.js',
    )


@app.get('/api')
async def root(request: Request) -> dict[str, str | None]:
    db_doc = await tb_xxx.find_one({'name': settings.app_name})
    await request.state.mqtt_client.publish(
        f'{settings.mqtt_topic_prefix}/example',
        payload=json.dumps({'msg': 'hello'}, ensure_ascii=False),
        qos=settings.mqtt_qos,
    )
    return {'db': db_doc}


app.include_router(router, prefix='/api/router', tags=['router'])

Run

uv run uvicorn --host 0.0.0.0 --port 8000 \
    --proxy-headers \
    --forwarded-allow-ips "*" \
    --workers 8 \
    --limit-concurrency 1024 \
    --backlog 4096 \
    --log-level debug \
    --timeout-keep-alive 5 \
    --use-colors \
    --no-server-header \
    examples.web.fastapi.main:app \
    --log-config examples/web/uvicorn_logging.json

More

References