Bases: SQLiteRepository[Segment]
SQLite implementation of Segment repository.
Provides CRUD operations for Segment entities with additional specialized query methods for segment management by session.
Source code in audex/lib/repos/segment.py
| def __init__(self, sqlite: SQLite) -> None:
super().__init__(sqlite)
|
Functions
create async
create(data: Segment) -> str
Create a new segment in the database.
Parameters:
| Name | Type | Description | Default |
data | Segment | The segment entity to create. | required |
Returns:
| Type | Description |
str | The ID of the created segment. |
Source code in audex/lib/repos/segment.py
| async def create(self, data: Segment, /) -> str:
"""Create a new segment in the database.
Args:
data: The segment entity to create.
Returns:
The ID of the created segment.
"""
async with self.sqlite.session() as session:
segment_table = SegmentTable.from_entity(data)
session.add(segment_table)
await session.commit()
await session.refresh(segment_table)
return segment_table.id
|
read async
read(id: str) -> Segment | None
Read a segment by ID.
Parameters:
| Name | Type | Description | Default |
id | str | The ID (id) of the segment to retrieve. | required |
Returns:
| Type | Description |
Segment | None | The segment entity if found, None otherwise. |
Source code in audex/lib/repos/segment.py
| async def read(self, id: str, /) -> Segment | None:
"""Read a segment by ID.
Args:
id: The ID (id) of the segment to retrieve.
Returns:
The segment entity if found, None otherwise.
"""
async with self.sqlite.session() as session:
stmt = sqlm.select(SegmentTable).where(SegmentTable.id == id)
result = await session.execute(stmt)
segment_obj = result.scalar_one_or_none()
if segment_obj is None:
return None
return segment_obj.to_entity()
|
first async
first(filter: Filter) -> Segment | None
Retrieve the first segment matching the filter.
Parameters:
| Name | Type | Description | Default |
filter | Filter | Filter to apply when searching for the segment. | required |
Returns:
| Type | Description |
Segment | None | The first segment entity matching the filter, or None if no match. |
Source code in audex/lib/repos/segment.py
| async def first(self, filter: Filter) -> Segment | None:
"""Retrieve the first segment matching the filter.
Args:
filter: Filter to apply when searching for the segment.
Returns:
The first segment entity matching the filter, or None if no match.
"""
spec = self.build_query_spec(filter)
async with self.sqlite.session() as session:
stmt = sqlm.select(SegmentTable)
for clause in spec.where:
stmt = stmt.where(clause)
for order in spec.order_by:
stmt = stmt.order_by(order)
stmt = stmt.limit(1)
result = await session.execute(stmt)
segment_obj = result.scalar_one_or_none()
if segment_obj is None:
return None
return segment_obj.to_entity()
|
list async
list(arg: list[str] | Optional[Filter] = None, *, page_index: int = 0, page_size: int = 100) -> list[Segment]
List segments by IDs or with optional filtering and pagination.
Parameters:
| Name | Type | Description | Default |
arg | list[str] | Optional[Filter] | Either a list of IDs to retrieve, or an optional filter. | None |
page_index | int | Zero-based page index for pagination. | 0 |
page_size | int | Number of items per page. | 100 |
Returns:
| Type | Description |
list[Segment] | List of segment entities matching the criteria. |
Source code in audex/lib/repos/segment.py
| async def list(
self,
arg: builtins.list[str] | t.Optional[Filter] = None, # noqa
*,
page_index: int = 0,
page_size: int = 100,
) -> builtins.list[Segment]:
"""List segments by IDs or with optional filtering and
pagination.
Args:
arg: Either a list of IDs to retrieve, or an optional filter.
page_index: Zero-based page index for pagination.
page_size: Number of items per page.
Returns:
List of segment entities matching the criteria.
"""
async with self.sqlite.session() as session:
if isinstance(arg, list):
if not arg:
return []
stmt = sqlm.select(SegmentTable).where(sqlm.col(SegmentTable.id).in_(arg))
result = await session.execute(stmt)
segment_objs = result.scalars().all()
return [obj.to_entity() for obj in segment_objs]
spec = self.build_query_spec(arg)
stmt = sqlm.select(SegmentTable)
for clause in spec.where:
stmt = stmt.where(clause)
for order in spec.order_by:
stmt = stmt.order_by(order)
stmt = stmt.offset(page_index * page_size).limit(page_size)
result = await session.execute(stmt)
segment_objs = result.scalars().all()
return [obj.to_entity() for obj in segment_objs]
|
update async
update(data: Segment) -> str
Update an existing segment.
Parameters:
| Name | Type | Description | Default |
data | Segment | The segment entity with updated values. | required |
Returns:
| Type | Description |
str | The ID of the updated segment. |
Raises:
| Type | Description |
ValueError | If the segment with the given ID does not exist. |
Source code in audex/lib/repos/segment.py
| async def update(self, data: Segment, /) -> str:
"""Update an existing segment.
Args:
data: The segment entity with updated values.
Returns:
The ID of the updated segment.
Raises:
ValueError: If the segment with the given ID does not exist.
"""
async with self.sqlite.session() as session:
stmt = sqlm.select(SegmentTable).where(SegmentTable.id == data.id)
result = await session.execute(stmt)
segment_obj = result.scalar_one_or_none()
if segment_obj is None:
raise ValueError(f"Segment with id {data.id} not found")
segment_obj.update(data)
session.add(segment_obj)
await session.commit()
await session.refresh(segment_obj)
return segment_obj.id
|
update_many async
update_many(datas: list[Segment]) -> list[str]
Update multiple segments in the database.
Parameters:
| Name | Type | Description | Default |
datas | list[Segment] | List of segment entities with updated values. | required |
Returns:
| Type | Description |
list[str] | List of IDs of the updated segments. |
Raises:
| Type | Description |
ValueError | If any segment with the given ID does not exist. |
Source code in audex/lib/repos/segment.py
| async def update_many(self, datas: builtins.list[Segment]) -> builtins.list[str]:
"""Update multiple segments in the database.
Args:
datas: List of segment entities with updated values.
Returns:
List of IDs of the updated segments.
Raises:
ValueError: If any segment with the given ID does not exist.
"""
if not datas:
return []
updated_ids: builtins.list[str] = []
async with self.sqlite.session() as session:
ids = [data.id for data in datas]
stmt = sqlm.select(SegmentTable).where(sqlm.col(SegmentTable.id).in_(ids))
result = await session.execute(stmt)
table_objs = {obj.id: obj for obj in result.scalars().all()}
missing_ids = set(ids) - set(table_objs.keys())
if missing_ids:
raise ValueError(f"Segments with IDs {missing_ids} not found")
for data in datas:
segment_obj = table_objs[data.id]
segment_obj.update(data)
session.add(segment_obj)
updated_ids.append(segment_obj.id)
await session.commit()
return updated_ids
|
delete async
Delete a segment by ID.
Parameters:
| Name | Type | Description | Default |
id | str | The ID (id) of the segment to delete. | required |
Returns:
| Type | Description |
bool | True if the segment was deleted, False if not found. |
Source code in audex/lib/repos/segment.py
| async def delete(self, id: str, /) -> bool:
"""Delete a segment by ID.
Args:
id: The ID (id) of the segment to delete.
Returns:
True if the segment was deleted, False if not found.
"""
async with self.sqlite.session() as session:
stmt = sqlm.select(SegmentTable).where(SegmentTable.id == id)
result = await session.execute(stmt)
segment_obj = result.scalar_one_or_none()
if segment_obj is None:
return False
await session.delete(segment_obj)
await session.commit()
return True
|
delete_many async
delete_many(arg: list[str] | Optional[Filter] = None) -> list[str]
Delete multiple segments by IDs or matching a filter.
Parameters:
| Name | Type | Description | Default |
arg | list[str] | Optional[Filter] | Either a list of IDs to delete, or an optional filter. | None |
Returns:
| Type | Description |
list[str] | If deleting by IDs, returns list of deleted IDs. |
list[str] | If deleting by filter, returns count of deleted records. |
Source code in audex/lib/repos/segment.py
| async def delete_many(
self,
arg: builtins.list[str] | t.Optional[Filter] = None, # noqa
) -> builtins.list[str]:
"""Delete multiple segments by IDs or matching a filter.
Args:
arg: Either a list of IDs to delete, or an optional filter.
Returns:
If deleting by IDs, returns list of deleted IDs.
If deleting by filter, returns count of deleted records.
"""
async with self.sqlite.session() as session:
if isinstance(arg, list):
if not arg:
return []
stmt = sqlm.select(SegmentTable).where(sqlm.col(SegmentTable.id).in_(arg))
result = await session.execute(stmt)
segment_objs = result.scalars().all()
if not segment_objs:
return []
segment_ids = [obj.id for obj in segment_objs]
for obj in segment_objs:
await session.delete(obj)
await session.commit()
return segment_ids
spec = self.build_query_spec(arg)
stmt = sqlm.select(SegmentTable.id) # type: ignore
for clause in spec.where:
stmt = stmt.where(clause)
result = await session.execute(stmt)
segment_ids = [row[0] for row in result.all()]
if not segment_ids:
return []
delete_stmt = sa.delete(SegmentTable).where(sqlm.col(SegmentTable.id).in_(segment_ids))
await session.execute(delete_stmt)
await session.commit()
return segment_ids
|
count async
count(filter: Optional[Filter] = None) -> int
Count segments matching the filter.
Parameters:
| Name | Type | Description | Default |
filter | Optional[Filter] | Optional filter to apply. If None, counts all segments. | None |
Returns:
| Type | Description |
int | Number of segments matching the filter. |
Source code in audex/lib/repos/segment.py
| async def count(self, filter: t.Optional[Filter] = None) -> int: # noqa
"""Count segments matching the filter.
Args:
filter: Optional filter to apply. If None, counts all segments.
Returns:
Number of segments matching the filter.
"""
spec = self.build_query_spec(filter)
async with self.sqlite.session() as session:
stmt = sqlm.select(sa.func.count()).select_from(SegmentTable)
for clause in spec.where:
stmt = stmt.where(clause)
result = await session.execute(stmt)
return result.scalar_one()
|
sum_duration_by_session async
sum_duration_by_session(session_id: str) -> int
Sum the duration of all segments in a given session.
Parameters:
| Name | Type | Description | Default |
session_id | str | The ID of the session to sum durations for. | required |
Returns:
| Type | Description |
int | The total duration of segments in the session (in milliseconds). |
Source code in audex/lib/repos/segment.py
| async def sum_duration_by_session(self, session_id: str) -> int:
"""Sum the duration of all segments in a given session.
Args:
session_id: The ID of the session to sum durations for.
Returns:
The total duration of segments in the session (in milliseconds).
"""
async with self.sqlite.session() as session:
stmt = sqlm.select(sa.func.sum(SegmentTable.duration_ms)).where(
SegmentTable.session_id == session_id
)
result = await session.execute(stmt)
total_duration = result.scalar_one()
return total_duration if total_duration is not None else 0
|