Skip to content

WebSocket Connection Pool (Async)

pool

Classes

ConnectionPoolExhaustedError

ConnectionPoolExhaustedError(message: str | None = None)

Bases: WebsocketError

Raised when the connection pool has reached its maximum capacity.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

ConnectionPoolUnavailableError

ConnectionPoolUnavailableError(message: str | None = None)

Bases: WebsocketError

Raised when attempting to use a closed or unavailable pool.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

PendingConnection

PendingConnection(connection: WebsocketConnection, drain_timeout: float, drain_condition: DrainConditionCallback)

Represents a connection that is being drained before pool return.

A pending connection is in the process of draining server-sent data before being returned to the available connection pool.

Attributes:

Name Type Description
connection

The WebsocketConnection being drained.

drain_timeout

Maximum time to spend draining in seconds.

drain_condition

Callback to determine what data should be drained.

created_at

Unix timestamp when this pending connection was created.

drain_task Task[None] | None

The asyncio Task performing the drain operation.

pending_id

Unique identifier for this pending connection.

Initialize a pending connection.

Parameters:

Name Type Description Default
connection WebsocketConnection

The WebsocketConnection to drain.

required
drain_timeout float

Maximum time to spend draining in seconds.

required
drain_condition DrainConditionCallback

Function to determine what constitutes server data.

required
Source code in audex/lib/websocket/pool.py
def __init__(
    self,
    connection: WebsocketConnection,
    drain_timeout: float,
    drain_condition: DrainConditionCallback,
):
    """Initialize a pending connection.

    Args:
        connection: The WebsocketConnection to drain.
        drain_timeout: Maximum time to spend draining in seconds.
        drain_condition: Function to determine what constitutes server data.
    """
    self.connection = connection
    self.drain_timeout = drain_timeout
    self.drain_condition = drain_condition
    self.created_at = time.time()
    self.drain_task: aio.Task[None] | None = None
    self.pending_id = uuid.uuid4().hex
    self._completed = False
    self._lock = aio.Lock()
Functions
mark_completed async
mark_completed() -> bool

Atomically mark this pending connection as completed.

Returns:

Type Description
bool

True if this call marked it as completed, False if already completed.

Source code in audex/lib/websocket/pool.py
async def mark_completed(self) -> bool:
    """Atomically mark this pending connection as completed.

    Returns:
        True if this call marked it as completed, False if already completed.
    """
    async with self._lock:
        if self._completed:
            return False
        self._completed = True
        return True

WebsocketConnectionPool

WebsocketConnectionPool(*, uri: str, headers: dict[str, str] | None = None, idle_timeout: float = 60.0, max_connections: int = 50, max_retries: int = 3, cleanup_interval: float = 5.0, connection_timeout: float = 10.0, warmup_connections: int = 0, check_server_data_on_release: bool = False, drain_timeout: float = 10.0, drain_quiet_period: float = 2.0, drain_condition: DrainConditionCallback | None = None, **kwargs: Any)

Bases: LoggingMixin

An asynchronous WebSocket connection pool for managing reusable connections.

This class provides efficient and fault-tolerant management of a pool of WebSocket connections to a specified URI. It supports connection reuse, automatic cleanup of idle or disconnected connections, retry mechanisms, optional warm-up on startup, and server data detection during connection release.

Key Features
  • Connection reuse for improved performance
  • Connection acquisition with automatic exponential backoff retries
  • Optional warm-up of initial connections on startup
  • Background cleanup of idle or disconnected connections
  • Asynchronous server data draining to prevent data loss
  • Supports context management for automatic connection release
  • Thread-safe operations with proper lock management

Attributes:

Name Type Description
uri

The WebSocket URI to connect to.

headers

Optional headers sent with each connection.

Initialize a WebSocket connection pool.

Parameters:

Name Type Description Default
uri str

The WebSocket URI to connect to.

required
headers dict[str, str] | None

