Skip to content

Audio Recorder

recorder

Classes

AudioFormat

Bases: str, Enum

Supported audio output formats.

AudioConfig

Bases: NamedTuple

Audio recording configuration.

Attributes:

Name Type Description
format int

Audio format (pyaudio constant).

channels int

Number of audio channels (1=mono, 2=stereo).

rate int

Sample rate in Hz (e.g., 16000, 44100, 48000).

chunk int

Number of frames per buffer.

input_device_index int | None

Index of input device, None for default.

AudioFrame

AudioFrame(timestamp: datetime, data: bytes)

Single audio frame with timestamp.

Uses slots to minimize memory footprint.

Attributes:

Name Type Description
timestamp

When this frame was captured.

data

Raw audio bytes.

Source code in audex/lib/recorder.py
def __init__(self, timestamp: datetime.datetime, data: bytes) -> None:
    """Store one captured frame.

    Args:
        timestamp: When this frame was captured.
        data: Raw audio bytes of the frame.
    """
    self.timestamp, self.data = timestamp, data

AudioSegment

Bases: NamedTuple

Represents a recorded audio segment.

Attributes:

Name Type Description
key str

Storage key where the audio is saved.

duration_ms int

Duration of the segment in milliseconds.

started_at datetime

Timestamp when recording started.

ended_at datetime

Timestamp when recording ended.

frames bytes

Raw audio frames (bytes).

AudioRecorder

AudioRecorder(store: Store, config: AudioConfig | None = None)

Bases: LoggingMixin, AsyncContextMixin

High-performance audio recorder using PyAudio with real-time streaming.

This recorder captures audio from a microphone and can start/stop recording multiple times, creating separate audio segments for each recording session. Audio data is automatically uploaded to the configured Store.

Features:

- Real-time audio streaming with async generators
- Multiple output formats (PCM, WAV, MP3, OPUS)
- Efficient numpy-based audio processing
- Non-blocking streaming while recording
- Time-based segment extraction
- Dynamic dtype handling based on AudioConfig

Attributes:

Name Type Description
store

Storage backend for uploading audio files.

config

Audio recording configuration.

Example
# Configure 16-bit mono capture at 16 kHz, delivered in 1024-frame buffers.
recorder = AudioRecorder(
    store=local_store,
    config=AudioConfig(
        format=pyaudio.paInt16,
        channels=1,
        rate=16000,
        chunk=1024,
    ),
)

# init() creates the PyAudio instance; start() opens the input stream and
# returns the storage key built from the given prefixes.
await recorder.init()
await recorder.start("session-123", "segment")

# Stream audio chunks in real-time.
# NOTE: stream() keeps yielding until recording stops, so this loop only ends
# once stop() has been called elsewhere (e.g. from another task).
async for chunk in recorder.stream(
    chunk_size=16000,  # 16000 samples = 1 second at rate=16000
    format=AudioFormat.MP3,
):
    await send_to_api(chunk)

# stop() uploads the WAV recording to the store and returns an AudioSegment;
# it raises RuntimeError if recording has already been stopped.
segment = await recorder.stop()
await recorder.close()
Source code in audex/lib/recorder.py
def __init__(self, store: Store, config: AudioConfig | None = None):
    """Set up an idle recorder.

    Only in-memory state is prepared here; the PyAudio instance is created
    later by ``init()`` and the input stream by ``start()``.

    Args:
        store: Storage backend that recordings are uploaded to.
        config: Audio recording configuration. ``AudioConfig()`` is used
            when omitted.

    Raises:
        ValueError: If ``config.format`` has no entry in ``_FORMAT_MAP``.
    """
    super().__init__()
    self.store = store
    self.config = config or AudioConfig()

    # The sample format decides both the numpy dtype used for captured
    # frames and the byte width of one sample.
    sample_format = self.config.format
    if sample_format not in self._FORMAT_MAP:
        raise ValueError(f"Unsupported audio format: {self.config.format}")
    dtype, width = self._FORMAT_MAP[sample_format]
    self._numpy_dtype = dtype
    self._sample_width = width

    # PyAudio handles; both stay None until init() / start() run.
    self._audio: pyaudio.PyAudio | None = None
    self._stream: pyaudio.Stream | None = None

    # Captured audio and its capture times live in two parallel lists.
    self._frames_data: list[npt.NDArray[t.Any]] = []
    self._frames_timestamps: list[datetime.datetime] = []

    # Recording session state.
    self._is_recording = False
    self._current_key: str | None = None
    self._started_at: datetime.datetime | None = None

    # Streaming state: how many samples stream() has already handed out,
    # plus the lock guarding that position.
    self._stream_position: int = 0
    self._stream_lock = asyncio.Lock()

    self.logger.debug(
        f"Initialized with dtype={self._numpy_dtype}, sample_width={self._sample_width}"
    )
