workflow

Attributes

__doctitle__ module-attribute

__doctitle__ = 'Workflow (Async)'

ContextT module-attribute

ContextT = TypeVar('ContextT')

__all__ module-attribute

__all__ = ['ExecutionMode', 'TaskMetadata', 'TaskNode', 'WorkflowExecutor', 'decorators', 'tasks']

Classes

ExecutionMode

Bases: str, Enum

Task execution mode.

Attributes
SEQUENTIAL class-attribute instance-attribute
SEQUENTIAL = 'sequential'

Execute tasks one by one.

PARALLEL class-attribute instance-attribute
PARALLEL = 'parallel'

Execute tasks concurrently.
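
Because `ExecutionMode` derives from both `str` and `Enum`, its members can be looked up by value and compared directly with plain strings. The sketch below illustrates this with a local mirror of the enum (redefined here only so the snippet runs without the package installed); it is not taken from the library's own examples.

```python
from enum import Enum


class ExecutionMode(str, Enum):
    """Local mirror of the documented enum, so the snippet runs on its own."""

    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"


# Look a member up by its value, e.g. when the mode comes from a config file.
mode = ExecutionMode("parallel")

# The str base class makes members compare equal to their plain string values.
is_parallel = mode == "parallel"

# Iterating the enum yields members in definition order.
values = [member.value for member in ExecutionMode]
```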

TaskMetadata dataclass

TaskMetadata(name: str, description: str | None = None, timeout: float | None = None, retry: int = 0, skip_on_error: bool = False)

Metadata for a workflow task.

Attributes
name instance-attribute
name: str

Task name.

description class-attribute instance-attribute
description: str | None = None

Task description.

timeout class-attribute instance-attribute
timeout: float | None = None

Execution timeout in seconds.

retry class-attribute instance-attribute
retry: int = 0

Number of retries on failure.

skip_on_error class-attribute instance-attribute
skip_on_error: bool = False

Continue workflow if task fails.

TaskNode

Bases: Protocol[ContextT]

Protocol for a workflow task node (async version).

Attributes
metadata property
metadata: TaskMetadata

Get task metadata.

Functions
execute async
execute(context: ContextT) -> ContextT

Execute the task.

Source code in inferflow/asyncio/workflow/__init__.py
async def execute(self, context: ContextT) -> ContextT:
    """Execute the task."""

should_execute
should_execute(context: ContextT) -> bool

Check if task should execute.

Source code in inferflow/asyncio/workflow/__init__.py
def should_execute(self, context: ContextT) -> bool:
    """Check if task should execute."""
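
Since `TaskNode` is a `Protocol`, a class can satisfy it structurally, without inheriting from it. The sketch below shows one way this might look. `NormalizeText` and `run_node` are hypothetical names, the protocol and a trimmed `TaskMetadata` are mirrored locally so the snippet runs on its own, and the order in which a caller consults `should_execute` and `execute` is an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, TypeVar

ContextT = TypeVar("ContextT")


@dataclass
class TaskMetadata:
    """Trimmed local stand-in; the documented dataclass has more fields."""

    name: str


class TaskNode(Protocol[ContextT]):
    """Local mirror of the documented protocol."""

    @property
    def metadata(self) -> TaskMetadata: ...

    async def execute(self, context: ContextT) -> ContextT: ...

    def should_execute(self, context: ContextT) -> bool: ...


class NormalizeText:
    """Hypothetical task: matches the protocol by shape, not by inheritance."""

    @property
    def metadata(self) -> TaskMetadata:
        return TaskMetadata(name="normalize_text")

    def should_execute(self, context: dict) -> bool:
        # Only run when there is text to normalize.
        return "text" in context

    async def execute(self, context: dict) -> dict:
        # Return a new context instead of mutating the input.
        return {**context, "text": context["text"].strip().lower()}


async def run_node(node: TaskNode[dict], context: dict) -> dict:
    # Assumption: a caller checks should_execute() before awaiting execute().
    if node.should_execute(context):
        return await node.execute(context)
    return context


processed = asyncio.run(run_node(NormalizeText(), {"text": "  Hello "}))
skipped = asyncio.run(run_node(NormalizeText(), {"id": 1}))
```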

WorkflowExecutor

Bases: ABC, Generic[ContextT]

Abstract workflow executor (async version).

Functions
run abstractmethod async
run(context: ContextT) -> ContextT

Execute the workflow.

Source code in inferflow/asyncio/workflow/__init__.py
@abc.abstractmethod
async def run(self, context: ContextT) -> ContextT:
    """Execute the workflow."""
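
`WorkflowExecutor` only fixes the `run` signature; concrete scheduling is left to subclasses. As an illustration, here is a minimal sketch of a sequential subclass. `SequentialExecutor`, `add_one`, and `double` are hypothetical names, steps are modeled as plain async callables rather than `TaskNode` objects, and the base class is mirrored locally to keep the snippet self-contained.

```python
import abc
import asyncio
from typing import Awaitable, Callable, Generic, Iterable, TypeVar

ContextT = TypeVar("ContextT")


class WorkflowExecutor(abc.ABC, Generic[ContextT]):
    """Local mirror of the documented abstract base class."""

    @abc.abstractmethod
    async def run(self, context: ContextT) -> ContextT:
        """Execute the workflow."""


class SequentialExecutor(WorkflowExecutor[dict]):
    """Hypothetical executor that awaits each step in order."""

    def __init__(self, steps: Iterable[Callable[[dict], Awaitable[dict]]]) -> None:
        self._steps = list(steps)

    async def run(self, context: dict) -> dict:
        # Thread the context through the steps one by one.
        for step in self._steps:
            context = await step(context)
        return context


async def add_one(context: dict) -> dict:
    return {**context, "n": context["n"] + 1}


async def double(context: dict) -> dict:
    return {**context, "n": context["n"] * 2}


final = asyncio.run(SequentialExecutor([add_one, double]).run({"n": 1}))
```

Instantiating `WorkflowExecutor` directly raises `TypeError`, because `run` is abstract.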

Functions

__getattr__

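__getattr__(name: str) -> Any

Module-level attribute hook. When a name listed in __all__ is not already defined on the module, the submodule of the same name is imported and returned; any other name raises AttributeError.

Source code in inferflow/asyncio/workflow/__init__.py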
def __getattr__(name: str) -> t.Any:
    if name in __all__:
        return importlib.import_module("." + name, __name__)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
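
The hook above relies on module-level `__getattr__` (PEP 562), which Python calls only after normal attribute lookup on the module fails. The sketch below reproduces the pattern on a throwaway module object so it can run anywhere. `demo`, `_lazy_getattr`, and `load_log` are hypothetical names, and absolute imports of standard-library modules stand in for the relative submodule import used in the original.

```python
import importlib
import types

# Stand-in for a package; in a real module the hook is a top-level function.
demo = types.ModuleType("demo")
demo.__all__ = ["json", "math"]  # hypothetical lazily exposed names
load_log = []  # records every time the hook actually runs


def _lazy_getattr(name: str):
    # Called only when `name` is not found in the module's namespace.
    if name in demo.__all__:
        load_log.append(name)
        # The documented hook imports "." + name relative to the package;
        # an absolute stdlib import is used here to keep the sketch runnable.
        module = importlib.import_module(name)
        # Importing a real submodule binds it on its parent package, so later
        # lookups bypass the hook; setattr imitates that effect here.
        setattr(demo, name, module)
        return module
    raise AttributeError(f"module {demo.__name__!r} has no attribute {name!r}")


# Storing the function in the module namespace activates the hook.
demo.__getattr__ = _lazy_getattr

first = demo.json   # triggers the hook and imports json
second = demo.json  # found in the namespace, hook is not called again
```

Names that are not listed in `__all__` still fail with `AttributeError`, so `hasattr(demo, "missing")` is `False`.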

Submodules