Skip to content

Segment Repository

segment

Classes

SegmentRepository

SegmentRepository(sqlite: SQLite)

Bases: SQLiteRepository[Segment]

SQLite implementation of Segment repository.

Provides CRUD operations for Segment entities with additional specialized query methods for segment management by session.

Source code in audex/lib/repos/segment.py
def __init__(self, sqlite: SQLite) -> None:
    """Initialize the segment repository.

    Args:
        sqlite: The SQLite handle, passed unchanged to the base-class
            initializer.
    """
    super().__init__(sqlite)
Functions
create async
create(data: Segment) -> str

Create a new segment in the database.

Parameters:

Name Type Description Default
data Segment

The segment entity to create.

required

Returns:

Type Description
str

The ID of the created segment.

Source code in audex/lib/repos/segment.py
async def create(self, data: Segment, /) -> str:
    """Persist a new segment and return its ID.

    Args:
        data: The segment entity to insert.

    Returns:
        The ID of the stored segment, read back after the commit.
    """
    async with self.sqlite.session() as db:
        row = SegmentTable.from_entity(data)
        db.add(row)
        await db.commit()
        # Reload the row after the commit so the ID reflects the stored state.
        await db.refresh(row)
        new_id = row.id
        return new_id
read async
read(id: str) -> Segment | None

Read a segment by ID.

Parameters:

Name Type Description Default
id str

The ID (id) of the segment to retrieve.

required

Returns:

Type Description
Segment | None

The segment entity if found, None otherwise.

Source code in audex/lib/repos/segment.py
async def read(self, id: str, /) -> Segment | None:
    """Fetch a single segment by its ID.

    Args:
        id: The ID of the segment to look up.

    Returns:
        The matching segment entity, or None when no row has that ID.
    """
    async with self.sqlite.session() as db:
        query = sqlm.select(SegmentTable).where(SegmentTable.id == id)
        row = (await db.execute(query)).scalar_one_or_none()
        return row.to_entity() if row is not None else None
first async
first(filter: Filter) -> Segment | None

Retrieve the first segment matching the filter.

Parameters:

Name Type Description Default
filter Filter

Filter to apply when searching for the segment.

required

Returns:

Type Description
Segment | None

The first segment entity matching the filter, or None if no match.

Source code in audex/lib/repos/segment.py
async def first(self, filter: Filter) -> Segment | None:
    """Return the first segment that satisfies the filter.

    Args:
        filter: Filter describing which segments qualify and how they
            are ordered.

    Returns:
        The first matching segment entity, or None if nothing matches.
    """
    spec = self.build_query_spec(filter)

    async with self.sqlite.session() as db:
        query = sqlm.select(SegmentTable)

        # Apply every condition and ordering term produced for the filter.
        for condition in spec.where:
            query = query.where(condition)
        for ordering in spec.order_by:
            query = query.order_by(ordering)

        # Only one row is needed, so cap the result set at the database level.
        rows = await db.execute(query.limit(1))
        row = rows.scalar_one_or_none()
        return row.to_entity() if row is not None else None
list async
list(arg: list[str] | Optional[Filter] = None, *, page_index: int = 0, page_size: int = 100) -> list[Segment]

List segments by IDs or with optional filtering and pagination.

Parameters:

Name Type Description Default
arg list[str] | Optional[Filter]

Either a list of IDs to retrieve, or an optional filter.

None
page_index int

Zero-based page index for pagination.

0
page_size int

Number of items per page.

100

Returns:

Type Description
list[Segment]

List of segment entities matching the criteria.

