Skip to the content.

Asynchronous I/O - Timeout

Recipes

import asyncio
import logging

logging.basicConfig(
    level=logging.DEBUG, style='{', format='[{threadName} ({thread})] {message}'
)


async def do_task(name: str | int, delay: float) -> str:
    logging.debug(f'run task ({name}), sleep {delay} seconds')
    await asyncio.sleep(delay)
    return f'task ({name}) done'


async def coroutine_as_completed(arg: str) -> list[str]:
    """run coroutines concurrently: return result each task."""
    logging.debug(f'run coroutine_as_completed: {arg=}')

    rs: list[str] = []

    tasks = (do_task('10', 3.0), do_task('11', 3.5))
    for coroutine in asyncio.as_completed(tasks):
        r = await coroutine
        rs.append(r)

    return rs


async def coroutine_as_completed_timeout(arg: str) -> list[str]:
    """run coroutines concurrently: return result each task with timeout."""
    logging.debug(f'run coroutine_as_completed: {arg=}')

    rs: list[str] = []

    tasks = (do_task('12', 3.0), do_task('13', 3.5))
    for coroutine in asyncio.as_completed(tasks, timeout=3.2):
        try:
            r = await coroutine
        except asyncio.TimeoutError:
            logging.warning('timeout')
        else:
            rs.append(r)

    return rs


async def main() -> tuple[str, ...]:
    tasks1 = {asyncio.create_task(do_task(i, 1.0), name=f't{i}') for i in range(5)}
    done, _ = await asyncio.wait(tasks1)
    for t in done:
        logging.debug(f'result: {t.result()}')

    # with timeout
    tasks2 = {
        asyncio.create_task(do_task(i, i / 2), name=f't{i}') for i in range(5, 10)
    }
    done, pending = await asyncio.wait(tasks2, timeout=3.0)
    for t in done:
        logging.debug(f'result: {t.result()}')
    # cancel remaining tasks so they do not generate errors
    # as we exit wihout finishing them.
    for t in pending:
        logging.debug(f'cancel {t.get_name()}')
        t.cancel()

    r1 = await coroutine_as_completed('as_completed')
    r2 = await coroutine_as_completed_timeout('as_completed_timeout')

    return tuple(r1) + tuple(r2)


result: tuple[str, ...] = asyncio.run(main())
logging.debug(f'result: {result}')

References