Skip to content

tensorrt

tensorrt

Attributes

__doctitle__ module-attribute

__doctitle__ = 'TensorRT Runtime (Async)'

logger module-attribute

logger = getLogger('inferflow.asyncio.runtime.tensorrt')

__all__ module-attribute

__all__ = ['TensorRTRuntime']

Classes

TensorRTRuntime

TensorRTRuntime(model_path: str | PathLike[str], device: str | Device, precision: Precision = FP32, warmup_iterations: int = 3, warmup_shape: tuple[int, ...] = (1, 3, 224, 224))

Bases: RuntimeConfigMixin, TensorRTRuntimeMixin, BatchableRuntime[ndarray, Any]

TensorRT Runtime for optimized inference (async version).

Asynchronous version of inferflow.runtime.tensorrt.TensorRTRuntime.

All I/O operations (engine loading, inference) are executed in a thread pool to avoid blocking the event loop. The API is identical to the sync version, but all methods are async.

Supports
  • TensorRT (.engine, .trt) models
  • CUDA devices only
  • FP32, FP16, INT8 precision
  • Batch inference
  • Automatic warmup

Attributes:

Name Type Description
runtime Runtime | None

TensorRT runtime instance (None before load()).

engine ICudaEngine | None

TensorRT engine (None before load()).

context IExecutionContext | None

TensorRT execution context (None before load()).

inputs list[DeviceAllocation]

List of input device memory allocations.

outputs list[DeviceAllocation]

List of output device memory allocations.

bindings list[int]

List of binding pointers for execution.

stream Stream | None

CUDA stream for async operations.

Parameters:

Name Type Description Default
model_path str | PathLike[str]

Path to TensorRT engine file.

required
device str | Device

CUDA device to run on (e.g. "cuda:0").

required
precision Precision

Inference precision mode (default: FP32).

FP32
warmup_iterations int

Number of warmup iterations (default: 3).

3
warmup_shape tuple[int, ...]

Input shape for warmup (default: (1, 3, 224, 224)).

(1, 3, 224, 224)

Raises:

Type Description
FileNotFoundError

If model file does not exist.

ValueError

If device is not CUDA.

ImportError

If tensorrt or pycuda is not installed.

Example
import inferflow.asyncio as iff
import numpy as np

# Initialize runtime
runtime = iff.TensorRTRuntime(
    model_path="model.engine",
    device="cuda:0",
)

# Single inference
async with runtime:
    input_array = np.random.randn(1, 3, 640, 640).astype(
        np.float32
    )
    output = await runtime.infer(input_array)

# Batch inference
async with runtime:
    batch = [
        np.random.randn(1, 3, 640, 640).astype(np.float32),
        np.random.randn(1, 3, 640, 640).astype(np.float32),
    ]
    outputs = await runtime.infer_batch(batch)
Source code in inferflow/asyncio/runtime/tensorrt.py
def __init__(
    self,
    model_path: str | os.PathLike[str],
    device: str | Device,
    precision: Precision = Precision.FP32,
    warmup_iterations: int = 3,
    warmup_shape: tuple[int, ...] = (1, 3, 224, 224),
):
    """Initialize the async TensorRT runtime.

    Args:
        model_path: Path to the TensorRT engine file (.engine/.trt).
        device: CUDA device to run on (e.g. "cuda:0"); must be a CUDA device.
        precision: Inference precision (default: FP32).
        warmup_iterations: Number of warmup inference passes (default: 3).
        warmup_shape: Input shape used for warmup (default: (1, 3, 224, 224)).

    Raises:
        ValueError: If device is not CUDA (via `_validate_tensorrt_device`).
    """
    super().__init__(
        model_path=model_path,
        device=device,
        precision=precision,
        warmup_iterations=warmup_iterations,
        warmup_shape=warmup_shape,
    )

    # Validate device (reuse mixin); rejects non-CUDA devices up front.
    self._validate_tensorrt_device()

    # TensorRT objects are created lazily in load().
    self.logger_trt = trt.Logger(trt.Logger.WARNING)
    self.runtime: trt.Runtime | None = None
    self.engine: trt.ICudaEngine | None = None
    self.context: trt.IExecutionContext | None = None

    # CUDA device memory: allocated in load(), released in unload().
    self.inputs: list[cuda.DeviceAllocation] = []
    self.outputs: list[cuda.DeviceAllocation] = []
    self.bindings: list[int] = []
    self.stream: cuda.Stream | None = None

    # Lazy %-formatting: the message is only rendered if INFO is enabled.
    logger.info(
        "TensorRTRuntime (async) initialized: model=%s, device=%s",
        self.model_path,
        self.device,
    )
