batch

Attributes

__all__ module-attribute

__all__ = ['BatchStrategy', 'BatchMetrics', 'dynamic']

Classes

BatchMetrics dataclass

BatchMetrics(total_requests: int = 0, total_batches: int = 0, total_latency_ms: float = 0.0, current_queue_size: int = 0, current_batch_size: int = 0, rejected_requests: int = 0, processing_times: list[float] = list(), batch_sizes: list[int] = list())

Real-time batch processing metrics.

Attributes
total_requests class-attribute instance-attribute
total_requests: int = 0

Total number of requests processed.

total_batches class-attribute instance-attribute
total_batches: int = 0

Total number of batches executed.

total_latency_ms class-attribute instance-attribute
total_latency_ms: float = 0.0

Cumulative latency in milliseconds.

current_queue_size class-attribute instance-attribute
current_queue_size: int = 0

Current number of items in queue.

current_batch_size class-attribute instance-attribute
current_batch_size: int = 0

Current batch size being used.

rejected_requests class-attribute instance-attribute
rejected_requests: int = 0

Number of requests rejected because the queue was full.

processing_times class-attribute instance-attribute
processing_times: list[float] = field(default_factory=list)

Recent processing times (sliding window).

batch_sizes class-attribute instance-attribute
batch_sizes: list[int] = field(default_factory=list)

Recent batch sizes (sliding window).

avg_batch_size property
avg_batch_size: float
avg_latency_ms property
avg_latency_ms: float
avg_processing_time_ms property
avg_processing_time_ms: float
Functions
add_batch
add_batch(batch_size: int, processing_time: float, window_size: int = 50) -> None

Record a batch execution.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `batch_size` | `int` | Size of the batch processed. | *required* |
| `processing_time` | `float` | Time taken to process (seconds). | *required* |
| `window_size` | `int` | Maximum number of recent measurements to keep. | `50` |
Source code in inferflow/batch/__init__.py
def add_batch(self, batch_size: int, processing_time: float, window_size: int = 50) -> None:
    """Record a batch execution.

    Args:
        batch_size: Size of the batch processed.
        processing_time: Time taken to process (seconds).
        window_size: Maximum number of recent measurements to keep.
    """
    self.total_batches += 1
    self.total_requests += batch_size
    self.total_latency_ms += processing_time * 1000

    self.processing_times.append(processing_time)
    self.batch_sizes.append(batch_size)

    # Keep only recent measurements
    if len(self.processing_times) > window_size:
        self.processing_times.pop(0)
    if len(self.batch_sizes) > window_size:
        self.batch_sizes.pop(0)
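
The totals and sliding windows can be exercised with a minimal standalone sketch that mirrors the dataclass shown above. The `avg_batch_size` and `avg_latency_ms` property bodies below are assumptions (their implementations are not shown on this page); everything else follows the documented fields and `add_batch` source:

```python
from dataclasses import dataclass, field


@dataclass
class BatchMetrics:
    """Standalone mirror of the BatchMetrics dataclass documented above."""

    total_requests: int = 0
    total_batches: int = 0
    total_latency_ms: float = 0.0
    processing_times: list[float] = field(default_factory=list)
    batch_sizes: list[int] = field(default_factory=list)

    @property
    def avg_batch_size(self) -> float:
        # Assumed semantics: mean over the recent sliding window.
        return sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0.0

    @property
    def avg_latency_ms(self) -> float:
        # Assumed semantics: cumulative latency spread over all requests.
        return self.total_latency_ms / self.total_requests if self.total_requests else 0.0

    def add_batch(self, batch_size: int, processing_time: float, window_size: int = 50) -> None:
        self.total_batches += 1
        self.total_requests += batch_size
        self.total_latency_ms += processing_time * 1000
        self.processing_times.append(processing_time)
        self.batch_sizes.append(batch_size)
        # Drop the oldest measurements once the window is full.
        if len(self.processing_times) > window_size:
            self.processing_times.pop(0)
        if len(self.batch_sizes) > window_size:
            self.batch_sizes.pop(0)


m = BatchMetrics()
for size in (8, 16, 24):
    m.add_batch(size, processing_time=0.010)
print(m.total_requests, m.total_batches, m.avg_batch_size)  # 48 3 16.0
```

Note that `add_batch` takes `processing_time` in seconds but accumulates `total_latency_ms` in milliseconds, hence the `* 1000`.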

BatchStrategy

BatchStrategy()

Bases: ABC, Generic[P, R]

Abstract batch processing strategy (async version).

A batch strategy manages:
  • Request queuing
  • Batch formation
  • Batch execution
  • Result distribution
Example
strategy = DynamicBatchStrategy(
    max_batch_size=32, max_wait_ms=50
)
await strategy.start(runtime)

# Submit requests (automatically batched)
result = await strategy.submit(preprocessed_input)

await strategy.stop()
Source code in inferflow/asyncio/batch/__init__.py
def __init__(self) -> None:
    self.runtime: Runtime[P, R] | None = None
    self.metrics = BatchMetrics()
    self._running = False
Attributes
runtime instance-attribute
runtime: Runtime[P, R] | None = None
metrics instance-attribute
metrics = BatchMetrics()
is_running property
is_running: bool

Check if batch worker is running.

Functions
submit abstractmethod async
submit(item: P) -> R

Submit an item for batched processing.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `item` | `P` | Preprocessed input. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `R` | Inference result. |

Raises:

| Type | Description |
|------|-------------|
| `RuntimeError` | If strategy is not started. |

Source code in inferflow/asyncio/batch/__init__.py
@abc.abstractmethod
async def submit(self, item: P) -> R:
    """Submit an item for batched processing.

    Args:
        item: Preprocessed input.

    Returns:
        Inference result.

    Raises:
        RuntimeError: If strategy is not started.
    """
start abstractmethod async
start(runtime: Runtime[P, R]) -> None

Start the batch processing worker.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `runtime` | `Runtime[P, R]` | Runtime to use for inference. | *required* |
Source code in inferflow/asyncio/batch/__init__.py
@abc.abstractmethod
async def start(self, runtime: Runtime[P, R]) -> None:
    """Start the batch processing worker.

    Args:
        runtime: Runtime to use for inference.
    """
stop abstractmethod async
stop() -> None

Stop the batch processing worker and cleanup resources.

Source code in inferflow/asyncio/batch/__init__.py
@abc.abstractmethod
async def stop(self) -> None:
    """Stop the batch processing worker and cleanup resources."""
get_metrics
get_metrics() -> BatchMetrics

Get current batch processing metrics.

Source code in inferflow/asyncio/batch/__init__.py
def get_metrics(self) -> BatchMetrics:
    """Get current batch processing metrics."""
    return self.metrics
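
A concrete strategy built on these abstract methods can be sketched with an `asyncio.Queue`: the worker drains up to `max_batch_size` requests or flushes after `max_wait_ms`, whichever comes first. Everything below (the `SimpleDynamicStrategy` name, the plain async-callable stand-in for `Runtime`) is illustrative and is not the library's actual `DynamicBatchStrategy`:

```python
import asyncio
from typing import Awaitable, Callable


class SimpleDynamicStrategy:
    """Hypothetical dynamic batcher following the BatchStrategy lifecycle."""

    def __init__(self, max_batch_size: int = 32, max_wait_ms: float = 50.0) -> None:
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self._queue: asyncio.Queue = asyncio.Queue()
        self._running = False
        self._worker: asyncio.Task | None = None
        self._runtime: Callable[[list], Awaitable[list]] | None = None

    async def start(self, runtime: Callable[[list], Awaitable[list]]) -> None:
        self._runtime = runtime
        self._running = True
        self._worker = asyncio.create_task(self._run())

    async def stop(self) -> None:
        self._running = False
        if self._worker is not None:
            self._worker.cancel()

    async def submit(self, item):
        if not self._running:
            raise RuntimeError("strategy is not started")
        fut = asyncio.get_running_loop().create_future()
        await self._queue.put((item, fut))
        return await fut

    async def _run(self) -> None:
        while self._running:
            item, fut = await self._queue.get()
            batch, futures = [item], [fut]
            deadline = asyncio.get_running_loop().time() + self.max_wait_ms / 1000
            # Collect more requests until the batch fills or the wait expires.
            while len(batch) < self.max_batch_size:
                timeout = deadline - asyncio.get_running_loop().time()
                if timeout <= 0:
                    break
                try:
                    nxt, nfut = await asyncio.wait_for(self._queue.get(), timeout)
                except asyncio.TimeoutError:
                    break
                batch.append(nxt)
                futures.append(nfut)
            results = await self._runtime(batch)
            # Distribute results back to the awaiting submitters.
            for f, r in zip(futures, results):
                f.set_result(r)


async def main() -> None:
    async def fake_runtime(batch: list[int]) -> list[int]:
        return [x * 2 for x in batch]  # stand-in for model inference

    strategy = SimpleDynamicStrategy(max_batch_size=4, max_wait_ms=20)
    await strategy.start(fake_runtime)
    results = await asyncio.gather(*(strategy.submit(i) for i in range(4)))
    await strategy.stop()
    print(results)  # [0, 2, 4, 6]


asyncio.run(main())
```

The future-per-request pattern is what lets `submit` present a single-item interface while execution happens in batches: each caller awaits its own future, and the worker resolves them after one batched runtime call.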

Functions

__getattr__

__getattr__(name: str) -> Any
Source code in inferflow/asyncio/batch/__init__.py
def __getattr__(name: str) -> t.Any:
    if name in __all__:
        return importlib.import_module("." + name, __name__)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
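
This `__getattr__` is the PEP 562 module-level lazy-import pattern: submodules named in `__all__` are imported only on first attribute access. A minimal sketch using a dynamically created module, so it runs without a package on disk (the `lazypkg` name and the `math` stand-in submodule are illustrative):

```python
import importlib
import sys
import types

# Build a throwaway module so the pattern can run without a real package.
mod = types.ModuleType("lazypkg")
mod.__all__ = ["math_alias"]


def _lazy_getattr(name: str):
    if name in mod.__all__:
        # Lazily import on first access; "math" stands in for a submodule.
        return importlib.import_module("math")
    raise AttributeError(f"module {mod.__name__!r} has no attribute {name!r}")


mod.__getattr__ = _lazy_getattr
sys.modules["lazypkg"] = mod

import lazypkg

print(lazypkg.math_alias.sqrt(4))  # 2.0
```

Accessing any name outside `__all__` raises `AttributeError`, matching ordinary module behavior.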

Submodules