Skip to content

Session Manager

session

Classes

SessionData

Bases: NamedTuple

Session data structure.

Attributes:

Name Type Description
doctor_id str

Doctor's unique identifier.

doctor_name str | None

Doctor's display name.

eid str

Doctor's eid.

created_at str

Session creation timestamp (ISO format).

expires_at str

Session expiration timestamp (ISO format).

Functions
to_dict
to_dict() -> dict[str, str | None]

Convert to dictionary for JSON serialization.

Source code in audex/lib/session.py
def to_dict(self) -> dict[str, str | None]:
    """Convert to dictionary for JSON serialization."""
    return {
        "doctor_id": self.doctor_id,
        "doctor_name": self.doctor_name,
        "eid": self.eid,
        "created_at": self.created_at,
        "expires_at": self.expires_at,
    }
from_dict classmethod
from_dict(data: dict[str, str]) -> Self

Create from dictionary.

Source code in audex/lib/session.py
@classmethod
def from_dict(cls, data: dict[str, str]) -> t.Self:
    """Create from dictionary."""
    return cls(
        doctor_id=data["doctor_id"],
        doctor_name=data["doctor_name"],
        eid=data["eid"],
        created_at=data["created_at"],
        expires_at=data["expires_at"],
    )

SessionManager

SessionManager(app_name: str = __title__, *, ttl: timedelta = timedelta(hours=8))

Bases: LoggingMixin, AsyncContextMixin

Secure local session manager with automatic state management.

This manager maintains session state automatically - no need to manually manage tokens. Session persists across application restarts until it expires. Uses system temp directory with encryption for security.

Security features: 1. Uses system temp directory (auto-cleaned on reboot) 2. File permissions restricted to current user (0o600) 3. Session data encrypted with machine-specific key 4. Automatic expiration and cleanup 5. Process-specific session binding

Attributes:

Name Type Description
session_dir

Directory for session files (in system temp).

session_file

Path to the encrypted session file.

ttl

Default time-to-live for sessions.

Example
# Initialize manager
manager = SessionManager(
    app_name="audex",
    default_ttl=timedelta(hours=8),
)

await manager.init()

# Login
await manager.login(
    doctor_id="doctor-abc123",
    doctor_name="张医生",
    eid="dr_zhang",
)

# Check if logged in (no token needed!)
if await manager.is_logged_in():
    session = await manager.get_session()
    print(f"Logged in as: {session.doctor_name}")

# Works across app restarts (if not expired)
# ... restart app ...
if await manager.is_logged_in():
    print("Auto-logged in!")

# Logout
await manager.logout()

# Cleanup
await manager.close()
Source code in audex/lib/session.py
def __init__(
    self,
    app_name: str = __title__,
    *,
    ttl: datetime.timedelta = datetime.timedelta(hours=8),
) -> None:
    super().__init__()
    self.app_name = app_name
    self.ttl = ttl

    # Use system temp directory
    self.session_dir = pathlib.Path(tempfile.gettempdir()) / f".{app_name}_session"
    self.session_file = self.session_dir / "session.enc"

    # Machine-specific encryption key (derived from machine ID)
    self._encryption_key = self._generate_machine_key()

    self._cleanup_task: asyncio.Task[None] | None = None
    self._lock = asyncio.Lock()
Functions
init async
init() -> None

Initialize the session manager.

Creates session directory with restricted permissions and starts automatic cleanup task.

Source code in audex/lib/session.py
async def init(self) -> None:
    """Initialize the session manager.

    Creates session directory with restricted permissions and starts
    automatic cleanup task.
    """
    # Create session directory with restricted permissions
    self.session_dir.mkdir(mode=0o700, parents=True, exist_ok=True)

    # Start automatic cleanup task
    self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    self.logger.info(
        "Session manager initialized",
        session_dir=str(self.session_dir),
        ttl_hours=self.ttl.total_seconds() / 3600,
    )
close async
close() -> None

Close the session manager.

Stops the cleanup task. Session file is preserved for next startup.