Optional HTTP headers to include in connection requests.

None
idle_timeout float

Maximum time in seconds a connection can remain idle before being closed. Defaults to 60.0.

60.0
max_connections int

Maximum number of concurrent connections allowed in the pool. Defaults to 50.

50
max_retries int

Maximum number of retry attempts when acquiring a connection fails. Defaults to 3.

3
cleanup_interval float

Interval in seconds between cleanup operations for idle connections. Defaults to 5.0.

5.0
connection_timeout float

Timeout in seconds for establishing new connections. Defaults to 10.0.

10.0
warmup_connections int

Number of connections to create during pool initialization. Defaults to 0.

0
check_server_data_on_release bool

Whether to check for server data during connection release. Defaults to False.

False
drain_timeout float

Maximum time in seconds to drain server data during release. Defaults to 10.0.

10.0
drain_quiet_period float

Time in seconds to wait without receiving data before considering connection clean. Defaults to 2.0.

2.0
drain_condition DrainConditionCallback | None

Function to determine what constitutes server data that should be drained. Defaults to None (uses default condition).

None
**kwargs Any

Additional parameters to pass to WebsocketConnection.

{}
Source code in audex/lib/websocket/pool.py
def __init__(
    self,
    *,
    uri: str,
    headers: dict[str, str] | None = None,
    idle_timeout: float = 60.0,
    max_connections: int = 50,
    max_retries: int = 3,
    cleanup_interval: float = 5.0,
    connection_timeout: float = 10.0,
    warmup_connections: int = 0,
    check_server_data_on_release: bool = False,
    drain_timeout: float = 10.0,
    drain_quiet_period: float = 2.0,
    drain_condition: DrainConditionCallback | None = None,
    **kwargs: t.Any,
):
    """Initialize a WebSocket connection pool.

    Args:
        uri: The WebSocket URI to connect to.
        headers: Optional HTTP headers to include in connection requests.
        idle_timeout: Maximum time in seconds a connection can remain idle
            before being closed. Defaults to 60.0.
        max_connections: Maximum number of concurrent connections allowed
            in the pool. Defaults to 50.
        max_retries: Maximum number of retry attempts when acquiring a
            connection fails. Defaults to 3.
        cleanup_interval: Interval in seconds between cleanup operations
            for idle connections. Defaults to 5.0.
        connection_timeout: Timeout in seconds for establishing new
            connections. Defaults to 10.0.
        warmup_connections: Number of connections to create during pool
            initialization. Defaults to 0.
        check_server_data_on_release: Whether to check for server data
            during connection release. Defaults to False.
        drain_timeout: Maximum time in seconds to drain server data
            during release. Defaults to 10.0.
        drain_quiet_period: Time in seconds to wait without receiving data
            before considering connection clean. Defaults to 2.0.
        drain_condition: Function to determine what constitutes server data
            that should be drained. Defaults to None (uses default
            condition).
        **kwargs: Additional parameters to pass to WebsocketConnection.
    """
    super().__init__()
    self.uri = uri
    self.headers = headers or {}
    self._idle_timeout = idle_timeout
    self._max_connections = max_connections
    self._max_retries = max_retries
    self._cleanup_interval = cleanup_interval
    self._connection_timeout = connection_timeout
    self._warmup_connections = warmup_connections
    self._check_server_data_on_release = check_server_data_on_release
    self._drain_timeout = drain_timeout
    self._drain_quiet_period = drain_quiet_period
    self._drain_condition = drain_condition or self._default_drain_condition
    self._params = kwargs

    self._available: collections.deque[WebsocketConnection] = collections.deque()
    self._busy: set[WebsocketConnection] = set()
    # pending_id -> PendingConnection
    self._pending: dict[str, PendingConnection] = {}
    self._total = 0
    self._lock = aio.Lock()
    self._closed = False
    self._cleanup_task: aio.Task[None] | None = None
    self._started = False
