Skip to content

Inmemory Cache

inmemory

Classes

TTLEntry

TTLEntry(value: Any, ttl: int | None = None)

Wrapper for cache entries with TTL information.

Source code in audex/lib/cache/inmemory.py
def __init__(self, value: t.Any, ttl: int | None = None):
    self.value = value
    self.expire_at = time.time() + ttl if ttl is not None else None
Functions
is_expired
is_expired() -> bool

Check if this entry has expired.

Source code in audex/lib/cache/inmemory.py
def is_expired(self) -> bool:
    """Check if this entry has expired."""
    if self.expire_at is None:
        return False
    return time.time() > self.expire_at
get_ttl
get_ttl() -> int | None

Get remaining TTL in seconds.

Source code in audex/lib/cache/inmemory.py
def get_ttl(self) -> int | None:
    """Get remaining TTL in seconds."""
    if self.expire_at is None:
        return None
    remaining = int(self.expire_at - time.time())
    return max(0, remaining)

InmemoryCache

InmemoryCache(*, key_builder: KeyBuilder, cache_type: Literal['lru', 'lfu', 'ttl', 'fifo'] = 'lru', maxsize: int = 1000, default_ttl: int = 300, negative_ttl: int = 60)

Bases: KVCache

Initialize InmemoryCache.

Parameters:

Name Type Description Default
key_builder KeyBuilder

KeyBuilder instance for building cache keys

required
cache_type Literal['lru', 'lfu', 'ttl', 'fifo']

Type of cache strategy ('lru', 'lfu', 'ttl', 'fifo')

'lru'
maxsize int

Maximum number of items in cache

1000
default_ttl int

Default TTL in seconds for cache entries

300
negative_ttl int

TTL in seconds for negative cache entries

60
Source code in audex/lib/cache/inmemory.py
def __init__(
    self,
    *,
    key_builder: KeyBuilder,
    cache_type: t.Literal["lru", "lfu", "ttl", "fifo"] = "lru",
    maxsize: int = 1000,
    default_ttl: int = 300,
    negative_ttl: int = 60,
) -> None:
    """Initialize InmemoryCache.

    Args:
        key_builder: KeyBuilder instance for building cache keys
        cache_type: Type of cache strategy ('lru', 'lfu', 'ttl', 'fifo')
        maxsize: Maximum number of items in cache
        default_ttl: Default TTL in seconds for cache entries
        negative_ttl: TTL in seconds for negative cache entries
    """
    super().__init__()
    self._key_builder = key_builder
    self.cache_type = cache_type
    self.maxsize = maxsize
    self.default_ttl = default_ttl
    self.negative_ttl = negative_ttl
    self.logger.info("Initializing Cachetools cache")

    # Thread lock for thread-safe operations (async lock)
    self._lock = asyncio.Lock()

    # Initialize the appropriate cache type
    self._cache: t.MutableMapping[str, TTLEntry]
    if cache_type == "lru":
        self._cache = cachetools.LRUCache(maxsize=maxsize)
        self.logger.info(f"Initialized LRU cache with maxsize={maxsize}")
    elif cache_type == "lfu":
        self._cache = cachetools.LFUCache(maxsize=maxsize)
        self.logger.info(f"Initialized LFU cache with maxsize={maxsize}")
    elif cache_type == "ttl":
        cache_ttl = default_ttl
        self._cache = cachetools.TTLCache(maxsize=maxsize, ttl=cache_ttl)
        self.logger.info(f"Initialized TTL cache with maxsize={maxsize}, ttl={cache_ttl}")
    elif cache_type == "fifo":
        # Cachetools doesn't have built-in FIFO, use OrderedDict wrapper
        self._cache = collections.OrderedDict()
        self._maxsize = maxsize
        self.logger.info(f"Initialized FIFO cache with maxsize={maxsize}")

    self.logger.info(
        f"Cachetools cache initialized with type={self.cache_type}, maxsize={maxsize}",
        cache_type=self.cache_type,
        maxsize=maxsize,
    )
Attributes
key_builder property
key_builder: KeyBuilder

Get the key builder instance.

Functions
set_negative async
set_negative(key: str) -> None

Store cache miss marker to prevent cache penetration.

Source code in audex/lib/cache/inmemory.py
async def set_negative(self, key: str, /) -> None:
    """Store cache miss marker to prevent cache penetration."""
    async with self._lock:
        try:
            await self._evict_if_needed()
            entry = TTLEntry(NEGATIVE, self.negative_ttl)
            self._cache[key] = entry
            self.logger.debug(f"Set negative cache for key: {key}")
        except Exception as e:
            self.logger.warning(f"Failed to set negative cache for key {key}: {e}")
keys async
keys() -> list[str]

Return a list of cache keys.

Source code in audex/lib/cache/inmemory.py
async def keys(self) -> list[str]:
    """Return a list of cache keys."""
    result = []
    async for key in self.iter_keys():
        result.append(key)
    return result
get async
get(key: str, default: VT | T | None = None) -> VT | T | None

Get value from cache, return default if not found.

