
pipeline

Attributes

__doctitle__ module-attribute

__doctitle__ = 'Pipeline'

__all__ module-attribute

__all__ = ['Pipeline', 'classification', 'detection', 'segmentation']

Classes

Pipeline

Pipeline(runtime: Runtime[P, R], batch_strategy: BatchStrategy[P, R] | None = None)

Bases: ABC, Generic[P, R, O]

Abstract inference pipeline (sync version).

A pipeline combines:

- Preprocessing: convert raw input to a model-ready format
- Inference: run model inference via the runtime
- Postprocessing: convert raw output to a structured result

Example

```python
pipeline = ClassificationPipeline(runtime=runtime)
with pipeline.serve():
    result = pipeline(image)
    print(f"Class: {result.class_name}, Confidence: {result.confidence}")
```

Initialize pipeline.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `runtime` | `Runtime[P, R]` | Inference runtime. | *required* |
| `batch_strategy` | `BatchStrategy[P, R] \| None` | Optional batching strategy for improved throughput. | `None` |
Source code in inferflow/pipeline/__init__.py

```python
def __init__(
    self,
    runtime: Runtime[P, R],
    batch_strategy: BatchStrategy[P, R] | None = None,
):
    """Initialize pipeline.

    Args:
        runtime: Inference runtime.
        batch_strategy: Optional batching strategy for improved throughput.
    """
    self.runtime = runtime
    self.batch_strategy = batch_strategy
```

Attributes

runtime instance-attribute

runtime = runtime

batch_strategy instance-attribute

batch_strategy = batch_strategy
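To make the contract concrete, here is a minimal sketch of a pipeline subclass wired to a runtime. `EchoRuntime` and `UpperPipeline` are hypothetical stand-ins (they mirror the documented `preprocess`/`infer`/`postprocess` shape but are not part of inferflow):

```python
# Sketch of the documented pipeline contract using stand-in classes.
import contextlib


class EchoRuntime:
    """Hypothetical runtime: returns its input unchanged."""

    def infer(self, preprocessed: str) -> str:
        return preprocessed

    @contextlib.contextmanager
    def context(self):
        # A real runtime would load/unload model weights here.
        yield self


class UpperPipeline:
    """Toy pipeline: bytes in, upper-cased str out."""

    def __init__(self, runtime, batch_strategy=None):
        self.runtime = runtime
        self.batch_strategy = batch_strategy

    def preprocess(self, input: bytes) -> str:
        return input.decode("utf-8")

    def infer(self, preprocessed: str) -> str:
        # Mirrors Pipeline.infer: batch strategy takes priority if set.
        if self.batch_strategy:
            return self.batch_strategy.submit(preprocessed)
        return self.runtime.infer(preprocessed)

    def postprocess(self, raw: str) -> str:
        return raw.upper()

    def __call__(self, input: bytes) -> str:
        return self.postprocess(self.infer(self.preprocess(input)))


pipeline = UpperPipeline(EchoRuntime())
print(pipeline(b"hello"))  # prints: HELLO
```

The real `Pipeline` is generic over `P` (preprocessed), `R` (raw result), and `O` (structured output); here those are `str`, `str`, and `str` for brevity.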
Functions

preprocess abstractmethod

preprocess(input: ImageInput) -> P

Preprocess raw input into model-ready format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input` | `ImageInput` | Raw input (image bytes, numpy array, PIL Image, etc.) | *required* |

Returns:

| Type | Description |
| --- | --- |
| `P` | Preprocessed input ready for inference. |

Source code in inferflow/pipeline/__init__.py

```python
@abc.abstractmethod
def preprocess(self, input: ImageInput) -> P:
    """Preprocess raw input into model-ready format.

    Args:
        input: Raw input (image bytes, numpy array, PIL Image, etc.)

    Returns:
        Preprocessed input ready for inference.
    """
```

postprocess abstractmethod

postprocess(raw: R) -> O

Postprocess raw model output into structured result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `raw` | `R` | Raw output from model inference. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `O` | Structured output (classification result, detections, etc.) |

Source code in inferflow/pipeline/__init__.py

```python
@abc.abstractmethod
def postprocess(self, raw: R) -> O:
    """Postprocess raw model output into structured result.

    Args:
        raw: Raw output from model inference.

    Returns:
        Structured output (classification result, detections, etc.)
    """
```

infer

infer(preprocessed: P) -> R

Run inference on preprocessed input.

This method automatically uses batching if a batch strategy is configured.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `preprocessed` | `P` | Preprocessed input. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `R` | Raw inference result. |

Source code in inferflow/pipeline/__init__.py

```python
def infer(self, preprocessed: P) -> R:
    """Run inference on preprocessed input.

    This method automatically uses batching if a batch strategy is configured.

    Args:
        preprocessed: Preprocessed input.

    Returns:
        Raw inference result.
    """
    if self.batch_strategy:
        return self.batch_strategy.submit(preprocessed)
    return self.runtime.infer(preprocessed)
```
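For illustration, a minimal stand-in satisfying the `submit()`/`start()`/`stop()` hooks that `infer()` and `serve()` call on a batch strategy might look like this. `PassthroughBatchStrategy` and `DoubleRuntime` are hypothetical, not inferflow classes; a real strategy would queue and coalesce items into batches:

```python
# Hypothetical stand-ins showing the batch-strategy hooks used by
# Pipeline.infer() (submit) and Pipeline.serve() (start/stop).
class DoubleRuntime:
    """Toy runtime: doubles an integer input."""

    def infer(self, preprocessed: int) -> int:
        return preprocessed * 2


class PassthroughBatchStrategy:
    """Forwards each item immediately instead of batching."""

    def __init__(self):
        self.runtime = None

    def start(self, runtime) -> None:
        self.runtime = runtime  # serve() calls this on entry

    def submit(self, preprocessed: int) -> int:
        return self.runtime.infer(preprocessed)  # infer() calls this

    def stop(self) -> None:
        self.runtime = None  # serve() calls this on exit


strategy = PassthroughBatchStrategy()
strategy.start(DoubleRuntime())
print(strategy.submit(21))  # prints: 42
strategy.stop()
```

A production strategy would typically block in `submit()` until a batch fills or a timeout elapses, then run one batched `runtime.infer` call and fan results back out.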
__call__

__call__(input: ImageInput) -> O

End-to-end inference.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `input` | `ImageInput` | Raw input. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `O` | Structured output. |

Example
```python
result = pipeline(image_bytes)
```
Source code in inferflow/pipeline/__init__.py

````python
def __call__(self, input: ImageInput) -> O:
    """End-to-end inference.

    Args:
        input: Raw input.

    Returns:
        Structured output.

    Example:
        ```python
        result = pipeline(image_bytes)
        ```
    """
    preprocessed = self.preprocess(input)
    raw = self.infer(preprocessed)
    return self.postprocess(raw)
````
serve

serve() -> Iterator[Self]

Start serving pipeline with automatic lifecycle management.

This method:

- Loads the runtime
- Starts batch processing (if enabled)
- Yields the pipeline for inference
- Cleans up resources on exit
Example

```python
with pipeline.serve():
    result = pipeline(image)
```
Source code in inferflow/pipeline/__init__.py

````python
@contextlib.contextmanager
def serve(self) -> t.Iterator[t.Self]:
    """Start serving pipeline with automatic lifecycle management.

    This method:
        - Loads the runtime
        - Starts batch processing (if enabled)
        - Yields the pipeline for inference
        - Cleans up resources on exit

    Example:
        ```python
        with pipeline.serve():
            result = pipeline(image)
        ```
    """
    with self.runtime.context():
        if self.batch_strategy:
            self.batch_strategy.start(self.runtime)

        try:
            yield self
        finally:
            if self.batch_strategy:
                self.batch_strategy.stop()
````

Functions

__getattr__

__getattr__(name: str) -> Any
Source code in inferflow/pipeline/__init__.py

```python
def __getattr__(name: str) -> t.Any:
    if name in __all__:
        return importlib.import_module("." + name, __name__)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```
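This is the PEP 562 module-level `__getattr__` pattern: names listed in `__all__` resolve to submodules that are imported lazily, on first attribute access. A standalone sketch of the same mechanism, using a synthetic module and the stdlib `json` module in place of inferflow's submodules:

```python
# Sketch of PEP 562 lazy imports: attribute access on a module falls back
# to __getattr__, which imports the target only when first requested.
import importlib
import sys
import types

demo = types.ModuleType("demo_pkg")
demo.__all__ = ["json"]  # stand-in for ['classification', 'detection', ...]


def _lazy_getattr(name: str):
    if name in demo.__all__:
        # The real code imports relative to the package:
        # importlib.import_module("." + name, __name__)
        return importlib.import_module(name)
    raise AttributeError(f"module {demo.__name__!r} has no attribute {name!r}")


demo.__getattr__ = _lazy_getattr

print(demo.json is sys.modules["json"])  # prints: True
```

Because `importlib.import_module` caches in `sys.modules`, repeated accesses pay the import cost only once; submodules the caller never touches are never imported.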

Submodules