Attributes
is_closed property
is_closed: bool

Check if the connection pool is closed.

Returns:

Type Description
bool

True if the pool is closed, False otherwise.

is_started property
is_started: bool

Check if the connection pool has been started.

Returns:

Type Description
bool

True if the pool has been started, False otherwise.

total_connections property
total_connections: int

Get the total number of connections in the pool.

Returns:

Type Description
int

The total number of connections (available, busy, and pending).

available_connections property
available_connections: int

Get the number of available connections.

Returns:

Type Description
int

The number of connections available for use.

busy_connections property
busy_connections: int

Get the number of busy connections.

Returns:

Type Description
int

The number of connections currently in use.

pending_connections property
pending_connections: int

Get the number of pending connections.

Returns:

Type Description
int

The number of connections being drained.

Functions
start async
start() -> None

Start the connection pool.

Initializes the pool, starts the cleanup task, and optionally creates warmup connections if configured.

Source code in audex/lib/websocket/pool.py
async def start(self) -> None:
    """Start the connection pool.

    Initializes the pool, starts the cleanup task, and optionally
    creates warmup connections if configured.
    """
    if self._started or self._closed:
        return

    self._started = True
    if self._cleanup_task is None or self._cleanup_task.done():
        self._cleanup_task = aio.create_task(self._cleanup_loop())

    # Warmup connections if specified
    if self._warmup_connections > 0:
        await self._warmup_pool()

    self.logger.info(f"Started WebSocket connection pool for {self.uri}")
acquire async
acquire() -> WebsocketConnection

Acquire a connection from the pool.

Attempts to reuse an existing available connection, or creates a new one if none are available and the pool limit hasn't been reached. Includes automatic retry logic for transient failures.

Returns:

Type Description
WebsocketConnection

An acquired WebsocketConnection ready for use.

Raises:

Type Description
ConnectionPoolUnavailableError

If the pool is closed.

ConnectionPoolExhaustedError

If maximum connections limit is reached.

ConnectionUnavailableError

If connection acquisition fails after retries.

Source code in audex/lib/websocket/pool.py
async def acquire(self) -> WebsocketConnection:
    """Acquire a connection from the pool.

    Attempts to reuse an existing available connection, or creates a new
    one if none are available and the pool limit hasn't been reached.
    Includes automatic retry logic for transient failures.

    Returns:
        An acquired WebsocketConnection ready for use.

    Raises:
        ConnectionPoolUnavailableError: If the pool is closed.
        ConnectionPoolExhaustedError: If maximum connections limit is
            reached.
        ConnectionUnavailableError: If connection acquisition fails
            after retries.
    """
    if not self._started:
        await self.start()

    retry = tenacity.retry(
        stop=tenacity.stop_after_attempt(self._max_retries),
        wait=tenacity.wait_exponential(multiplier=1, max=10),
        retry=tenacity.retry_if_exception_type((
            ConnectionUnavailableError,
            ConnectionClosedError,
            OSError,
            aio.TimeoutError,
        )),
    )
    return await retry(self._acquire)()
acquire_new async
acquire_new() -> WebsocketConnection

Force acquire a new connection, avoiding reuse of existing ones.

This method always creates a fresh connection and returns it in acquired state, similar to acquire() but without attempting to reuse existing connections.

Returns:

Type Description
WebsocketConnection

A newly created and acquired WebsocketConnection.

Raises:

Type Description
ConnectionPoolUnavailableError

If the pool is closed.

ConnectionPoolExhaustedError

If maximum connections limit is reached.

ConnectionUnavailableError

If connection creation fails after retries.