Source code in audex/lib/repos/segment.py
async def list(
    self,
    arg: builtins.list[str] | t.Optional[Filter] = None,  # noqa
    *,
    page_index: int = 0,
    page_size: int = 100,
) -> builtins.list[Segment]:
    """Fetch segments either by explicit IDs or by filter with paging.

    Args:
        arg: A list of segment IDs, or an optional filter. When a list of
            IDs is given, pagination is not applied.
        page_index: Zero-based page index (filter mode only).
        page_size: Number of items per page (filter mode only).

    Returns:
        The segment entities that match the IDs or the filter.
    """
    async with self.sqlite.session() as db:
        if not isinstance(arg, list):
            # Filter mode: build the query from the spec, then page it.
            spec = self.build_query_spec(arg)
            query = sqlm.select(SegmentTable)

            for condition in spec.where:
                query = query.where(condition)
            for ordering in spec.order_by:
                query = query.order_by(ordering)

            skip = page_index * page_size
            query = query.offset(skip).limit(page_size)

            rows = (await db.execute(query)).scalars().all()
            return [row.to_entity() for row in rows]

        # ID mode: an empty ID list needs no query at all.
        if not arg:
            return []

        id_column = sqlm.col(SegmentTable.id)
        query = sqlm.select(SegmentTable).where(id_column.in_(arg))
        rows = (await db.execute(query)).scalars().all()
        return [row.to_entity() for row in rows]
update async
update(data: Segment) -> str

Update an existing segment.

Parameters:

Name Type Description Default
data Segment

The segment entity with updated values.

required

Returns:

Type Description
str

The ID of the updated segment.

Raises:

Type Description
ValueError

If the segment with the given ID does not exist.

Source code in audex/lib/repos/segment.py
async def update(self, data: Segment, /) -> str:
    """Apply the values of an entity to its stored segment.

    Args:
        data: The segment entity carrying the new values; its ID selects
            the row to update.

    Returns:
        The ID of the updated segment.

    Raises:
        ValueError: If no segment with the entity's ID exists.
    """
    async with self.sqlite.session() as db:
        lookup = sqlm.select(SegmentTable).where(SegmentTable.id == data.id)
        row = (await db.execute(lookup)).scalar_one_or_none()

        if row is None:
            raise ValueError(f"Segment with id {data.id} not found")

        # Copy the entity's values onto the row, then persist and reload it.
        row.update(data)
        db.add(row)
        await db.commit()
        await db.refresh(row)
        updated_id = row.id
        return updated_id
update_many async
update_many(datas: list[Segment]) -> list[str]

Update multiple segments in the database.

Parameters:

Name Type Description Default
datas list[Segment]

List of segment entities with updated values.

required

Returns:

Type Description
list[str]

List of IDs of the updated segments.

Raises:

Type Description
ValueError

If any segment with the given ID does not exist.

Source code in audex/lib/repos/segment.py
async def update_many(self, datas: builtins.list[Segment]) -> builtins.list[str]:
    """Update several segments within a single session and commit.

    Args:
        datas: Segment entities carrying the new values; each entity's ID
            selects the row it updates.

    Returns:
        The IDs of the updated segments, in the order of ``datas``.

    Raises:
        ValueError: If any of the given IDs has no stored segment. Nothing
            is updated in that case.
    """
    # Nothing to do: skip opening a session entirely.
    if not datas:
        return []

    async with self.sqlite.session() as db:
        ids = [item.id for item in datas]
        lookup = sqlm.select(SegmentTable).where(sqlm.col(SegmentTable.id).in_(ids))
        fetched = (await db.execute(lookup)).scalars().all()
        rows_by_id = {row.id: row for row in fetched}

        # Validate up front so a missing ID aborts before any row is touched.
        missing_ids = set(ids) - set(rows_by_id)
        if missing_ids:
            raise ValueError(f"Segments with IDs {missing_ids} not found")

        updated_ids: builtins.list[str] = []
        for item in datas:
            row = rows_by_id[item.id]
            row.update(item)
            db.add(row)
            updated_ids.append(row.id)

        await db.commit()
        return updated_ids
delete async
delete(id: str) -> bool

Delete a segment by ID.

Parameters:

Name Type Description Default
id str

The ID (id) of the segment to delete.

required

Returns:

Type Description
bool

True if the segment was deleted, False if not found.

Source code in audex/lib/repos/segment.py
async def delete(self, id: str, /) -> bool:
    """Remove a single segment by its ID.

    Args:
        id: The ID of the segment to remove.

    Returns:
        True when a segment was found and deleted, False when no segment
        has that ID.
    """
    async with self.sqlite.session() as db:
        lookup = sqlm.select(SegmentTable).where(SegmentTable.id == id)
        row = (await db.execute(lookup)).scalar_one_or_none()

        found = row is not None
        if found:
            await db.delete(row)
            await db.commit()
        return found
