Skip to content

dynamic

dynamic

Attributes

logger module-attribute

logger = getLogger('inferflow.asyncio.batch.dynamic')

__all__ module-attribute

__all__ = ['DynamicBatchStrategy', 'QueueFullError']

Classes

QueueFullError

Bases: Exception

Raised when queue is full and blocking is disabled.

DynamicBatchStrategy

DynamicBatchStrategy(min_batch_size: int = 1, max_batch_size: int = 32, max_wait_ms: float = 50, queue_size: int = 1000, block_on_full: bool = True)

Bases: BatchStrategy[P, R]

Dynamic batching with adaptive batch size (async version).

This strategy
  • Collects requests into batches
  • Adjusts batch size based on queue depth
  • Uses timeout to ensure low latency
  • Distributes results to individual requests

Parameters:

Name Type Description Default
min_batch_size int

Minimum batch size (default: 1).

1
max_batch_size int

Maximum batch size (default: 32).

32
max_wait_ms float

Maximum wait time before processing batch (default: 50ms).

50
queue_size int

Maximum queue size (default: 1000).

1000
block_on_full bool

Block when queue is full instead of raising error.

True
Example
strategy = DynamicBatchStrategy(
    max_batch_size=32,
    max_wait_ms=50,
)
Source code in inferflow/asyncio/batch/dynamic.py
def __init__(
    self,
    min_batch_size: int = 1,
    max_batch_size: int = 32,
    max_wait_ms: float = 50,
    queue_size: int = 1000,
    block_on_full: bool = True,
):
    """Configure the dynamic batching strategy.

    Args:
        min_batch_size: Smallest batch the worker will process (default: 1).
        max_batch_size: Largest batch the worker will process (default: 32).
        max_wait_ms: Maximum wait before flushing a partial batch, in
            milliseconds.
        queue_size: Maximum number of pending requests (default: 1000).
        block_on_full: If True, submit() waits for space on a full queue;
            if False, it raises QueueFullError instead.
    """
    super().__init__()

    # Batch-sizing configuration. NOTE: despite the ms-suffixed name,
    # max_wait_ms is stored in *seconds* so it can be handed directly to
    # asyncio timeout primitives.
    self.min_batch_size = min_batch_size
    self.max_batch_size = max_batch_size
    self.max_wait_ms = max_wait_ms / 1000
    self.queue_size = queue_size
    self.block_on_full = block_on_full

    # No worker until start() is called.
    self._worker_task: aio.Task[None] | None = None
    # Pending work: each entry pairs a preprocessed input with the future
    # that will receive its result once its batch completes.
    self._queue: aio.Queue[tuple[P, aio.Future[R]]] = aio.Queue(maxsize=queue_size)

    logger.info(
        f"DynamicBatchStrategy (async) initialized: "
        f"batch_size=[{min_batch_size}, {max_batch_size}], "
        f"max_wait={max_wait_ms}ms, queue_size={queue_size}"
    )
Attributes
min_batch_size instance-attribute
min_batch_size = min_batch_size
max_batch_size instance-attribute
max_batch_size = max_batch_size
max_wait_ms instance-attribute
max_wait_ms = max_wait_ms / 1000 (note: despite the name, this attribute holds the wait in seconds)
queue_size instance-attribute
queue_size = queue_size
block_on_full instance-attribute
block_on_full = block_on_full
Functions
submit async
submit(item: P) -> R

Submit an item for batched processing.

Parameters:

Name Type Description Default
item P

Preprocessed input.

required

Returns:

Type Description
R

Inference result.

Raises:

Type Description
RuntimeError

If strategy is not started.

QueueFullError

If queue is full and blocking is disabled.

Source code in inferflow/asyncio/batch/dynamic.py
async def submit(self, item: P) -> R:
    """Submit an item for batched processing.

    Args:
        item: Preprocessed input.

    Returns:
        Inference result.

    Raises:
        RuntimeError: If strategy is not started.
        QueueFullError: If queue is full and blocking is disabled.
    """
    if not self._running:
        raise RuntimeError("BatchStrategy not started. Call start() first.")

    # The worker resolves this future with the item's slice of the batch result.
    future: aio.Future[R] = aio.Future()

    if self.block_on_full:
        # Backpressure mode: suspend until the queue has a free slot.
        await self._queue.put((item, future))
    else:
        # EAFP: put_nowait is atomic, so this cannot race the way a
        # full()-check followed by an awaited put() could (the queue may
        # fill in between, silently blocking a caller that opted out of
        # blocking).
        try:
            self._queue.put_nowait((item, future))
        except aio.QueueFull:
            self.metrics.rejected_requests += 1
            raise QueueFullError(
                f"Queue is full ({self.queue_size} items). Rejected {self.metrics.rejected_requests} requests."
            ) from None

    self.metrics.current_queue_size = self._queue.qsize()

    return await future
start async
start(runtime: Runtime[P, R]) -> None

Start the batch processing worker.

Parameters:

Name Type Description Default
runtime Runtime[P, R]

Runtime to use for inference.

required
Source code in inferflow/asyncio/batch/dynamic.py
async def start(self, runtime: Runtime[P, R]) -> None:
    """Start the batch processing worker.

    Idempotent: starting an already-running strategy logs a warning and
    returns without spawning a second worker.

    Args:
        runtime: Runtime to use for inference.
    """
    if self._running:
        # A second worker over the same queue would double-process items.
        logger.warning("BatchStrategy already running")
        return

    self._running = True
    self.runtime = runtime
    # Background task that drains the queue and dispatches batches.
    self._worker_task = aio.create_task(self._worker())

    logger.info("DynamicBatchStrategy (async) started")
stop async
stop() -> None

Stop the batch processing worker (async).

Source code in inferflow/asyncio/batch/dynamic.py
async def stop(self) -> None:
    """Stop the batch processing worker (async).

    Safe to call when not running (no-op). Cancels the worker task and
    waits for the cancellation to finish before returning.
    """
    if not self._running:
        return

    self._running = False

    if self._worker_task:
        self._worker_task.cancel()
        # Await the cancelled task so it unwinds cleanly; suppress the
        # CancelledError it re-raises into us.
        with contextlib.suppress(aio.CancelledError):
            await self._worker_task
        # Drop the reference so a later start() begins from a clean slate.
        self._worker_task = None

    logger.info(
        # BUG FIX: the adjacent literals previously concatenated as
        # "stopped.Processed ..." — a space was missing after the period.
        f"DynamicBatchStrategy (async) stopped. "
        f"Processed {self.metrics.total_requests} requests in "
        f"{self.metrics.total_batches} batches."
    )