Bases: SQLiteRepository[Doctor]
SQLite implementation of Doctor repository.
Provides CRUD operations for Doctor entities with additional specialized query methods for authentication and voiceprint management.
Source code in audex/lib/repos/doctor.py
def __init__(self, sqlite: SQLite) -> None:
    """Set up the repository on top of the given SQLite handle.

    Args:
        sqlite: Forwarded unchanged to the base-class initializer.
    """
    super().__init__(sqlite)
Functions
create async
create(data: Doctor) -> str
Create a new doctor in the database.
Parameters:
| Name | Type | Description | Default |
data | Doctor | The doctor entity to create. | required |
Returns:
| Type | Description |
str | The ID of the created doctor. |
Source code in audex/lib/repos/doctor.py
async def create(self, data: Doctor, /) -> str:
    """Persist a new doctor and return its identifier.

    Args:
        data: The doctor entity to store.

    Returns:
        The ID of the stored doctor row.
    """
    async with self.sqlite.session() as db:
        row = DoctorTable.from_entity(data)
        db.add(row)
        await db.commit()
        # Reload the row after the commit, then read the ID from it.
        await db.refresh(row)
        new_id = row.id
        return new_id
read async
read(id: str) -> Doctor | None
Read a doctor by ID.
Parameters:
| Name | Type | Description | Default |
id | str | The ID (id) of the doctor to retrieve. | required |
Returns:
| Type | Description |
Doctor | None | The doctor entity if found, None otherwise. |
Source code in audex/lib/repos/doctor.py
async def read(self, id: str, /) -> Doctor | None:
    """Fetch a single doctor by its ID.

    Args:
        id: The ID of the doctor to look up.

    Returns:
        The matching doctor entity, or None when no row has that ID.
    """
    async with self.sqlite.session() as db:
        lookup = sqlm.select(DoctorTable).where(DoctorTable.id == id)
        row = (await db.execute(lookup)).scalar_one_or_none()
        # Convert inside the session so the row is still attached.
        return row.to_entity() if row is not None else None
first async
first(filter: Filter) -> Doctor | None
Retrieve the first doctor matching the filter.
Parameters:
| Name | Type | Description | Default |
filter | Filter | Filter to apply when searching for the doctor. | required |
Returns:
| Type | Description |
Doctor | None | The first doctor entity matching the filter, or None if no match. |
Source code in audex/lib/repos/doctor.py
async def first(self, filter: Filter) -> Doctor | None:
    """Return the first doctor that satisfies the filter.

    The filter is translated by ``build_query_spec`` into WHERE clauses
    and ORDER BY terms; the query is then capped at a single row.

    Args:
        filter: Filter describing which doctor to look for.

    Returns:
        The first matching doctor entity, or None if nothing matches.
    """
    spec = self.build_query_spec(filter)
    async with self.sqlite.session() as db:
        query = sqlm.select(DoctorTable)
        for condition in spec.where:
            query = query.where(condition)
        for ordering in spec.order_by:
            query = query.order_by(ordering)
        # LIMIT 1 caps the result at a single row.
        rows = await db.execute(query.limit(1))
        match = rows.scalar_one_or_none()
        return None if match is None else match.to_entity()
