WebSocket Connection Pool (Async)¶
pool ¶
Classes¶
ConnectionPoolExhaustedError ¶
Bases: WebsocketError
Raised when the connection pool has reached its maximum capacity.
Source code in audex/exceptions.py
ConnectionPoolUnavailableError ¶
Bases: WebsocketError
Raised when attempting to use a closed or unavailable pool.
Source code in audex/exceptions.py
PendingConnection ¶
PendingConnection(connection: WebsocketConnection, drain_timeout: float, drain_condition: DrainConditionCallback)
Represents a connection that is being drained before pool return.
A pending connection is in the process of draining server-sent data before being returned to the available connection pool.
Attributes:
| Name | Type | Description |
|---|---|---|
connection | The WebsocketConnection being drained. | |
drain_timeout | Maximum time to spend draining in seconds. | |
drain_condition | Callback to determine what data should be drained. | |
created_at | Unix timestamp when this pending connection was created. | |
drain_task | Task[None] | None | The asyncio Task performing the drain operation. |
pending_id | Unique identifier for this pending connection. |
Initialize a pending connection.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connection | WebsocketConnection | The WebsocketConnection to drain. | required |
drain_timeout | float | Maximum time to spend draining in seconds. | required |
drain_condition | DrainConditionCallback | Function to determine what constitutes server data. | required |
Source code in audex/lib/websocket/pool.py
WebsocketConnectionPool ¶
WebsocketConnectionPool(*, uri: str, headers: dict[str, str] | None = None, idle_timeout: float = 60.0, max_connections: int = 50, max_retries: int = 3, cleanup_interval: float = 5.0, connection_timeout: float = 10.0, warmup_connections: int = 0, check_server_data_on_release: bool = False, drain_timeout: float = 10.0, drain_quiet_period: float = 2.0, drain_condition: DrainConditionCallback | None = None, **kwargs: Any)
Bases: LoggingMixin
An asynchronous WebSocket connection pool for managing reusable connections.
This class provides efficient and fault-tolerant management of a pool of WebSocket connections to a specified URI. It supports connection reuse, automatic cleanup of idle or disconnected connections, retry mechanisms, optional warm-up on startup, and server data detection during connection release.
Key Features
- Connection reuse for improved performance
- Connection acquisition with automatic exponential backoff retries
- Optional warm-up of initial connections on startup
- Background cleanup of idle or disconnected connections
- Asynchronous server data draining to prevent data loss
- Supports context management for automatic connection release
- Thread-safe operations with proper lock management
Attributes:
| Name | Type | Description |
|---|---|---|
uri | The WebSocket URI to connect to. | |
headers | Optional headers sent with each connection. |
Initialize a WebSocket connection pool.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
uri | str | The WebSocket URI to connect to. | required |
headers | dict[str, str] | None | Optional HTTP headers to include in connection requests. | None |
idle_timeout | float | Maximum time in seconds a connection can remain idle before being closed. Defaults to 60.0. | 60.0 |
max_connections | int | Maximum number of concurrent connections allowed in the pool. Defaults to 50. | 50 |
max_retries | int | Maximum number of retry attempts when acquiring a connection fails. Defaults to 3. | 3 |
cleanup_interval | float | Interval in seconds between cleanup operations for idle connections. Defaults to 5.0. | 5.0 |
connection_timeout | float | Timeout in seconds for establishing new connections. Defaults to 10.0. | 10.0 |
warmup_connections | int | Number of connections to create during pool initialization. Defaults to 0. | 0 |
check_server_data_on_release | bool | Whether to check for server data during connection release. Defaults to False. | False |
drain_timeout | float | Maximum time in seconds to drain server data during release. Defaults to 10.0. | 10.0 |
drain_quiet_period | float | Time in seconds to wait without receiving data before considering connection clean. Defaults to 2.0. | 2.0 |
drain_condition | DrainConditionCallback | None | Function to determine what constitutes server data that should be drained. Defaults to None (uses default condition). | None |
**kwargs | Any | Additional parameters to pass to WebsocketConnection. | {} |
Source code in audex/lib/websocket/pool.py
Attributes¶
is_closed property ¶
Check if the connection pool is closed.
Returns:
| Type | Description |
|---|---|
bool | True if the pool is closed, False otherwise. |
is_started property ¶
Check if the connection pool has been started.
Returns:
| Type | Description |
|---|---|
bool | True if the pool has been started, False otherwise. |
total_connections property ¶
Get the total number of connections in the pool.
Returns:
| Type | Description |
|---|---|
int | The total number of connections (available, busy, and pending). |
available_connections property ¶
Get the number of available connections.
Returns:
| Type | Description |
|---|---|
int | The number of connections available for use. |
busy_connections property ¶
Get the number of busy connections.
Returns:
| Type | Description |
|---|---|
int | The number of connections currently in use. |
pending_connections property ¶
Get the number of pending connections.
Returns:
| Type | Description |
|---|---|
int | The number of connections being drained. |
Functions¶
start async ¶
Start the connection pool.
Initializes the pool, starts the cleanup task, and optionally creates warmup connections if configured.
Source code in audex/lib/websocket/pool.py
acquire async ¶
acquire() -> WebsocketConnection
Acquire a connection from the pool.
Attempts to reuse an existing available connection, or creates a new one if none are available and the pool limit hasn't been reached. Includes automatic retry logic for transient failures.
Returns:
| Type | Description |
|---|---|
WebsocketConnection | An acquired WebsocketConnection ready for use. |
Raises:
| Type | Description |
|---|---|
ConnectionPoolUnavailableError | If the pool is closed. |
ConnectionPoolExhaustedError | If maximum connections limit is reached. |
ConnectionUnavailableError | If connection acquisition fails after retries. |
Source code in audex/lib/websocket/pool.py
acquire_new async ¶
acquire_new() -> WebsocketConnection
Force acquire a new connection, avoiding reuse of existing ones.
This method always creates a fresh connection and returns it in acquired state, similar to acquire() but without attempting to reuse existing connections.
Returns:
| Type | Description |
|---|---|
WebsocketConnection | A newly created and acquired WebsocketConnection. |
Raises:
| Type | Description |
|---|---|
ConnectionPoolUnavailableError | If the pool is closed. |
ConnectionPoolExhaustedError | If maximum connections limit is reached. |
ConnectionUnavailableError | If connection creation fails after retries. |
Source code in audex/lib/websocket/pool.py
release async ¶
release(connection: WebsocketConnection, *, force_remove: bool = False) -> None
Release a connection back to the pool.
The connection will be either: 1. Returned to the available pool immediately if data draining is disabled 2. Put into pending state for background draining if data draining is enabled 3. Removed from the pool if force_remove is True or the connection is unhealthy
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
connection | WebsocketConnection | The WebsocketConnection to release. | required |
force_remove | bool | If True, the connection will be removed from the pool regardless of its state. Defaults to False. | False |
Source code in audex/lib/websocket/pool.py
get_connection async ¶
get_connection() -> AsyncGenerator[WebsocketConnection, None]
Get a connection from the pool as an async context manager.
The connection will be automatically released back to the pool when the context manager exits.
Yields:
| Type | Description |
|---|---|
AsyncGenerator[WebsocketConnection, None] | A WebsocketConnection from the pool. |
Raises:
| Type | Description |
|---|---|
ConnectionPoolUnavailableError | If the pool is closed. |
ConnectionPoolExhaustedError | If maximum connections limit is reached. |
ConnectionUnavailableError | If connection acquisition fails. |
Example
async with pool.get_connection() as conn: await conn.send("Hello") response = await conn.recv()
Source code in audex/lib/websocket/pool.py
get_new_connection async ¶
get_new_connection() -> AsyncGenerator[WebsocketConnection, None]
Get a new connection from the pool as an async context manager.
Forces creation of a new connection without reusing existing ones. The connection will be automatically released back to the pool when the context manager exits.
Yields:
| Type | Description |
|---|---|
AsyncGenerator[WebsocketConnection, None] | A newly created WebsocketConnection. |
Raises:
| Type | Description |
|---|---|
ConnectionPoolUnavailableError | If the pool is closed. |
ConnectionPoolExhaustedError | If maximum connections limit is reached. |
ConnectionUnavailableError | If connection creation fails. |
Example
async with pool.get_new_connection() as conn: await conn.send("Hello") response = await conn.recv()
Source code in audex/lib/websocket/pool.py
close_all async ¶
Close all connections and shut down the pool.
Stops the cleanup task and closes all connections in the pool. After calling this method, the pool cannot be used again.
Source code in audex/lib/websocket/pool.py
options: show_root_heading: true show_source: true heading_level: 2 members_order: source show_signature_annotations: true separate_signature: true