Attributes
is_recording property
is_recording: bool

Check if recording is currently active.

current_segment_key property
current_segment_key: str | None

Get the key of the current recording segment.

Functions
init async
init() -> None

Initialize the audio system.

Creates the PyAudio instance and validates the audio configuration.

Raises:

Type Description
Exception

If audio initialization fails.

Source code in audex/lib/recorder.py
async def init(self) -> None:
    """Initialize the audio system.

    Creates the PyAudio instance, stores it on the recorder and writes every
    device that offers at least one input channel to the debug log.

    Raises:
        Exception: If audio initialization fails.
    """
    audio = pyaudio.PyAudio()
    self._audio = audio
    self.logger.info("Audio system initialized")

    device_count = audio.get_device_count()
    self.logger.debug(f"Found {device_count} audio devices")

    # Only input-capable devices are of interest to a recorder.
    for index in range(device_count):
        info = audio.get_device_info_by_index(index)
        if not info["maxInputChannels"] > 0:
            continue

        name = info["name"]
        input_channels = info["maxInputChannels"]
        default_rate = info["defaultSampleRate"]
        self.logger.debug(
            f"Input device {index}: {name} "
            f"(channels: {input_channels}, "
            f"rate: {default_rate})"
        )
close async
close() -> None

Close the audio system and release resources.

Stops any active recording and cleans up PyAudio resources.

Source code in audex/lib/recorder.py
async def close(self) -> None:
    """Close the audio system and release resources.

    Stops any active recording and cleans up PyAudio resources. The stream
    and the PyAudio instance are released even when stopping the recording
    fails; in that case the error from ``stop()`` is re-raised after the
    cleanup has run.

    Raises:
        Exception: Whatever ``stop()`` raises while finishing an active
            recording (propagated unchanged).
    """
    try:
        if self._is_recording:
            await self.stop()
    finally:
        # Cleanup must not depend on stop() succeeding, otherwise a failed
        # upload or an empty recording would leak the audio device.
        try:
            if self._stream is not None:
                self._stream.stop_stream()
                self._stream.close()
        finally:
            self._stream = None

            if self._audio is not None:
                self._audio.terminate()
                self._audio = None

            self.logger.info("Audio system closed")
start async
start(*prefixes: str) -> str

Start a new recording segment.

Parameters:

Name Type Description Default
*prefixes str

Prefix parts for the storage key.

()

Returns:

Type Description
str

The full storage key for this segment.

Raises:

Type Description
RuntimeError

If already recording or audio system not initialized.

Source code in audex/lib/recorder.py
async def start(self, *prefixes: str) -> str:
    """Start a new recording segment.

    Clears the frame buffer and streaming position, builds a fresh ``.wav``
    storage key, then opens a PyAudio input stream that delivers audio to
    ``self._audio_callback``. If the stream cannot be opened or started the
    recorder is rolled back to its idle state before the error is re-raised,
    so a later ``start()`` is not rejected with "Already recording".

    Args:
        *prefixes: Prefix parts for the storage key.

    Returns:
        The full storage key for this segment.

    Raises:
        RuntimeError: If already recording or audio system not initialized.
        Exception: Whatever PyAudio raises when the input stream cannot be
            opened or started (propagated unchanged).
    """
    if self._is_recording:
        raise RuntimeError("Already recording")

    if self._audio is None:
        raise RuntimeError("Audio system not initialized. Call init() first.")

    # Generate unique key
    segment_id = utils.gen_id(prefix="")
    self._current_key = self.store.key_builder.build(*prefixes, f"{segment_id}.wav")
    self._frames_data.clear()
    self._frames_timestamps.clear()
    self._stream_position = 0
    self._started_at = utils.utcnow()

    stream = None
    try:
        # Open audio stream
        stream = self._audio.open(
            format=self.config.format,
            channels=self.config.channels,
            rate=self.config.rate,
            input=True,
            frames_per_buffer=self.config.chunk,
            input_device_index=self.config.input_device_index,
            stream_callback=self._audio_callback,
        )
        self._stream = stream

        # The flag is raised before start_stream(), as before, so the
        # recorder already counts as recording when audio starts flowing.
        self._is_recording = True
        stream.start_stream()
    except Exception:
        # Undo the half-started state; otherwise the recorder would stay
        # "recording" with a leaked stream and refuse every later start().
        self._is_recording = False
        self._stream = None
        self._current_key = None
        self._started_at = None
        if stream is not None:
            stream.close()
        raise

    self.logger.info(f"Started recording to {self._current_key}")
    return self._current_key