list async
list(arg: list[str] | Optional[Filter] = None, *, page_index: int = 0, page_size: int = 100) -> list[Doctor]
List doctors by IDs or with optional filtering and pagination.
Parameters:
| Name | Type | Description | Default |
arg | list[str] | Optional[Filter] | Either a list of IDs to retrieve, or an optional filter. | None |
page_index | int | Zero-based page index for pagination. | 0 |
page_size | int | Number of items per page. | 100 |
Returns:
| Type | Description |
list[Doctor] | List of doctor entities matching the criteria. |
Source code in audex/lib/repos/doctor.py
async def list(
    self,
    arg: builtins.list[str] | t.Optional[Filter] = None,  # noqa
    *,
    page_index: int = 0,
    page_size: int = 100,
) -> builtins.list[Doctor]:
    """List doctors by IDs or with optional filtering and pagination.

    Two call forms are supported:

    * ``arg`` is a list of IDs: the doctors with those IDs are returned.
      ``page_index`` and ``page_size`` are not applied in this form.
    * ``arg`` is a filter or None: the filter is translated by
      ``build_query_spec`` and the result is paginated.

    Args:
        arg: Either a list of IDs to retrieve, or an optional filter.
        page_index: Zero-based page index for pagination.
        page_size: Number of items per page.

    Returns:
        List of doctor entities matching the criteria. An empty ID list
        yields an empty result without touching the database.
    """
    # ``builtins.list`` keeps the check independent of this method's own
    # name, matching the annotations above.
    if isinstance(arg, builtins.list):
        if not arg:
            # Nothing was requested, so no session is needed.
            return []
        stmt = sqlm.select(DoctorTable).where(sqlm.col(DoctorTable.id).in_(arg))
    else:
        spec = self.build_query_spec(arg)
        stmt = sqlm.select(DoctorTable)
        for clause in spec.where:
            stmt = stmt.where(clause)
        for order in spec.order_by:
            stmt = stmt.order_by(order)
        stmt = stmt.offset(page_index * page_size).limit(page_size)

    async with self.sqlite.session() as session:
        result = await session.execute(stmt)
        # Convert to entities while the session is still open.
        return [obj.to_entity() for obj in result.scalars().all()]
update async
update(data: Doctor) -> str
Update an existing doctor.
Parameters:
| Name | Type | Description | Default |
data | Doctor | The doctor entity with updated values. | required |
Returns:
| Type | Description |
str | The ID of the updated doctor. |
Raises:
| Type | Description |
ValueError | If the doctor with the given ID does not exist. |
Source code in audex/lib/repos/doctor.py
async def update(self, data: Doctor, /) -> str:
    """Apply the values of ``data`` to the stored doctor with the same ID.

    Args:
        data: The doctor entity carrying the updated values.

    Returns:
        The ID of the updated doctor.

    Raises:
        ValueError: If no doctor with ``data.id`` exists.
    """
    async with self.sqlite.session() as db:
        lookup = sqlm.select(DoctorTable).where(DoctorTable.id == data.id)
        existing = (await db.execute(lookup)).scalar_one_or_none()
        if existing is None:
            raise ValueError(f"Doctor with id {data.id} not found")

        existing.update(data)
        db.add(existing)
        await db.commit()
        # Reload the row after the commit, then read the ID from it.
        await db.refresh(existing)
        return existing.id
update_many async
update_many(datas: list[Doctor]) -> list[str]
Update multiple doctors in the database.
Parameters:
| Name | Type | Description | Default |
datas | list[Doctor] | List of doctor entities with updated values. | required |
Returns:
| Type | Description |
list[str] | List of IDs of the updated doctors. |
Raises:
| Type | Description |
ValueError | If any doctor with the given ID does not exist. |
Source code in audex/lib/repos/doctor.py
async def update_many(self, datas: builtins.list[Doctor]) -> builtins.list[str]:
    """Update several doctors in a single commit.

    All rows are loaded with one query. If any requested ID is absent,
    the method raises before modifying anything.

    Args:
        datas: Doctor entities carrying the updated values.

    Returns:
        IDs of the updated doctors, in the order of ``datas``. An empty
        input returns an empty list without opening a session.

    Raises:
        ValueError: If any doctor in ``datas`` does not exist.
    """
    if not datas:
        return []

    changed: builtins.list[str] = []
    async with self.sqlite.session() as db:
        ids = [item.id for item in datas]
        query = sqlm.select(DoctorTable).where(sqlm.col(DoctorTable.id).in_(ids))
        fetched = await db.execute(query)
        rows_by_id = {row.id: row for row in fetched.scalars().all()}

        # Every requested ID must exist before any row is touched.
        missing_ids = set(ids) - set(rows_by_id.keys())
        if missing_ids:
            raise ValueError(f"Doctors with IDs {missing_ids} not found")

        for item in datas:
            row = rows_by_id[item.id]
            row.update(item)
            db.add(row)
            changed.append(row.id)

        await db.commit()
        return changed
