workflow

Attributes

__doctitle__ module-attribute

__doctitle__ = 'Workflow'

T_co module-attribute

T_co = TypeVar('T_co', covariant=True)

ContextT module-attribute

ContextT = TypeVar('ContextT')

__all__ module-attribute

__all__ = ['ExecutionMode', 'TaskMetadata', 'TaskNode', 'WorkflowExecutor', 'decorators', 'tasks']

Classes

ExecutionMode

Bases: str, Enum

Task execution mode.

Attributes
SEQUENTIAL class-attribute instance-attribute
SEQUENTIAL = 'sequential'

Execute tasks one by one.

PARALLEL class-attribute instance-attribute
PARALLEL = 'parallel'

Execute tasks concurrently.
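
Because `ExecutionMode` derives from both `str` and `Enum`, its members behave like plain strings in comparisons and can be rebuilt from a string value. The sketch below uses a local stand-in that mirrors the documented definition instead of importing the package, and the `"batch"` value is a made-up example of invalid input.

```python
from enum import Enum


class ExecutionMode(str, Enum):
    """Local stand-in mirroring the documented enum (not imported from the package)."""

    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"


# Members compare equal to their plain string values.
is_parallel = ExecutionMode.PARALLEL == "parallel"

# A value read from a config file or CLI flag converts back to a member.
mode = ExecutionMode("sequential")

# An unknown value raises ValueError instead of passing through silently.
try:
    ExecutionMode("batch")  # hypothetical invalid input
except ValueError as exc:
    error = str(exc)
```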

TaskMetadata dataclass

TaskMetadata(name: str, description: str | None = None, timeout: float | None = None, retry: int = 0, skip_on_error: bool = False)

Metadata for a workflow task.

Attributes
name instance-attribute
name: str

Task name.

description class-attribute instance-attribute
description: str | None = None

Task description.

timeout class-attribute instance-attribute
timeout: float | None = None

Execution timeout in seconds.

retry class-attribute instance-attribute
retry: int = 0

Number of retries on failure.

skip_on_error class-attribute instance-attribute
skip_on_error: bool = False

Whether the workflow continues if this task fails.
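
A short sketch of constructing task metadata may help. The dataclass here is a local stand-in with the same fields and defaults as the documented signature. Whether the real class is frozen or validates its fields is not stated on this page, so a plain `@dataclass` is assumed. The task names and values are illustrative only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskMetadata:
    """Local stand-in with the documented fields and defaults."""

    name: str
    description: Optional[str] = None
    timeout: Optional[float] = None  # seconds
    retry: int = 0
    skip_on_error: bool = False


# Only `name` is required; everything else falls back to its default.
preprocess = TaskMetadata(name="preprocess")

# A task that may be retried twice and does not abort the workflow on failure.
infer = TaskMetadata(
    name="infer",
    description="Run the model",
    timeout=30.0,
    retry=2,
    skip_on_error=True,
)
```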

TaskNode

Bases: Protocol[ContextT]

Protocol for a workflow task node (sync version).

Attributes
metadata property
metadata: TaskMetadata

Get task metadata.

Functions
execute
execute(context: ContextT) -> ContextT

Execute the task.

Parameters:

Name     Type      Description        Default
context  ContextT  Workflow context.  required

Returns:

Type      Description
ContextT  Updated context.

Source code in inferflow/workflow/__init__.py
def execute(self, context: ContextT) -> ContextT:
    """Execute the task.

    Args:
        context: Workflow context.

    Returns:
        Updated context.
    """
should_execute
should_execute(context: ContextT) -> bool

Check if task should execute.

Parameters:

Name     Type      Description       Default
context  ContextT  Current context.  required

Returns:

Type  Description
bool  True if task should execute.

Source code in inferflow/workflow/__init__.py
def should_execute(self, context: ContextT) -> bool:
    """Check if task should execute.

    Args:
        context: Current context.

    Returns:
        True if task should execute.
    """

WorkflowExecutor

Bases: ABC, Generic[ContextT]

Abstract workflow executor (sync version).

Functions
run abstractmethod
run(context: ContextT) -> ContextT

Execute the workflow.

Parameters:

Name     Type      Description       Default
context  ContextT  Initial context.  required

Returns:

Type      Description
ContextT  Final context after execution.

Source code in inferflow/workflow/__init__.py
@abc.abstractmethod
def run(self, context: ContextT) -> ContextT:
    """Execute the workflow.

    Args:
        context: Initial context.

    Returns:
        Final context after execution.
    """

Functions

__getattr__

__getattr__(name: str) -> Any
Source code in inferflow/workflow/__init__.py
def __getattr__(name: str) -> t.Any:
    if name in __all__:
        return importlib.import_module("." + name, __name__)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
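
This is the module-level `__getattr__` hook from PEP 562: Python calls it only when normal attribute lookup on the module fails, so names already defined in the module (such as the classes above) never reach it, and it presumably serves the `decorators` and `tasks` submodules on first access. The sketch below reproduces the pattern on a throwaway module object. `lazy_json` is a hypothetical name, and the stdlib `json` package is borrowed only so that the relative import has real submodules to resolve.

```python
import importlib
import types

# Hypothetical stand-in module; the real hook lives in the package's __init__.py.
lazy = types.ModuleType("lazy_json")
lazy.__all__ = ["decoder", "encoder"]


def _module_getattr(name: str):
    # Called only for attributes that are missing from the module's __dict__.
    if name in lazy.__all__:
        # Relative import resolved against the borrowed "json" package.
        return importlib.import_module("." + name, "json")
    raise AttributeError(f"module {lazy.__name__!r} has no attribute {name!r}")


# PEP 562: the hook is looked up in the module's own namespace.
lazy.__getattr__ = _module_getattr

decoder = lazy.decoder          # triggers the import on first access
has_missing = hasattr(lazy, "missing")  # AttributeError is raised, so False
```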

Submodules