Skip to the content.

TCP Client - Asynchronous I/O (Low-Level APIs)

Essentially, Transport and Protocol should only be used in libraries and frameworks and never in high-level asyncio applications.

Solution

"""TCP Client - Asynchronous I/O (Low-Level APIs).

Essentially, transports and protocols should only be used in libraries and frameworks
and never in high-level asyncio applications.
"""

from __future__ import annotations

import asyncio
import logging

from net import handle_socket_bufsize, handle_tcp_nodelay

# Emit DEBUG-and-above records, each prefixed with the name and id of the
# thread that produced it ('{'-style format fields).
logging.basicConfig(
    format='[{threadName} ({thread})] {message}',
    style='{',
    level=logging.DEBUG,
)


class EchoClientProtocol(asyncio.Protocol):
    """Echo client protocol: send one message, log replies, signal disconnect.

    Once connected, the protocol applies the requested socket options, writes
    ``message`` to the transport a single time and logs every chunk of data
    it receives afterwards. When the connection is lost, ``on_con_lost`` is
    resolved with ``True`` (unless it is already done).

    Args:
        message: Payload written to the transport right after connecting.
        on_con_lost: Future resolved with ``True`` when the connection is lost.
        tcp_nodelay: Passed to ``handle_tcp_nodelay`` together with the socket
            (presumably toggles ``TCP_NODELAY`` -- confirm in ``net``).
        recv_buf_size: Passed to ``handle_socket_bufsize`` as the receive
            buffer size; ``None`` is forwarded unchanged.
        send_buf_size: Passed to ``handle_socket_bufsize`` as the send
            buffer size; ``None`` is forwarded unchanged.
    """

    def __init__(
        self,
        message: bytes,
        on_con_lost: asyncio.Future[bool],
        tcp_nodelay: bool = True,
        recv_buf_size: int | None = None,
        send_buf_size: int | None = None,
    ) -> None:
        self.message = message
        self.on_con_lost = on_con_lost
        self.tcp_nodelay = tcp_nodelay
        self.recv_buf_size = recv_buf_size
        self.send_buf_size = send_buf_size

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Configure the connected socket and send ``self.message``."""
        # Narrow the type: only a read/write transport offers `write()`.
        assert isinstance(transport, asyncio.Transport)

        # `socket.getpeername()`
        server_address = transport.get_extra_info('peername')
        logging.debug(f'connected to {server_address}')

        sock = transport.get_extra_info('socket')
        # Sanity check: a timeout of 0.0 means the socket is non-blocking.
        assert sock.gettimeout() == 0.0
        handle_tcp_nodelay(sock, self.tcp_nodelay)
        handle_socket_bufsize(sock, self.recv_buf_size, self.send_buf_size)

        transport.write(self.message)
        logging.debug(f'sent: {self.message!r}')

    def data_received(self, data: bytes) -> None:
        """Log each chunk of data received from the peer."""
        logging.debug(f'recv: {data!r}')

    def connection_lost(self, exc: Exception | None) -> None:
        """Log the disconnect and resolve ``on_con_lost`` if still pending.

        Args:
            exc: The error that broke the connection, or ``None`` when no
                error was reported.
        """
        if exc is None:
            logging.debug('The server closed the connection')
        else:
            # Do not report an error-induced disconnect as a clean close.
            logging.debug(f'connection lost: {exc!r}')

        # The future may already be done, e.g. cancelled together with the
        # task that was awaiting it; `set_result()` would then raise
        # `asyncio.InvalidStateError` inside the event-loop callback.
        if not self.on_con_lost.done():
            self.on_con_lost.set_result(True)


async def tcp_echo_client(
    data: bytes, host: str = '127.0.0.1', port: int = 8888
) -> None:
    """Connect to an echo server, send ``data`` and wait for the disconnect.

    Args:
        data: Payload handed to ``EchoClientProtocol``, which writes it to
            the transport once the connection is made.
        host: Address of the server to connect to.
        port: TCP port of the server to connect to.
    """
    loop = asyncio.get_running_loop()

    # Resolved by the protocol's `connection_lost()` callback.
    on_con_lost = loop.create_future()

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(data, on_con_lost), host, port
    )
    assert isinstance(protocol, asyncio.Protocol)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        # Runs on cancellation or error too, so the transport is never leaked.
        transport.close()


# Script entry point: run the echo client to completion on a fresh event loop.
asyncio.run(  # Python 3.7+
    tcp_echo_client(b'Hello World!'),
)

More

References