Source code in audex/lib/session.py
async def close(self) -> None:
    """Close the session manager.

    Stops the cleanup task. Session file is preserved for next
    startup.
    """
    if self._cleanup_task:
        self._cleanup_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._cleanup_task
        self._cleanup_task = None

    self.logger.info("Session manager closed")
login async
login(doctor_id: str, eid: str, *, doctor_name: str | None = None, ttl: timedelta | None = None) -> SessionData

Create a login session.

If a session already exists, it will be replaced with the new one. Session persists across application restarts until expiration.

Parameters:

Name Type Description Default
doctor_id str

Doctor's unique identifier.

required
doctor_name str | None

Doctor's display name.

None
eid str

Doctor's eid.

required
ttl timedelta | None

Time-to-live for the session. If None, uses default.

None

Returns:

Type Description
SessionData

Created SessionData.

Example
session = await manager.login(
    doctor_id="doctor-abc123",
    doctor_name="张医生",
    eid="dr_zhang",
)
print(f"Logged in as: {session.doctor_name}")
Source code in audex/lib/session.py
async def login(
    self,
    doctor_id: str,
    eid: str,
    *,
    doctor_name: str | None = None,
    ttl: datetime.timedelta | None = None,
) -> SessionData:
    """Create a login session.

    If a session already exists, it will be replaced with the new one.
    Session persists across application restarts until expiration.

    Args:
        doctor_id: Doctor's unique identifier.
        doctor_name: Doctor's display name.
        eid: Doctor's eid.
        ttl: Time-to-live for the session. If None, uses default.

    Returns:
        Created SessionData.

    Example:
        ```python
        session = await manager.login(
            doctor_id="doctor-abc123",
            doctor_name="张医生",
            eid="dr_zhang",
        )
        print(f"Logged in as: {session.doctor_name}")
        ```
    """
    async with self._lock:
        ttl = ttl or self.ttl
        now = utils.utcnow()
        expires_at = now + ttl

        session_data = SessionData(
            doctor_id=doctor_id,
            doctor_name=doctor_name,
            eid=eid,
            created_at=now.isoformat(),
            expires_at=expires_at.isoformat(),
        )

        # Encrypt and write session data
        await self._write_session(session_data)

        self.logger.bind(
            doctor_id=doctor_id,
            eid=eid,
            expires_at=expires_at.isoformat(),
        ).info(f"Session created for {eid}")

        return session_data
logout async
logout() -> bool

Logout by deleting the session.

Returns:

Type Description
bool

True if session was deleted, False if no session exists.

Example
logged_out = await manager.logout()
if logged_out:
    print("Successfully logged out")
Source code in audex/lib/session.py
async def logout(self) -> bool:
    """Logout by deleting the session.

    Returns:
        True if session was deleted, False if no session exists.

    Example:
        ```python
        logged_out = await manager.logout()
        if logged_out:
            print("Successfully logged out")
        ```
    """
    async with self._lock:
        if not self.session_file.exists():
            return False

        try:
            self.session_file.unlink()
            self.logger.info("Session deleted (logout)")
            return True
        except Exception as e:
            self.logger.error(f"Failed to delete session: {e}", exc_info=True)
            return False
is_logged_in async
is_logged_in() -> bool

Check if there's an active (non-expired) session.

Returns:

Type Description
bool

True if there's an active session, False otherwise.

Example
if await manager.is_logged_in():
    print("User is logged in")
else:
    print("Please login")
Source code in audex/lib/session.py
async def is_logged_in(self) -> bool:
    """Check if there's an active (non-expired) session.

    Returns:
        True if there's an active session, False otherwise.

    Example:
        ```python
        if await manager.is_logged_in():
            print("User is logged in")
        else:
            print("Please login")
        ```
    """
    session = await self.get_session()
    return session is not None
get_session async
get_session() -> SessionData | None

Get current session if exists and not expired.

Returns:

Type Description
SessionData | None

SessionData if session exists and is valid, None otherwise.

