WebSocket Connection

connection

Classes

ConnectionBusyError

ConnectionBusyError(message: str | None = None)

Bases: WebsocketError

Raised when attempting to use a connection that is already busy.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

ConnectionUnavailableError

ConnectionUnavailableError(message: str | None = None)

Bases: WebsocketError

Raised when a connection is unavailable or cannot be established.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

ConnectionClosedError

ConnectionClosedError(message: str | None = None)

Bases: WebsocketError

Raised when attempting to use a closed connection.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

ConnectionDrainTimeoutError

ConnectionDrainTimeoutError(message: str | None = None)

Bases: WebsocketError

Raised when connection draining exceeds the timeout.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)
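
The four exception classes above share one constructor: an explicit message wins, otherwise a class-level `default_message` is used. Neither `WebsocketError` nor the default strings appear on this page, so the following is only a minimal sketch with hypothetical stand-ins (`SketchWebsocketError`, `SketchBusyError`, and the message texts are assumptions, not audex code).

```python
from typing import Optional


class SketchWebsocketError(Exception):
    """Hypothetical stand-in for audex's WebsocketError base class."""

    # Assumed class-level fallback; the real default text is not shown here.
    default_message = "WebSocket error"

    def __init__(self, message: Optional[str] = None) -> None:
        # Same fallback expression as the constructors documented above.
        self.message = message or self.default_message
        super().__init__(self.message)


class SketchBusyError(SketchWebsocketError):
    """Hypothetical subclass that only overrides the default text."""

    default_message = "Connection is busy"  # assumed wording


def describe(exc: SketchWebsocketError) -> str:
    """Format an error as 'ClassName: message' for logging."""
    return f"{type(exc).__name__}: {exc.message}"
```

Because the fallback uses `or`, an empty string is treated like `None` and also yields the default message.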

WebsocketConnection

WebsocketConnection(*, uri: str, headers: dict[str, str] | None = None, idle_timeout: float = 30.0, check_server_data_on_release: bool = False, drain_timeout: float = 5.0, drain_condition: DrainConditionCallback | None = None, **kwargs: Any)

Bases: LoggingMixin, Hashable

Manages the lifecycle of a single WebSocket connection.

This class provides automatic idle timeout monitoring, connection health checks, and proper resource cleanup for WebSocket connections.

Attributes:

uri (str): The WebSocket URI to connect to.
headers (dict[str, str] | None): Optional HTTP headers for the connection.
idle_timeout (float): Maximum idle time in seconds before the connection is closed automatically.
check_server_data_on_release (bool): Whether to check for server data on release.
drain_timeout (float): Timeout in seconds for draining server data.
drain_condition (DrainConditionCallback): Callback to determine if data should be drained.

Initialize a WebSocket connection.

Parameters:

uri (str): The WebSocket URI to connect to. Required.
headers (dict[str, str] | None): Optional HTTP headers to include in connection requests. Defaults to None.
idle_timeout (float): Maximum time in seconds before an idle connection closes. Defaults to 30.0.
check_server_data_on_release (bool): Whether to check for server data during release. Defaults to False.
drain_timeout (float): Maximum time in seconds to drain server data. Defaults to 5.0.
drain_condition (DrainConditionCallback | None): Function to determine what constitutes server data that should be drained. Defaults to None (uses the default condition).
**kwargs (Any): Additional parameters to pass to websockets.connect(). Defaults to {}.

Source code in audex/lib/websocket/connection.py
def __init__(
    self,
    *,
    uri: str,
    headers: dict[str, str] | None = None,
    idle_timeout: float = 30.0,
    check_server_data_on_release: bool = False,
    drain_timeout: float = 5.0,
    drain_condition: DrainConditionCallback | None = None,
    **kwargs: t.Any,
):
    """Initialize a WebSocket connection.

    Args:
        uri: The WebSocket URI to connect to.
        headers: Optional HTTP headers to include in connection requests.
        idle_timeout: Maximum time in seconds before idle connection closes.
            Defaults to 30.0.
        check_server_data_on_release: Whether to check for server data
            during release. Defaults to False.
        drain_timeout: Maximum time in seconds to drain server data.
            Defaults to 5.0.
        drain_condition: Function to determine what constitutes server data
            that should be drained. Defaults to None (uses default condition).
        **kwargs: Additional parameters to pass to websockets.connect().
    """
    super().__init__()
    self.uri = uri
    self.headers = headers
    self.idle_timeout = idle_timeout
    self.check_server_data_on_release = check_server_data_on_release
    self.drain_timeout = drain_timeout
    self.drain_condition = drain_condition or self._default_drain_condition
    self._params = kwargs

    self.websocket: ClientConnection | None = None
    self._is_busy = False
    self._is_draining = False
    self._last_activity = time.time()
    self._monitor_task: aio.Task[None] | None = None
    self._closed = False
    self._lock = aio.Lock()
    self._connection_id = uuid.uuid4().hex  # For hashing
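
The idle monitor itself (`_monitor_idle`) is not listed on this page. As a rough illustration of what idle-timeout monitoring can look like, the sketch below polls a timestamp and marks an unused connection closed once it has been idle longer than `idle_timeout`. `IdleSketch`, `is_idle_expired`, and `poll_interval` are hypothetical names, and the real monitor may behave differently.

```python
import asyncio
import time


def is_idle_expired(
    last_activity: float, now: float, idle_timeout: float, is_busy: bool
) -> bool:
    """Return True when an unused connection has been idle longer than idle_timeout."""
    return (not is_busy) and (now - last_activity) > idle_timeout


class IdleSketch:
    """Hypothetical miniature of the idle-tracking state; not the audex class."""

    def __init__(self, idle_timeout: float) -> None:
        self.idle_timeout = idle_timeout
        self.is_busy = False
        self.closed = False
        self.last_activity = time.time()

    async def monitor(self, poll_interval: float) -> None:
        # Poll until the idle deadline passes, then mark the connection closed.
        while not self.closed:
            await asyncio.sleep(poll_interval)
            if is_idle_expired(
                self.last_activity, time.time(), self.idle_timeout, self.is_busy
            ):
                self.closed = True


async def demo() -> bool:
    conn = IdleSketch(idle_timeout=0.05)
    # The outer timeout only guards the demo against hanging.
    await asyncio.wait_for(conn.monitor(poll_interval=0.01), timeout=2.0)
    return conn.closed
```

Keeping the expiry decision in a pure function makes it easy to test without waiting on real time.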
Attributes
is_busy property
is_busy: bool

Check if the connection is currently busy.

Returns:

bool: True if the connection is busy, False otherwise.

is_draining property
is_draining: bool

Check if the connection is currently draining.

Returns:

bool: True if the connection is draining, False otherwise.

is_connected property
is_connected: bool

Check if the connection is currently active.

Returns:

bool: True if the connection is open and not closed, False otherwise.

last_activity property
last_activity: float

Get the timestamp of the last activity.

Returns:

float: Unix timestamp of the last activity.

Functions
connect async
connect() -> None

Establish the WebSocket connection.

If already connected, this method does nothing. Otherwise, it attempts to establish a new connection and starts the idle monitor task.

Raises:

ConnectionUnavailableError: If the connection has been closed or if connection establishment fails.

Source code in audex/lib/websocket/connection.py
async def connect(self) -> None:
    """Establish the WebSocket connection.

    If already connected, this method does nothing. Otherwise, it attempts
    to establish a new connection and starts the idle monitor task.

    Raises:
        ConnectionUnavailableError: If the connection has been closed or
            if connection establishment fails.
    """
    async with self._lock:
        if self.is_connected:
            return

        if self._closed:
            raise ConnectionUnavailableError("Connection has been closed")

        # Clean up old monitor task if done
        if self._monitor_task is not None and self._monitor_task.done():
            try:
                await self._monitor_task
            finally:
                self._monitor_task = None

    # Establish connection outside the lock to avoid blocking
    try:
        websocket = await connect(self.uri, additional_headers=self.headers, **self._params)

        async with self._lock:
            self.websocket = websocket
            self._update_activity()

            # Start monitor task if not already running
            if self._monitor_task is None or self._monitor_task.done():
                self._monitor_task = aio.create_task(self._monitor_idle())

        self.logger.debug(f"Connected to {self.uri}")
    except Exception as e:
        self.logger.error(f"Failed to connect to {self.uri}: {e}")
        raise ConnectionUnavailableError(f"Failed to connect: {e}") from e
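
The listing above checks state under the lock, performs the slow network call outside it, and then publishes the result under the lock again. A minimal, self-contained sketch of that shape follows; `LazyConnector`, `opener`, and `open_calls` are hypothetical names, and `opener` merely stands in for the real `websockets` connect call.

```python
import asyncio


class LazyConnector:
    """Hypothetical miniature of the connect() flow shown above."""

    def __init__(self, opener) -> None:
        self._opener = opener  # any zero-argument coroutine function
        self._lock = asyncio.Lock()
        self.resource = None
        self.open_calls = 0

    async def connect(self) -> None:
        async with self._lock:
            if self.resource is not None:
                return  # already connected: nothing to do

        # Slow I/O happens outside the lock so other coroutines are not blocked.
        resource = await self._opener()

        async with self._lock:
            # Publish the new resource under the lock.
            self.resource = resource
            self.open_calls += 1


async def demo() -> int:
    async def fake_open() -> str:
        await asyncio.sleep(0)  # pretend to do network I/O
        return "socket"

    conn = LazyConnector(fake_open)
    await conn.connect()
    await conn.connect()  # second call returns early
    return conn.open_calls
```

In this sketch, two coroutines that call `connect()` at the same moment can both pass the first check and both call the opener, so a caller sharing one connector across tasks may want to serialize `connect()` itself.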
close async
close() -> None

Close the WebSocket connection and clean up resources.

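This method cancels the idle monitor task, closes the WebSocket connection, and clears the busy and draining flags. The connection is then marked as closed permanently: later connect() or acquire() calls raise ConnectionUnavailableError.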

Source code in audex/lib/websocket/connection.py
async def close(self) -> None:
    """Close the WebSocket connection and clean up resources.

    This method cancels the idle monitor task, closes the WebSocket
    connection, and resets all internal state flags.
    """
    async with self._lock:
        if self._closed:
            return

        self._closed = True

        # Cancel monitor task
        if self._monitor_task is not None:
            if not self._monitor_task.done():
                self._monitor_task.cancel()

            try:
                await self._monitor_task
            except aio.CancelledError:
                self.logger.debug(f"Monitor task for {self.uri} cancelled")
            finally:
                self._monitor_task = None

        # Close websocket
        if self.websocket is not None:
            try:
                await self.websocket.close()
            finally:
                self.websocket = None

        self._is_busy = False
        self._is_draining = False
        self.logger.debug(f"Closed connection to {self.uri}")
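
The monitor shutdown in `close()` follows a common asyncio pattern: cancel the task, then await it so the cancellation has actually completed before cleanup continues. The helper below is a small sketch of that pattern only; `cancel_and_wait` and the `monitor` coroutine are hypothetical names.

```python
import asyncio


async def cancel_and_wait(task: asyncio.Task) -> bool:
    """Cancel a background task and wait for it; return True if it ended cancelled."""
    if not task.done():
        task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome when the task was cancelled.
        return True
    return False


async def demo() -> bool:
    async def monitor() -> None:
        # Stand-in for a long-running idle monitor.
        while True:
            await asyncio.sleep(3600)

    task = asyncio.create_task(monitor())
    await asyncio.sleep(0)  # give the task a chance to start
    return await cancel_and_wait(task)
```

Awaiting the cancelled task matters: without it, the task may still be unwinding while the rest of the cleanup runs.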
acquire async
acquire() -> None

Acquire the connection for exclusive use.

This method marks the connection as busy and ensures it is connected.

Raises:

ConnectionUnavailableError: If the connection has been closed or if connection establishment fails.
ConnectionBusyError: If the connection is already busy or draining.

Source code in audex/lib/websocket/connection.py
async def acquire(self) -> None:
    """Acquire the connection for exclusive use.

    This method marks the connection as busy and ensures it is connected.

    Raises:
        ConnectionUnavailableError: If the connection has been closed or
            if connection establishment fails.
        ConnectionBusyError: If the connection is already busy or draining.
    """
    async with self._lock:
        if self._closed:
            raise ConnectionUnavailableError("Connection has been closed")
        if self._is_busy:
            raise ConnectionBusyError("Connection is already busy")
        if self._is_draining:
            raise ConnectionBusyError("Connection is currently draining")

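    try:
        await self.connect()
    except Exception as e:
        self.logger.error(f"Failed to acquire connection to {self.uri}: {e}")
        raise ConnectionUnavailableError(f"Failed to acquire connection: {e}") from e

    async with self._lock:
        # Re-check under the lock: another task may have acquired the
        # connection while this one was waiting on connect().
        if self._is_busy:
            raise ConnectionBusyError("Connection is already busy")
        if self._is_draining:
            raise ConnectionBusyError("Connection is currently draining")
        self._is_busy = True
        self._update_activity()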
release async
release() -> None

Release the connection back to the pool.

This method marks the connection as no longer busy and updates the activity timestamp. Calling it on a connection that is not busy does nothing.

Source code in audex/lib/websocket/connection.py
async def release(self) -> None:
    """Release the connection back to the pool.

    This method marks the connection as no longer busy and updates
    the activity timestamp.
    """
    async with self._lock:
        if not self._is_busy:
            return

        self._is_busy = False
        self._update_activity()
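
The exclusive-use contract of `acquire()` and `release()` can be pictured with a small stand-alone sketch. It omits the connect step and keeps the check and the flag update in a single critical section; `ExclusiveSlot` and `BusyError` are hypothetical stand-ins, not audex classes.

```python
import asyncio


class BusyError(Exception):
    """Hypothetical stand-in for ConnectionBusyError."""


class ExclusiveSlot:
    """Hypothetical miniature of the busy flag used by acquire()/release()."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._is_busy = False

    async def acquire(self) -> None:
        async with self._lock:
            # Check and set together so two callers cannot both succeed.
            if self._is_busy:
                raise BusyError("already busy")
            self._is_busy = True

    async def release(self) -> None:
        async with self._lock:
            if not self._is_busy:
                return  # releasing an idle slot is a no-op
            self._is_busy = False


async def demo() -> list:
    slot = ExclusiveSlot()
    events = []
    await slot.acquire()
    events.append("acquired")
    try:
        await slot.acquire()  # second acquire fails while the slot is held
    except BusyError:
        events.append("busy")
    await slot.release()
    await slot.acquire()  # succeeds again after release
    events.append("reacquired")
    return events
```

Note that a failed acquire raises instead of waiting, so callers that want to queue for the connection need their own retry or pooling logic.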
ping async
ping() -> None

Send a ping to the WebSocket server to check connection health.

This method performs a health check by sending a ping frame to the server. The connection must be open for this to succeed.

Raises:

ConnectionUnavailableError: If the websocket is not connected.
ConnectionClosedError: If the connection closes during the ping.

Source code in audex/lib/websocket/connection.py
async def ping(self) -> None:
    """Send a ping to the WebSocket server to check connection
    health.

    This method performs a health check by sending a ping frame to the
    server. The connection must be open for this to succeed.

    Raises:
        ConnectionUnavailableError: If the websocket is not connected.
        ConnectionClosedError: If the connection closes during the ping.
    """
    # Check connection state outside lock (quick check)
    if not (
        self.websocket is not None
        and not self._closed
        and self.websocket.state == protocol.OPEN
    ):
        raise ConnectionUnavailableError("Websocket is not connected")

    # Get websocket reference under lock
    async with self._lock:
        if not self.is_connected:
            raise ConnectionUnavailableError("Websocket is not connected")
        websocket = self.websocket

    # Perform ping outside lock to avoid blocking other operations
    connection_closed = False
    connection_closed_error = None
    try:
        await websocket.ping()
        async with self._lock:
            self._update_activity()
    except ConnectionClosed as e:
        self.logger.error(f"Connection closed during ping: {e}")
        connection_closed = True
        connection_closed_error = e

    if connection_closed:
        await self.close()
        raise ConnectionClosedError(
            f"Connection closed during ping: {connection_closed_error}"
        ) from connection_closed_error
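
A caller that only needs a yes/no health answer might wrap `ping()` in a timeout and turn failures into `False`. The helper below is a sketch under assumptions: `healthy_within` is a hypothetical name, `ping` is any zero-argument coroutine function (for example a bound `connection.ping`), and the built-in `ConnectionError` stands in for the audex exceptions a real caller would list in `errors`.

```python
import asyncio


async def healthy_within(ping, timeout: float, errors=(ConnectionError,)) -> bool:
    """Return True if ping() completes within timeout seconds without a listed error."""
    try:
        await asyncio.wait_for(ping(), timeout=timeout)
    except asyncio.TimeoutError:
        return False  # the check took too long
    except errors:
        return False  # the check failed with an expected connection error
    return True


async def demo() -> list:
    async def ok() -> None:
        return None

    async def slow() -> None:
        await asyncio.sleep(10)

    async def broken() -> None:
        raise ConnectionError("closed")

    return [
        await healthy_within(ok, 0.5),
        await healthy_within(slow, 0.01),
        await healthy_within(broken, 0.5),
    ]
```

Errors that are not listed in `errors` still propagate, which keeps programming mistakes from being mistaken for an unhealthy connection.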
session async
session() -> AsyncGenerator[WebsocketConnection, None]

Create an async context manager session for the connection.

The connection is automatically acquired on entry and released on exit.

Yields:

AsyncGenerator[WebsocketConnection, None]: The WebsocketConnection instance.

Example:

    async with connection.session():
        await connection.send("Hello")
        response = await connection.recv()

Source code in audex/lib/websocket/connection.py
@contextlib.asynccontextmanager
async def session(self) -> t.AsyncGenerator[WebsocketConnection, None]:
    """Create a context manager session for the connection.

    The connection is automatically acquired on entry and released on exit.

    Yields:
        The WebsocketConnection instance.

    Example:
        async with connection.session():
            await connection.send("Hello")
            response = await connection.recv()
    """
    await self.acquire()
    try:
        yield self
    finally:
        await self.release()
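
The `try`/`finally` around the `yield` is what guarantees release even when the body of the `async with` block raises. The stand-alone sketch below shows that ordering with a hypothetical `Resource` class that records each step; it is an illustration of the pattern, not the audex implementation.

```python
import asyncio
import contextlib


class Resource:
    """Hypothetical stand-in that records acquire/release calls."""

    def __init__(self) -> None:
        self.busy = False
        self.log = []

    async def acquire(self) -> None:
        self.busy = True
        self.log.append("acquire")

    async def release(self) -> None:
        self.busy = False
        self.log.append("release")

    @contextlib.asynccontextmanager
    async def session(self):
        await self.acquire()
        try:
            yield self
        finally:
            # Runs on normal exit and when the body raises.
            await self.release()


async def demo() -> list:
    res = Resource()
    try:
        async with res.session() as held:
            assert held.busy
            raise RuntimeError("boom")
    except RuntimeError:
        # By the time the error reaches the caller, release() has already run.
        res.log.append("error")
    return res.log
```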
send async
send(message: str | bytes) -> None

Send a message through the WebSocket connection.

Parameters:

message (str | bytes): The message to send (string or bytes). Required.

Raises:

ConnectionBusyError: If the connection has not been acquired.
ConnectionUnavailableError: If the websocket is not connected.
ConnectionClosedError: If the connection closes during sending.

Source code in audex/lib/websocket/connection.py
async def send(self, message: str | bytes) -> None:
    """Send a message through the WebSocket connection.

    Args:
        message: The message to send (string or bytes).

    Raises:
        ConnectionBusyError: If the connection has not been acquired.
        ConnectionUnavailableError: If the websocket is not connected.
        ConnectionClosedError: If the connection closes during sending.
    """
    if not self.is_busy:
        raise ConnectionBusyError("Connection must be acquired before sending messages")

    # Check connection state outside lock
    if not (
        self.websocket is not None
        and not self._closed
        and self.websocket.state == protocol.OPEN
    ):
        raise ConnectionUnavailableError("Websocket is not connected")

    # Get websocket reference under lock
    async with self._lock:
        websocket = self.websocket
        if websocket is None:
            raise ConnectionUnavailableError("Websocket is not connected")

    # Perform send outside lock
    try:
        await websocket.send(message)
        async with self._lock:
            self._update_activity()
    except ConnectionClosed as e:
        await self.close()
        self.logger.error(f"Connection closed while sending message: {e}")
        raise ConnectionClosedError(f"Connection closed while sending message: {e}") from e
recv async
recv() -> str | bytes

Receive a message from the WebSocket connection.

Returns:

str | bytes: The received message (string or bytes).

Raises:

ConnectionBusyError: If the connection has not been acquired.
ConnectionUnavailableError: If the websocket is not connected.
ConnectionClosedError: If the connection closes during receiving.

Source code in audex/lib/websocket/connection.py
async def recv(self) -> str | bytes:
    """Receive a message from the WebSocket connection.

    Returns:
        The received message (string or bytes).

    Raises:
        ConnectionBusyError: If the connection has not been acquired.
        ConnectionUnavailableError: If the websocket is not connected.
        ConnectionClosedError: If the connection closes during receiving.
    """
    if not self._is_busy:
        raise ConnectionBusyError("Connection must be acquired before receiving messages")

    # Check connection state outside lock
    if not (
        self.websocket is not None
        and not self._closed
        and self.websocket.state == protocol.OPEN
    ):
        raise ConnectionUnavailableError("Websocket is not connected")

    # Get websocket reference under lock
    async with self._lock:
        websocket = self.websocket
        if websocket is None:
            raise ConnectionUnavailableError("Websocket is not connected")

    # Perform recv outside lock
    try:
        message = await websocket.recv()
        async with self._lock:
            self._update_activity()
        return message
    except ConnectionClosed as e:
        await self.close()
        self.logger.error(f"Connection closed while receiving message: {e}")
        raise ConnectionClosedError(f"Connection closed while receiving message: {e}") from e
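
Since both `send()` and `recv()` require an acquired connection, a request/response exchange is naturally wrapped in one session. The sketch below shows one possible shape; `request` is a hypothetical helper, and `EchoConnection` is an in-memory stand-in that only mimics the `session()`, `send()`, and `recv()` surface so the example can run without a server.

```python
import asyncio
import contextlib


class EchoConnection:
    """Hypothetical in-memory stand-in exposing session(), send() and recv()."""

    def __init__(self) -> None:
        self._queue = asyncio.Queue()

    @contextlib.asynccontextmanager
    async def session(self):
        yield self

    async def send(self, message) -> None:
        # Echo behaviour: whatever is sent becomes the next received message.
        await self._queue.put(message)

    async def recv(self):
        return await self._queue.get()


async def request(conn, message, timeout: float):
    """Send one message and wait up to timeout seconds for one reply."""
    async with conn.session():
        await conn.send(message)
        return await asyncio.wait_for(conn.recv(), timeout=timeout)


async def demo():
    return await request(EchoConnection(), "Hello", timeout=1.0)
```

With a real connection, the caller would additionally handle the exceptions listed under Raises for `send()` and `recv()`.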