Source code in audex/lib/cache/inmemory.py
async def get(self, key: str, default: VT | T | None = None, /) -> VT | T | None:
    """Get value from cache, return default if not found."""
    result = await self.get_item(key)
    if isinstance(result, Empty):
        return default
    return result  # type: ignore
setdefault async
setdefault(key: str, default: VT | None = None) -> VT | None

Get value or set and return default if key doesn't exist.

Source code in audex/lib/cache/inmemory.py
async def setdefault(self, key: str, default: VT | None = None, /) -> VT | None:
    """Get value or set and return default if key doesn't exist."""
    async with self._lock:
        try:
            await self._cleanup_expired()
            entry = self._cache.get(key)

            if entry is not None and not entry.is_expired():
                if isinstance(entry.value, CacheMiss):
                    self.logger.debug(f"Key {key} in negative cache")
                    return default
                self.logger.debug(f"Key {key} exists, returning cached value")
                return entry.value  # type: ignore

            # Key doesn't exist or is expired
            if entry is not None and entry.is_expired():
                del self._cache[key]

            if default is not None:
                await self._evict_if_needed()
                new_entry = TTLEntry(default, self.default_ttl)
                self._cache[key] = new_entry
                self.logger.debug(f"Set default value for key: {key}")
                return default

            await self._evict_if_needed()
            neg_entry = TTLEntry(NEGATIVE, self.negative_ttl)
            self._cache[key] = neg_entry
            self.logger.debug(f"Set negative cache for key: {key}")
            return None

        except Exception as e:
            self.logger.error(f"Error in setdefault for key {key}: {e}")
            return default
clear async
clear() -> None

Clear all cache entries with the configured prefix.

Source code in audex/lib/cache/inmemory.py
async def clear(self) -> None:
    """Clear all cache entries with the configured prefix."""
    async with self._lock:
        try:
            keys_to_delete = [key for key in self._cache if self.key_builder.validate(key)]

            for key in keys_to_delete:
                del self._cache[key]

            self.logger.info(f"Cleared {len(keys_to_delete)} cache entries")
        except Exception as e:
            self.logger.error(f"Failed to clear cache: {e}")
pop async
pop(key: str, default: VT | T | None = None) -> VT | T | None

Remove and return value, or return default if not found.

Source code in audex/lib/cache/inmemory.py
async def pop(self, key: str, default: VT | T | None = None, /) -> VT | T | None:
    """Remove and return value, or return default if not found."""
    async with self._lock:
        try:
            await self._cleanup_expired()
            entry = self._cache.get(key)

            if entry is not None and not entry.is_expired():
                del self._cache[key]
                if isinstance(entry.value, CacheMiss):
                    return default
                self.logger.debug(f"Successfully popped key: {key}")
                return entry.value  # type: ignore

            self.logger.debug(f"Key not found for pop operation: {key}")
            return default
        except Exception as e:
            self.logger.error(f"Error when popping key {key}: {e}")
            return default
popitem async
popitem() -> tuple[str, VT]

Remove and return an arbitrary (key, value) pair.

Source code in audex/lib/cache/inmemory.py
async def popitem(self) -> tuple[str, VT]:
    """Remove and return an arbitrary (key, value) pair."""
    async with self._lock:
        try:
            await self._cleanup_expired()

            for key in list(self._cache.keys()):
                if not self.key_builder.validate(key):
                    continue

                entry = self._cache.get(key)
                if entry is not None and not entry.is_expired():
                    del self._cache[key]
                    if not isinstance(entry.value, CacheMiss):
                        self.logger.debug(f"Successfully popped item: {key}")
                        return key, entry.value

            self.logger.debug("No items to pop")
            raise KeyError("popitem(): cache is empty")
        except KeyError:
            raise
        except Exception as e:
            self.logger.error(f"Error during popitem: {e}")
            raise KeyError(f"popitem() failed: {e}") from e
set async
set(key: str, value: VT) -> None

Set a key-value pair (alias for set_item).

Source code in audex/lib/cache/inmemory.py
async def set(self, key: str, value: VT, /) -> None:
    """Set a key-value pair (alias for set_item)."""
    await self.set_item(key, value)
setx async
setx(key: str, value: VT, /, ttl: int | None = None) -> None

Set a key-value pair with optional TTL.

Source code in audex/lib/cache/inmemory.py
async def setx(self, key: str, value: VT, /, ttl: int | None = None) -> None:
    """Set a key-value pair with optional TTL."""
    async with self._lock:
        try:
            await self._evict_if_needed()
            entry = TTLEntry(value, ttl)
            self._cache[key] = entry

            if ttl is not None:
                self.logger.debug(f"Set key {key} with TTL {ttl}")
            else:
                self.logger.debug(f"Set key {key} without TTL")
        except Exception as e:
            self.logger.error(f"Failed to setx for key {key}: {e}")
ttl async
ttl(key: str) -> int | None

Get TTL for a key.