stream async
stream(chunk_size: int | None = None, format: AudioFormat = PCM, channels: int | None = None, rate: int | None = None) -> AsyncGenerator[bytes, None]

Stream audio chunks in real-time while recording.

This does NOT affect the recording buffer. You can stream and record simultaneously.

Parameters:

Name Type Description Default
chunk_size int | None

Number of samples per chunk. None = config.chunk.

None
format AudioFormat

Output audio format.

PCM
channels int | None

Target channels. None = config.channels.

None
rate int | None

Target sample rate. None = config.rate.

None

Yields:

Type Description
AsyncGenerator[bytes, None]

Audio chunks in specified format.

Example
# Stream MP3 chunks of 16000 samples each (1 second at a 16 kHz sample rate)
async for chunk in recorder.stream(
    chunk_size=16000, format=AudioFormat.MP3
):
    await send_to_server(chunk)
Source code in audex/lib/recorder.py
async def stream(
    self,
    chunk_size: int | None = None,
    format: AudioFormat = AudioFormat.PCM,
    channels: int | None = None,
    rate: int | None = None,
) -> t.AsyncGenerator[bytes, None]:
    """Stream audio chunks in real-time while recording.

    This does NOT affect the recording buffer. You can stream and
    record simultaneously.

    The generator polls the frame buffer every 10 ms and yields as soon as
    ``chunk_size`` new samples are available. It finishes when recording
    stops; samples that do not fill a whole chunk at that point are not
    emitted. The read position is stored on the recorder, so concurrent
    ``stream()`` consumers share it and each chunk is delivered to only one
    of them.

    Args:
        chunk_size: Number of samples per chunk. None = config.chunk.
        format: Output audio format.
        channels: Target channels. None = config.channels.
        rate: Target sample rate. None = config.rate.

    Yields:
        Audio chunks in specified format.

    Raises:
        ValueError: If the effective chunk size is not positive.

    Example:
        ```python
        # Stream 1-second MP3 chunks
        async for chunk in recorder.stream(
            chunk_size=16000, format=AudioFormat.MP3
        ):
            await send_to_server(chunk)
        ```
    """
    if not self._is_recording:
        self.logger.warning("Cannot stream: not recording")
        return

    chunk_size = chunk_size or self.config.chunk
    if chunk_size <= 0:
        # A non-positive size could never be "filled" and would spin forever.
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    target_channels = channels or self.config.channels
    target_rate = rate or self.config.rate

    self.logger.info(
        f"Started streaming: chunk_size={chunk_size}, format={format.value}, "
        f"rate={target_rate}, channels={target_channels}"
    )

    while self._is_recording:
        chunk_audio = None

        async with self._stream_lock:
            # Work on a snapshot so frames appended during the scan cannot
            # shift what is being measured and sliced.
            frames = list(self._frames_data)
            start_sample = self._stream_position
            end_sample = start_sample + chunk_size

            if sum(len(frame) for frame in frames) >= end_sample:
                # Copy only the frames overlapping [start_sample, end_sample)
                # instead of concatenating the whole recording per chunk.
                parts = []
                offset = 0
                for frame in frames:
                    frame_end = offset + len(frame)
                    if frame_end > start_sample:
                        parts.append(
                            frame[max(start_sample - offset, 0) : end_sample - offset]
                        )
                    offset = frame_end
                    if offset >= end_sample:
                        break

                chunk_audio = np.concatenate(parts)

                # Update stream position
                self._stream_position = end_sample

        if chunk_audio is None:
            # Not enough data yet. Wait with the lock released so other
            # consumers are not blocked for the duration of the sleep.
            await asyncio.sleep(0.01)  # 10ms
            continue

        # Process audio (outside lock for performance)
        if target_rate != self.config.rate or target_channels != self.config.channels:
            chunk_audio = self._resample_audio_numpy(
                chunk_audio,
                src_rate=self.config.rate,
                dst_rate=target_rate,
                src_channels=self.config.channels,
                dst_channels=target_channels,
            )

        # Encode to target format
        encoded_chunk = self._encode_audio(
            chunk_audio,
            sample_rate=target_rate,
            channels=target_channels,
            output_format=format,
        )

        yield encoded_chunk