Example
session = await manager.get_session()
if session:
    print(f"Logged in as: {session.doctor_name}")
    print(f"Doctor ID: {session.doctor_id}")
else:
    print("No active session")
Source code in audex/lib/session.py
async def get_session(self) -> SessionData | None:
    """Get current session if exists and not expired.

    Returns:
        SessionData if session exists and is valid, None otherwise.

    Example:
        ```python
        session = await manager.get_session()
        if session:
            print(f"Logged in as: {session.doctor_name}")
            print(f"Doctor ID: {session.doctor_id}")
        else:
            print("No active session")
        ```
    """
    async with self._lock:
        if not self.session_file.exists():
            return None

        try:
            # Read and decrypt session data
            session_data = await self._read_session()
            if session_data is None:
                return None

            # Check if expired
            expires_at = datetime.datetime.fromisoformat(session_data.expires_at)
            if utils.utcnow() >= expires_at:
                self.logger.debug("Session expired, removing file")
                self.session_file.unlink()
                return None

            return session_data

        except Exception as e:
            self.logger.warning(f"Failed to read session: {e}", exc_info=True)
            # If data is corrupted, remove file
            if self.session_file.exists():
                self.session_file.unlink()
            return None
get_doctor_id async
get_doctor_id() -> str | None

Get current logged-in doctor's ID.

Returns:

Type Description
str | None

Doctor ID if logged in, None otherwise.

Example
doctor_id = await manager.get_doctor_id()
if doctor_id:
    print(f"Current doctor: {doctor_id}")
Source code in audex/lib/session.py
async def get_doctor_id(self) -> str | None:
    """Get current logged-in doctor's ID.

    Returns:
        Doctor ID if logged in, None otherwise.

    Example:
        ```python
        doctor_id = await manager.get_doctor_id()
        if doctor_id:
            print(f"Current doctor: {doctor_id}")
        ```
    """
    session = await self.get_session()
    return session.doctor_id if session else None
extend_session async
extend_session(extra_ttl: timedelta) -> bool

Extend current session expiration time.

Parameters:

Name Type Description Default
extra_ttl timedelta

Additional time to add to expiration.

required

Returns:

Type Description
bool

True if session was extended, False if no session exists.

Example
# Extend by 2 hours
extended = await manager.extend_session(timedelta(hours=2))
if extended:
    print("Session extended")
Source code in audex/lib/session.py
async def extend_session(self, extra_ttl: datetime.timedelta) -> bool:
    """Extend current session expiration time.

    Args:
        extra_ttl: Additional time to add to expiration.

    Returns:
        True if session was extended, False if no session exists.

    Example:
        ```python
        # Extend by 2 hours
        extended = await manager.extend_session(timedelta(hours=2))
        if extended:
            print("Session extended")
        ```
    """
    async with self._lock:
        session = await self.get_session()
        if session is None:
            return False

        # Calculate new expiration
        current_expires = datetime.datetime.fromisoformat(session.expires_at)
        new_expires = current_expires + extra_ttl

        # Create updated session
        updated_session = SessionData(
            doctor_id=session.doctor_id,
            doctor_name=session.doctor_name,
            eid=session.eid,
            created_at=session.created_at,
            expires_at=new_expires.isoformat(),
        )

        await self._write_session(updated_session)
        self.logger.info(
            "Session extended",
            new_expires_at=new_expires.isoformat(),
        )

        return True

SessionError

SessionError(message: str | None = None)

Bases: AudexError

Base exception for session errors.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

SessionExpiredError

SessionExpiredError(message: str | None = None)

Bases: SessionError

Raised when session has expired.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

SessionNotFoundError

SessionNotFoundError(message: str | None = None)

Bases: SessionError

Raised when session is not found.

Source code in audex/exceptions.py
def __init__(self, message: str | None = None) -> None:
    """Initialize the exception.

    Args:
        message: Custom error message. If None, uses default_message.
    """
    self.message = message or self.default_message
    super().__init__(self.message)

options: show_root_heading: true show_source: true heading_level: 2 members_order: source show_signature_annotations: true separate_signature: true