Source code in audex/lib/websocket/pool.py
async def acquire_new(self) -> WebsocketConnection:
    """Force acquire a new connection, avoiding reuse of existing
    ones.

    This method always creates a fresh connection and returns it in
    acquired state, similar to acquire() but without attempting to
    reuse existing connections.

    Returns:
        A newly created and acquired WebsocketConnection.

    Raises:
        ConnectionPoolUnavailableError: If the pool is closed.
        ConnectionPoolExhaustedError: If maximum connections limit is
            reached.
        ConnectionUnavailableError: If connection creation fails after
            retries.
    """
    if not self._started:
        await self.start()

    retry = tenacity.retry(
        stop=tenacity.stop_after_attempt(self._max_retries),
        wait=tenacity.wait_exponential(multiplier=1, max=10),
        retry=tenacity.retry_if_exception_type((
            ConnectionUnavailableError,
            ConnectionClosedError,
            OSError,
            aio.TimeoutError,
        )),
    )
    return await retry(self._acquire_new)()
release async
release(connection: WebsocketConnection, *, force_remove: bool = False) -> None

Release a connection back to the pool.

The connection will be either: 1. Returned to the available pool immediately if data draining is disabled 2. Put into pending state for background draining if data draining is enabled 3. Removed from the pool if force_remove is True or the connection is unhealthy

Parameters:

Name Type Description Default
connection WebsocketConnection

The WebsocketConnection to release.

required
force_remove bool

If True, the connection will be removed from the pool regardless of its state. Defaults to False.

False
Source code in audex/lib/websocket/pool.py
async def release(self, connection: WebsocketConnection, *, force_remove: bool = False) -> None:
    """Release a connection back to the pool.

    The connection will be either:
    1. Returned to the available pool immediately if data draining is disabled
    2. Put into pending state for background draining if data draining is enabled
    3. Removed from the pool if force_remove is True or the connection is unhealthy

    Args:
        connection: The WebsocketConnection to release.
        force_remove: If True, the connection will be removed from the pool
            regardless of its state. Defaults to False.
    """
    async with self._lock:
        if connection not in self._busy:
            self.logger.warning(
                f"Attempting to release connection not in busy set: {connection}"
            )
            return

        self._busy.remove(connection)

    # Release connection outside lock
    try:
        await connection.release()
    except Exception as e:
        self.logger.warning(f"Error releasing connection: {e}")
        await self._remove_connection(connection)
        return

    async with self._lock:
        if force_remove or not connection.is_connected or self._closed:
            # Connection should be removed
            if connection.is_connected:
                self._total -= 1
            await connection.close()
            self.logger.debug(f"Removed connection from pool: {connection}")
            return

        # If we don't need to check for server data, directly return to available
        if not self._check_server_data_on_release:
            self._available.append(connection)
            self.logger.debug(f"Released connection directly to available: {connection}")
            return

        # Put connection into pending state for background draining
        pending_conn = PendingConnection(
            connection=connection,
            drain_timeout=self._drain_timeout,
            drain_condition=self._drain_condition,
        )

        self._pending[pending_conn.pending_id] = pending_conn

    # Start background drain task outside lock
    pending_conn.drain_task = aio.create_task(self._drain_connection(pending_conn))

    self.logger.debug(f"Put connection into pending state for draining: {connection}")
get_connection async
get_connection() -> AsyncGenerator[WebsocketConnection, None]

Get a connection from the pool as an async context manager.

The connection will be automatically released back to the pool when the context manager exits.

Yields:

Type Description
AsyncGenerator[WebsocketConnection, None]

A WebsocketConnection from the pool.

Raises:

Type Description
ConnectionPoolUnavailableError

If the pool is closed.

ConnectionPoolExhaustedError

If maximum connections limit is reached.

ConnectionUnavailableError

If connection acquisition fails.

Example

async with pool.get_connection() as conn: await conn.send("Hello") response = await conn.recv()