Attributes
logger_trt instance-attribute
logger_trt = Logger(WARNING)
runtime instance-attribute
runtime: Runtime | None = None
engine instance-attribute
engine: ICudaEngine | None = None
context instance-attribute
context: IExecutionContext | None = None
inputs instance-attribute
inputs: list[DeviceAllocation] = []
outputs instance-attribute
outputs: list[DeviceAllocation] = []
bindings instance-attribute
bindings: list[int] = []
stream instance-attribute
stream: Stream | None = None
Functions
load async
load() -> None

Load TensorRT engine and prepare for inference (async).

Performs
  • Load engine from disk (in thread pool)
  • Create execution context
  • Allocate device memory for inputs/outputs
  • Create CUDA stream
  • Warmup inference (in thread pool)

Raises:

Type Description
FileNotFoundError

If engine file does not exist.

RuntimeError

If TensorRT fails to deserialize engine.

Source code in inferflow/asyncio/runtime/tensorrt.py
async def load(self) -> None:
    """Load TensorRT engine and prepare for inference (async).

    Performs:
        - Load engine from disk (in thread pool)
        - Create execution context
        - Allocate device memory for inputs/outputs
        - Create CUDA stream
        - Warmup inference (in thread pool)

    Raises:
        FileNotFoundError: If engine file does not exist.
        RuntimeError: If TensorRT fails to deserialize engine.
    """
    logger.info("Loading TensorRT engine from %s", self.model_path)

    # get_running_loop() is the supported way to obtain the loop from
    # inside a coroutine; get_event_loop() is deprecated in this context.
    loop = asyncio.get_running_loop()

    def _load_engine():
        # Blocking file I/O + deserialization; runs in the default executor
        # so the event loop is not blocked.
        self.runtime = trt.Runtime(self.logger_trt)
        with self.model_path.open("rb") as f:
            engine_data = f.read()
        return self.runtime.deserialize_cuda_engine(engine_data)

    self.engine = await loop.run_in_executor(None, _load_engine)
    self.context = self.engine.create_execution_context()

    # Create CUDA stream for async operations.
    self.stream = cuda.Stream()

    # Reset bookkeeping so a repeated load() does not accumulate stale
    # bindings/allocations from a previous load.
    self.inputs.clear()
    self.outputs.clear()
    self.bindings.clear()

    # Allocate memory for all bindings.
    # NOTE(review): num_bindings / get_binding_* belong to the legacy
    # binding API (removed in TensorRT 10) — confirm the pinned TRT version.
    for i in range(self.engine.num_bindings):
        binding_shape = self.engine.get_binding_shape(i)
        dtype = trt.nptype(self.engine.get_binding_dtype(i))

        # Byte size for this binding (reuse mixin).
        size = self._calculate_binding_size(binding_shape, dtype)

        # Allocate device memory and record its pointer for execution.
        mem = cuda.mem_alloc(size)
        self.bindings.append(int(mem))

        if self.engine.binding_is_input(i):
            self.inputs.append(mem)
        else:
            self.outputs.append(mem)

    logger.info("Engine loaded: inputs=%d, outputs=%d", len(self.inputs), len(self.outputs))

    # Warmup inference passes (also off the event loop).
    await self._warmup()
infer async
infer(input: ndarray) -> Any

Run inference on a single input (async).

Uses CUDA async operations for efficient processing.

Parameters:

