Skip to the content.

motor - Asyncio Driver for MongoDB

Start

See MongoDB Overview - Linux Cookbook and MongoDB on Ubuntu - Linux Cookbook.

# pyproject.toml

dependencies = [
    "motor",
]

# mypy for MongoDB motor
[[tool.mypy.overrides]]
module = "motor.*"
ignore_missing_imports = true
pipenv install motor

Usage

import logging

import pymongo
from motor.core import AgnosticClient, AgnosticCollection, AgnosticDatabase
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.errors import DuplicateKeyError
from pymongo.results import InsertOneResult, UpdateResult


MONGO_URL = 'mongo://xxx'

MONGO_CLIENT: AgnosticClient = AsyncIOMotorClient(MONGO_URL)
MONGO_DB: AgnosticDatabase = MONGO_CLIENT['database-name']

tb_users: AgnosticCollection = MONGO_DB['table-name']

# create index
await tb_users.create_index(
    [('login_name', pymongo.ASCENDING)], unique=True, name='unique_login_name'
)

# find one
user_doc = await tb_users.find_one({'login_name': 'xxx'})
if user_doc is None:
    logging.error('not exists')

# insert one
try:
    rsp1: InsertOneResult = await tb_users.insert_one({'login_name': 'xxx'})
except DuplicateKeyError:
    logging.error('duplicated key')
logging.info(f'id={rsp1.inserted_id}')

# update one
try:
    rsp2: UpdateResult = await tb_users.update_one(
        {'login_name': 'xxx', '$set': {'login_name': 'yyy'}})
except DuplicateKeyError:
    logging.error('duplicated key')
if rsp2.modified_count != 1:
    logging.error('update failed')


# transaction
try:
    async with await MONGO_CLIENT.start_session() as session:
        async with session.start_transaction():
            await tb_users.insert_one(
                {'login_name': 'a'}, session=session
            )
            await tb_users.insert_one(
                {'login_name': 'b'}, session=session
            )
except DuplicateKeyError:
    logging.error('transaction failed')

More

References