Source code in audex/lib/cache/inmemory.py
async def ttl(self, key: str, /) -> int | None:
    """Get TTL for a key."""
    async with self._lock:
        try:
            entry = self._cache.get(key)

            if entry is None:
                self.logger.debug(f"Key {key} does not exist")
                return None

            if entry.is_expired():
                del self._cache[key]
                self.logger.debug(f"Key {key} has expired")
                return None

            ttl_value = entry.get_ttl()
            if ttl_value is None:
                self.logger.debug(f"Key {key} exists without expiration")
            else:
                self.logger.debug(f"Key {key} TTL: {ttl_value}")

            return ttl_value
        except Exception as e:
            self.logger.error(f"Error when getting TTL for key {key}: {e}")
            return None
expire async
expire(key: str, /, ttl: int | None = None) -> None

Set or remove expiration for a key.

Source code in audex/lib/cache/inmemory.py
async def expire(self, key: str, /, ttl: int | None = None) -> None:
    """Set or remove expiration for a key."""
    async with self._lock:
        try:
            entry = self._cache.get(key)

            if entry is None:
                self.logger.warning(f"Cannot set expiration for non-existent key: {key}")
                return

            if entry.is_expired():
                del self._cache[key]
                self.logger.warning(f"Key {key} has already expired")
                return

            # Create new entry with updated TTL
            new_entry = TTLEntry(entry.value, ttl)
            self._cache[key] = new_entry

            if ttl is not None:
                self.logger.debug(f"Set expiration for key {key}: {ttl} seconds")
            else:
                self.logger.debug(f"Removed expiration for key: {key}")
        except Exception as e:
            self.logger.error(f"Error when setting expiration for key {key}: {e}")
incr async
incr(key: str, /, amount: int = 1) -> int

Increment a key's value.

Source code in audex/lib/cache/inmemory.py
async def incr(self, key: str, /, amount: int = 1) -> int:
    """Increment a key's value."""
    async with self._lock:
        try:
            await self._cleanup_expired()
            entry = self._cache.get(key)

            if entry is None or entry.is_expired():
                # Initialize to amount if key doesn't exist
                new_value = amount
                await self._evict_if_needed()
                new_entry = TTLEntry(new_value, self.default_ttl)
                self._cache[key] = new_entry
                self.logger.debug(f"Initialized and incremented key {key} to {new_value}")
                return new_value

            if not isinstance(entry.value, int):
                self.logger.error(f"Cannot increment non-integer value for key {key}")
                raise ValueError(f"Value at key {key} is not an integer")

            new_value = entry.value + amount
            # Preserve existing TTL
            existing_ttl = entry.get_ttl()
            new_entry = TTLEntry(new_value, existing_ttl)
            self._cache[key] = new_entry

            self.logger.debug(f"Incremented key {key} by {amount}, result: {new_value}")
            return new_value
        except Exception as e:
            self.logger.error(f"Error when incrementing key {key}: {e}")
            return 0
decr async
decr(key: str, /, amount: int = 1) -> int

Decrement a key's value.

Source code in audex/lib/cache/inmemory.py
async def decr(self, key: str, /, amount: int = 1) -> int:
    """Decrement a key's value."""
    return await self.incr(key, -amount)
values async
values() -> list[VT]

Return all cache values.

Source code in audex/lib/cache/inmemory.py
async def values(self) -> list[VT]:
    """Return all cache values."""
    async with self._lock:
        try:
            await self._cleanup_expired()
            values = []
            for key, entry in self._cache.items():
                if not self.key_builder.validate(key):
                    continue
                if entry.is_expired():
                    continue
                if isinstance(entry.value, CacheMiss):
                    continue
                values.append(entry.value)
            return values
        except Exception as e:
            self.logger.error(f"Error when getting values: {e}")
            return []
items async
items() -> list[tuple[str, VT]]

Return all cache items as (key, value) pairs.

Source code in audex/lib/cache/inmemory.py
async def items(self) -> list[tuple[str, VT]]:
    """Return all cache items as (key, value) pairs."""
    async with self._lock:
        try:
            await self._cleanup_expired()
            items = []
            for key, entry in self._cache.items():
                if not self.key_builder.validate(key):
                    continue
                if entry.is_expired():
                    continue
                if isinstance(entry.value, CacheMiss):
                    continue
                items.append((key, entry.value))
            return items
        except Exception as e:
            self.logger.error(f"Error when getting items: {e}")
            return []
init async
init() -> None

Initialize cache resources if needed.

Source code in audex/lib/cache/inmemory.py
async def init(self) -> None:
    """Initialize cache resources if needed."""
    self.logger.info("InmemoryCache initialized")
close async
close() -> None

Close cache and cleanup resources.

Source code in audex/lib/cache/inmemory.py
async def close(self) -> None:
    """Close cache and cleanup resources."""
    async with self._lock:
        try:
            self._cache.clear()
            self.logger.info("Cachetools cache closed and cleared")
        except Exception as e:
            self.logger.warning(f"Error while closing cache: {e}")

options: show_root_heading: true show_source: true heading_level: 2 members_order: source show_signature_annotations: true separate_signature: true