segment async
segment(started_at: datetime, ended_at: datetime, *, channels: int | None = None, rate: int | None = None, format: AudioFormat = PCM) -> bytes

Extract audio segment between two timestamps.

Parameters:

Name Type Description Default
started_at datetime

Start timestamp.

required
ended_at datetime

End timestamp.

required
channels int | None

Target channels. None = config.channels.

None
rate int | None

Target sample rate. None = config.rate.

None
format AudioFormat

Output format (PCM, WAV, MP3, OPUS).

PCM

Returns:

Type Description
bytes

Audio segment in specified format.

Raises:

Type Description
RuntimeError

If audio system not initialized.

ValueError

If invalid time range or no frames.

Source code in audex/lib/recorder.py
async def segment(
    self,
    started_at: datetime.datetime,
    ended_at: datetime.datetime,
    *,
    channels: int | None = None,
    rate: int | None = None,
    format: AudioFormat = AudioFormat.PCM,
) -> bytes:
    """Cut the audio recorded between two timestamps out of the buffer.

    Both timestamps are mapped to frame indices via ``_find_frame_index``;
    the frames from the first to the last index (inclusive) are joined,
    optionally resampled, and encoded.

    Args:
        started_at: Start timestamp.
        ended_at: End timestamp.
        channels: Target channels. None = config.channels.
        rate: Target sample rate. None = config.rate.
        format: Output format (PCM, WAV, MP3, OPUS).

    Returns:
        Audio segment in specified format.

    Raises:
        RuntimeError: If audio system not initialized.
        ValueError: If invalid time range or no frames.
    """
    if self._audio is None:
        raise RuntimeError("Audio system not initialized")

    if ended_at < started_at:
        raise ValueError(
            f"End time ({ended_at.isoformat()}) must be after "
            f"start time ({started_at.isoformat()})"
        )

    if not self._frames_data:
        raise ValueError("No audio frames available")

    source_rate = self.config.rate
    source_channels = self.config.channels
    out_channels = channels or source_channels
    out_rate = rate or source_rate

    # Translate the time window into an inclusive frame index range.
    first = self._find_frame_index(started_at)
    last = self._find_frame_index(ended_at)
    if first == last:
        # Widen a single-frame window by one frame, staying inside the buffer.
        last = min(first + 1, len(self._frames_data) - 1)

    self.logger.debug(
        f"Extracting frames {first} to {last} (total: {last - first + 1} frames)"
    )

    audio = np.concatenate(self._frames_data[first : last + 1])

    # Convert only when the caller asked for a different rate or layout.
    same_layout = out_rate == source_rate and out_channels == source_channels
    if not same_layout:
        audio = self._resample_audio_numpy(
            audio,
            src_rate=source_rate,
            dst_rate=out_rate,
            src_channels=source_channels,
            dst_channels=out_channels,
        )
        self.logger.debug(
            f"Resampled: {source_rate}Hz {source_channels}ch -> "
            f"{out_rate}Hz {out_channels}ch"
        )

    payload = self._encode_audio(
        audio,
        sample_rate=out_rate,
        channels=out_channels,
        output_format=format,
    )

    self.logger.debug(
        f"Created {format.value.upper()} segment: "
        f"{len(payload)} bytes, {out_rate}Hz {out_channels}ch"
    )

    return payload
stop async
stop() -> AudioSegment

Stop recording and save to storage.

Returns:

Type Description
AudioSegment

AudioSegment containing recording information.

Raises:

Type Description
RuntimeError

If not currently recording.