delete async
delete(id: str) -> bool
Delete a doctor by ID.
Parameters:
| Name | Type | Description | Default |
id | str | The ID (id) of the doctor to delete. | required |
Returns:
| Type | Description |
bool | True if the doctor was deleted, False if not found. |
Source code in audex/lib/repos/doctor.py
async def delete(self, id: str, /) -> bool:
    """Remove the doctor with the given ID, if present.

    Args:
        id: The ID of the doctor to delete.

    Returns:
        True when a doctor was found and deleted, False when no doctor
        has that ID.
    """
    async with self.sqlite.session() as db:
        lookup = sqlm.select(DoctorTable).where(DoctorTable.id == id)
        target = (await db.execute(lookup)).scalar_one_or_none()
        if target is not None:
            await db.delete(target)
            await db.commit()
            return True
        return False
delete_many async
delete_many(arg: list[str] | Optional[Filter] = None) -> list[str]
Delete multiple doctors by IDs or matching a filter.
Parameters:
| Name | Type | Description | Default |
arg | list[str] | Optional[Filter] | Either a list of IDs to delete, or an optional filter. | None |
Returns:
| Type | Description |
list[str] | If deleting by IDs, returns list of deleted IDs. |
list[str] | If deleting by filter, returns the list of IDs of the deleted records. |
Source code in audex/lib/repos/doctor.py
async def delete_many(
    self,
    arg: builtins.list[str] | t.Optional[Filter] = None,  # noqa
) -> builtins.list[str]:
    """Delete multiple doctors by IDs or matching a filter.

    Two call forms are supported:

    * ``arg`` is a list of IDs: the matching rows are loaded and deleted
      one by one through the session.
    * ``arg`` is a filter or None: the matching IDs are selected first,
      then removed with a single bulk DELETE.

    NOTE(review): with ``arg=None`` the filter form runs on whatever
    ``build_query_spec(None)`` yields; presumably that is no WHERE
    clause at all, which would delete every doctor. Confirm before
    relying on the default.

    Args:
        arg: Either a list of IDs to delete, or an optional filter.

    Returns:
        The IDs of the doctors that were actually deleted, in both call
        forms. Empty when nothing matched; an empty ID list returns
        immediately without opening a session.
    """
    # ``builtins.list`` keeps the check independent of the repository's
    # own ``list`` method, matching the annotations above.
    delete_by_ids = isinstance(arg, builtins.list)
    if delete_by_ids and not arg:
        # Nothing was requested, so no session is needed.
        return []

    async with self.sqlite.session() as session:
        if delete_by_ids:
            stmt = sqlm.select(DoctorTable).where(sqlm.col(DoctorTable.id).in_(arg))
            result = await session.execute(stmt)
            doctor_objs = result.scalars().all()
            if not doctor_objs:
                return []
            # Only IDs that actually exist are reported back.
            doctor_ids = [obj.id for obj in doctor_objs]
            for obj in doctor_objs:
                await session.delete(obj)
            await session.commit()
            return doctor_ids

        spec = self.build_query_spec(arg)
        stmt = sqlm.select(DoctorTable.id)  # type: ignore
        for clause in spec.where:
            stmt = stmt.where(clause)
        result = await session.execute(stmt)
        doctor_ids = [row[0] for row in result.all()]
        if not doctor_ids:
            return []
        # Delete exactly the IDs selected above so the returned list
        # matches the rows removed.
        delete_stmt = sa.delete(DoctorTable).where(sqlm.col(DoctorTable.id).in_(doctor_ids))
        await session.execute(delete_stmt)
        await session.commit()
        return doctor_ids
count async
count(filter: Optional[Filter] = None) -> int
Count doctors matching the filter.
Parameters:
| Name | Type | Description | Default |
filter | Optional[Filter] | Optional filter to apply. If None, counts all doctors. | None |
Returns:
| Type | Description |
int | Number of doctors matching the filter. |
Source code in audex/lib/repos/doctor.py
async def count(self, filter: t.Optional[Filter] = None) -> int:  # noqa
    """Return how many doctors satisfy the filter.

    Args:
        filter: Optional filter; it is translated by ``build_query_spec``
            and its WHERE clauses are applied to the count query.

    Returns:
        Number of doctors matching the filter.
    """
    spec = self.build_query_spec(filter)
    async with self.sqlite.session() as db:
        query = sqlm.select(sa.func.count()).select_from(DoctorTable)
        for condition in spec.where:
            query = query.where(condition)
        tally = await db.execute(query)
        return tally.scalar_one()