delete_many async
delete_many(arg: list[str] | Optional[Filter] = None) -> list[str]

Delete multiple segments by IDs or matching a filter.

Parameters:

Name Type Description Default
arg list[str] | Optional[Filter]

Either a list of IDs to delete, or an optional filter.

None

Returns:

Type Description
list[str]

If deleting by IDs, returns list of deleted IDs.

list[str]

If deleting by filter, returns list of IDs of the segments that matched and were deleted.

Source code in audex/lib/repos/segment.py
async def delete_many(
    self,
    arg: builtins.list[str] | t.Optional[Filter] = None,  # noqa
) -> builtins.list[str]:
    """Delete multiple segments by IDs or matching a filter.

    Args:
        arg: Either a list of IDs to delete, or an optional filter. In
            filter mode every segment selected by the query spec built
            from the filter is deleted.

    Returns:
        The IDs of the segments that were deleted, in both modes. When
        deleting by IDs, IDs without a stored segment are left out of the
        result. An empty list is returned when nothing was deleted.
    """
    async with self.sqlite.session() as session:
        if isinstance(arg, list):
            # An empty ID list needs no query at all.
            if not arg:
                return []

            stmt = sqlm.select(SegmentTable).where(sqlm.col(SegmentTable.id).in_(arg))
            result = await session.execute(stmt)
            segment_objs = result.scalars().all()

            if not segment_objs:
                return []

            # Capture the IDs before deleting; only rows that actually exist
            # are reported back to the caller.
            segment_ids = [obj.id for obj in segment_objs]
            for obj in segment_objs:
                await session.delete(obj)

            await session.commit()
            return segment_ids

        # Filter mode: resolve the matching IDs first so they can be returned,
        # then remove those rows with a single bulk DELETE.
        spec = self.build_query_spec(arg)
        stmt = sqlm.select(SegmentTable.id)  # type: ignore
        for clause in spec.where:
            stmt = stmt.where(clause)

        result = await session.execute(stmt)
        segment_ids = [row[0] for row in result.all()]

        if not segment_ids:
            return []

        delete_stmt = sa.delete(SegmentTable).where(sqlm.col(SegmentTable.id).in_(segment_ids))
        await session.execute(delete_stmt)
        await session.commit()
        return segment_ids
count async
count(filter: Optional[Filter] = None) -> int

Count segments matching the filter.

Parameters:

Name Type Description Default
filter Optional[Filter]

Optional filter to apply. If None, counts all segments.

None

Returns:

Type Description
int

Number of segments matching the filter.

Source code in audex/lib/repos/segment.py
async def count(self, filter: t.Optional[Filter] = None) -> int:  # noqa
    """Return how many segments satisfy the filter.

    Args:
        filter: Optional filter restricting which segments are counted.
            If None, counts all segments.

    Returns:
        The number of matching segments.
    """
    spec = self.build_query_spec(filter)

    async with self.sqlite.session() as db:
        query = sqlm.select(sa.func.count()).select_from(SegmentTable)

        # Narrow the count with each condition produced for the filter.
        for condition in spec.where:
            query = query.where(condition)

        rows = await db.execute(query)
        return rows.scalar_one()
sum_duration_by_session async
sum_duration_by_session(session_id: str) -> int

Sum the duration of all segments in a given session.

Parameters:

Name Type Description Default
session_id str

The ID of the session to sum durations for.

required

Returns:

Type Description
int

The total duration of segments in the session (in milliseconds).

Source code in audex/lib/repos/segment.py
async def sum_duration_by_session(self, session_id: str) -> int:
    """Total the ``duration_ms`` of every segment belonging to a session.

    Args:
        session_id: The ID of the session whose segments are summed.

    Returns:
        The summed duration in milliseconds, or 0 when the aggregate
        yields no value.
    """
    async with self.sqlite.session() as db:
        total_expr = sa.func.sum(SegmentTable.duration_ms)
        query = sqlm.select(total_expr).where(
            SegmentTable.session_id == session_id
        )
        total = (await db.execute(query)).scalar_one()

        # The aggregate comes back as None when there is nothing to sum.
        if total is None:
            return 0
        return total

options: show_root_heading: true show_source: true heading_level: 2 members_order: source show_signature_annotations: true separate_signature: true