Source code in audex/lib/recorder.py
async def stop(self) -> AudioSegment:
    """Stop recording and save to storage.

    Closes the input stream, joins all captured frames, exports them as a
    WAV file and uploads it under the key chosen by ``start()``. A recording
    that captured no frames is saved as an empty WAV instead of failing.
    The captured frames stay in memory afterwards (see ``clear_frames()``),
    so ``segment()`` can still extract from them.

    ``duration_ms`` is the wall-clock time between ``start()`` and this
    call; it is not derived from the number of captured samples.

    Returns:
        AudioSegment containing recording information.

    Raises:
        RuntimeError: If not currently recording.
    """
    if not self._is_recording:
        raise RuntimeError("Not currently recording")

    self._is_recording = False

    # Stop stream; close() must run even if stop_stream() fails so the
    # device is not left open.
    stream, self._stream = self._stream, None
    if stream is not None:
        try:
            stream.stop_stream()
        finally:
            stream.close()

    ended_at = utils.utcnow()

    # Combine all frames efficiently with numpy. np.concatenate rejects an
    # empty list, so an empty recording becomes an empty array instead.
    if self._frames_data:
        all_audio = np.concatenate(self._frames_data)
    else:
        all_audio = np.empty(0, dtype=self._numpy_dtype)

    # Convert to bytes based on format
    if self.config.format == pyaudio.paInt24:
        frames = self._pack_24bit(all_audio)
    else:
        frames = all_audio.tobytes()

    frame_count = len(self._frames_data)

    # Calculate duration
    if self._started_at is None:
        self._started_at = ended_at

    duration_ms = int((ended_at - self._started_at).total_seconds() * 1000)

    # Create WAV file using pydub
    pydub_audio = self._to_pydub_segment(
        all_audio,
        sample_rate=self.config.rate,
        channels=self.config.channels,
    )
    wav_buffer = io.BytesIO()
    pydub_audio.export(wav_buffer, format="wav")
    wav_data = wav_buffer.getvalue()

    # Upload to store
    key = self._current_key
    if key is None:
        raise RuntimeError("No current segment key")

    await self.store.upload(
        data=wav_data,
        key=key,
        metadata={
            "content_type": "audio/wav",
            "duration_ms": duration_ms,
            "sample_rate": self.config.rate,
            "channels": self.config.channels,
            "started_at": self._started_at.isoformat(),
            "ended_at": ended_at.isoformat(),
            "frame_count": frame_count,
        },
    )

    self.logger.info(
        f"Stopped recording. Duration: {duration_ms}ms, "
        f"Frames: {frame_count}, Size: {len(wav_data)} bytes"
    )

    result = AudioSegment(
        key=key,
        duration_ms=duration_ms,
        started_at=self._started_at,
        ended_at=ended_at,
        frames=frames,
    )

    # Reset state but keep frames for potential extraction
    self._current_key = None
    self._started_at = None

    return result
clear_frames
clear_frames() -> None

Clear all recorded frames from memory.

Source code in audex/lib/recorder.py
def clear_frames(self) -> None:
    """Drop every buffered frame and rewind the streaming position.

    Both buffers are emptied in place, so existing references to the lists
    observe the change.
    """
    for buffer in (self._frames_data, self._frames_timestamps):
        buffer.clear()
    self._stream_position = 0
    self.logger.debug("Cleared all recorded frames from memory")
list_input_devices
list_input_devices() -> list[dict[str, Any]]

List available audio input devices.

Source code in audex/lib/recorder.py
def list_input_devices(self) -> list[dict[str, t.Any]]:
    """Describe every device that can be used for audio input.

    Returns:
        One dict per device reporting at least one input channel, holding
        its ``index``, ``name``, ``channels`` and ``default_rate``, in device
        index order.

    Raises:
        RuntimeError: If the audio system has not been initialized.
    """
    audio = self._audio
    if audio is None:
        raise RuntimeError("Audio system not initialized")

    # Query each device once, lazily and in index order.
    indexed_infos = (
        (index, audio.get_device_info_by_index(index))
        for index in range(audio.get_device_count())
    )
    return [
        {
            "index": index,
            "name": info["name"],
            "channels": info["maxInputChannels"],
            "default_rate": info["defaultSampleRate"],
        }
        for index, info in indexed_infos
        if info["maxInputChannels"] > 0
    ]

options: show_root_heading: true show_source: true heading_level: 2 members_order: source show_signature_annotations: true separate_signature: true