Source code in audex/lib/websocket/pool.py
@contextlib.asynccontextmanager
async def get_connection(self) -> t.AsyncGenerator[WebsocketConnection, None]:
    """Get a connection from the pool as an async context manager.

    The connection will be automatically released back to the pool when
    the context manager exits.

    Yields:
        A WebsocketConnection from the pool.

    Raises:
        ConnectionPoolUnavailableError: If the pool is closed.
        ConnectionPoolExhaustedError: If maximum connections limit is reached.
        ConnectionUnavailableError: If connection acquisition fails.

    Example:
        async with pool.get_connection() as conn:
            await conn.send("Hello")
            response = await conn.recv()
    """
    connection = await self.acquire()
    try:
        yield connection
    finally:
        await self.release(connection)
get_new_connection async
get_new_connection() -> AsyncGenerator[WebsocketConnection, None]

Get a new connection from the pool as an async context manager.

Forces creation of a new connection without reusing existing ones. The connection will be automatically released back to the pool when the context manager exits.

Yields:

Type Description
AsyncGenerator[WebsocketConnection, None]

A newly created WebsocketConnection.

Raises:

Type Description
ConnectionPoolUnavailableError

If the pool is closed.

ConnectionPoolExhaustedError

If maximum connections limit is reached.

ConnectionUnavailableError

If connection creation fails.

Example

async with pool.get_new_connection() as conn: await conn.send("Hello") response = await conn.recv()

Source code in audex/lib/websocket/pool.py
@contextlib.asynccontextmanager
async def get_new_connection(self) -> t.AsyncGenerator[WebsocketConnection, None]:
    """Get a new connection from the pool as an async context
    manager.

    Forces creation of a new connection without reusing existing ones.
    The connection will be automatically released back to the pool when
    the context manager exits.

    Yields:
        A newly created WebsocketConnection.

    Raises:
        ConnectionPoolUnavailableError: If the pool is closed.
        ConnectionPoolExhaustedError: If maximum connections limit is
            reached.
        ConnectionUnavailableError: If connection creation fails.

    Example:
        async with pool.get_new_connection() as conn:
            await conn.send("Hello")
            response = await conn.recv()
    """
    connection = await self.acquire_new()
    try:
        yield connection
    finally:
        await self.release(connection)
close_all async
close_all() -> None

Close all connections and shut down the pool.

Stops the cleanup task and closes all connections in the pool. After calling this method, the pool cannot be used again.

Source code in audex/lib/websocket/pool.py
async def close_all(self) -> None:
    """Close all connections and shut down the pool.

    Stops the cleanup task and closes all connections in the pool.
    After calling this method, the pool cannot be used again.
    """
    async with self._lock:
        if self._closed:
            return

        self._closed = True

        # Cancel cleanup task
        if self._cleanup_task and not self._cleanup_task.done():
            self._cleanup_task.cancel()

        # Collect all connections to close
        connections_to_close = []
        connections_to_close.extend(list(self._available))
        connections_to_close.extend(list(self._busy))

        drain_tasks_to_cancel = []
        for pending_conn in self._pending.values():
            connections_to_close.append(pending_conn.connection)
            if pending_conn.drain_task and not pending_conn.drain_task.done():
                drain_tasks_to_cancel.append(pending_conn.drain_task)

        # Clear all tracking structures
        self._available.clear()
        self._busy.clear()
        self._pending.clear()
        self._total = 0

    # Wait for cleanup task to finish
    if self._cleanup_task:
        with contextlib.suppress(aio.CancelledError):
            await self._cleanup_task
        self._cleanup_task = None

    # Cancel all drain tasks
    for task in drain_tasks_to_cancel:
        task.cancel()

    # Wait for all drain tasks to finish
    if drain_tasks_to_cancel:
        await aio.gather(*drain_tasks_to_cancel, return_exceptions=True)

    # Close all connections
    for conn in connections_to_close:
        try:
            await conn.close()
        except Exception as e:
            self.logger.warning(f"Error closing connection during pool shutdown: {e}")

    self.logger.info(f"Closed WebSocket connection pool for {self.uri}")

options: show_root_heading: true show_source: true heading_level: 2 members_order: source show_signature_annotations: true separate_signature: true