Name Type Description Default
input ndarray

Input numpy array.

required

Returns:

Type Description
Any

Output array with shape determined by model.

Raises:

Type Description
RuntimeError

If engine is not loaded.

Example
async with runtime:
    input = np.random.randn(1, 3, 640, 640).astype(
        np.float32
    )
    output = await runtime.infer(input)
Source code in inferflow/asyncio/runtime/tensorrt.py
async def infer(self, input: np.ndarray) -> t.Any:
    """Run inference on a single input (async).

    The blocking CUDA work is delegated to `_infer_sync` in the default
    thread pool so the event loop stays responsive.

    Args:
        input: Input numpy array.

    Returns:
        Output array with shape determined by model.

    Raises:
        RuntimeError: If engine is not loaded.

    Example:
        ```python
        async with runtime:
            input = np.random.randn(1, 3, 640, 640).astype(
                np.float32
            )
            output = await runtime.infer(input)
        ```
    """
    if self.context is None:
        raise RuntimeError("Engine not loaded. Call load() first.")

    # get_running_loop() is the supported way to obtain the loop from
    # inside a coroutine; get_event_loop() is deprecated in this context.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, self._infer_sync, input)
infer_batch async
infer_batch(inputs: list[ndarray]) -> list[Any]

Run inference on a batch of inputs (async).

Concatenates inputs and runs batch inference, then splits the output back into individual results.

Parameters:

Name Type Description Default
inputs list[ndarray]

List of input arrays. Each should have shape (1, C, H, W).

required

Returns:

Type Description
list[Any]

List of outputs, one per input. Each maintains batch dimension.

Raises:

Type Description
RuntimeError

If engine is not loaded.

Example
async with runtime:
    batch = [
        np.random.randn(1, 3, 640, 640).astype(np.float32),
        np.random.randn(1, 3, 640, 640).astype(np.float32),
    ]
    outputs = await runtime.infer_batch(batch)
Source code in inferflow/asyncio/runtime/tensorrt.py
async def infer_batch(self, inputs: list[np.ndarray]) -> list[t.Any]:
    """Run inference on a batch of inputs (async).

    The individual inputs are stacked into a single batch, one fused
    inference call is made, and the batched output is sliced back into
    per-input results.

    Args:
        inputs: List of input arrays. Each should have shape (1, C, H, W).

    Returns:
        List of outputs, one per input. Each maintains batch dimension.

    Raises:
        RuntimeError: If engine is not loaded.

    Example:
        ```python
        async with runtime:
            batch = [
                np.random.randn(1, 3, 640, 640).astype(np.float32),
                np.random.randn(1, 3, 640, 640).astype(np.float32),
            ]
            outputs = await runtime.infer_batch(batch)
        ```
    """
    if self.context is None:
        raise RuntimeError("Engine not loaded. Call load() first.")

    # One fused inference over the stacked inputs.
    stacked = np.concatenate(inputs, axis=0)
    result = await self.infer(stacked)

    if not isinstance(result, np.ndarray):
        raise TypeError(f"Unexpected output type: {type(result)}")

    # Slice row-by-row, keeping the leading batch dimension intact.
    return [result[idx : idx + 1] for idx in range(len(inputs))]
unload async
unload() -> None

Unload engine and free resources (async).

Performs
  • Free CUDA device memory
  • Release engine and context

Safe to call multiple times.

Source code in inferflow/asyncio/runtime/tensorrt.py
async def unload(self) -> None:
    """Unload engine and free resources (async).

    Frees every CUDA device allocation made during load(), then drops
    the context/engine/runtime references and the CUDA stream.

    Safe to call multiple times.
    """
    logger.info("Unloading engine")

    # Release every device allocation recorded during load().
    for buffers in (self.inputs, self.outputs):
        for mem in buffers:
            mem.free()
        buffers.clear()
    self.bindings.clear()

    # Drop TensorRT objects so they can be garbage-collected.
    self.context = None
    self.engine = None
    self.runtime = None
    self.stream = None

    logger.info("Engine unloaded")