Skip to content

decorators

decorators

Attributes

__doctitle__ module-attribute

__doctitle__ = 'Decorator-based Workflow'

__all__ module-attribute

__all__ = ['DecoratedTask', 'TaskGroup', 'task', 'parallel', 'sequence', 'Workflow']

Classes

DecoratedTask dataclass

DecoratedTask(func: Callable[[ContextT], ContextT], _metadata: TaskMetadata = (lambda: TaskMetadata(name='task'))(), condition: Callable[[ContextT], bool] | None = None)

Bases: Generic[ContextT]

Task created by @task decorator (sync version).

Attributes
func instance-attribute
func: Callable[[ContextT], ContextT]

Task function.

condition class-attribute instance-attribute
condition: Callable[[ContextT], bool] | None = None

Execution condition.

metadata property
metadata: TaskMetadata

Get task metadata.

Functions
execute
execute(context: ContextT) -> ContextT

Execute the task (sync).

Source code in inferflow/workflow/decorators.py
def execute(self, context: ContextT) -> ContextT:
    """Run the wrapped task function on ``context`` and return its result (sync).

    This method does not consult ``condition``; it simply calls ``func``.
    """
    task_func = self.func
    return task_func(context)
should_execute
should_execute(context: ContextT) -> bool

Check if task should execute.

Source code in inferflow/workflow/decorators.py
def should_execute(self, context: ContextT) -> bool:
    """Decide whether this task should run for ``context``.

    Returns ``True`` when no condition is configured; otherwise returns
    whatever the condition callable returns for ``context``.
    """
    # Short-circuit: the predicate is only called when one is actually set.
    return self.condition is None or self.condition(context)

TaskGroup dataclass

TaskGroup(tasks: list[TaskNode[ContextT]], mode: ExecutionMode, _metadata: TaskMetadata = (lambda: TaskMetadata(name='task_group'))())

Bases: Generic[ContextT]

Group of tasks with execution mode (sync version).

Attributes
tasks instance-attribute
tasks: list[TaskNode[ContextT]]

Tasks in the group.

mode instance-attribute
mode: ExecutionMode

Execution mode (sequential or parallel).

metadata property
metadata: TaskMetadata

Get task metadata.

Functions
execute
execute(context: ContextT) -> ContextT

Execute all tasks in the group (sync).

Source code in inferflow/workflow/decorators.py
def execute(self, context: ContextT) -> ContextT:
    """Run the group's tasks in order and return the resulting context (sync).

    Each task is asked ``should_execute`` with the current context; tasks
    that decline are skipped, and each executed task's return value becomes
    the context passed to the next task.

    Raises:
        ValueError: If ``mode`` is neither ``ExecutionMode.SEQUENTIAL`` nor
            ``ExecutionMode.PARALLEL``.
    """
    is_known_mode = (
        self.mode == ExecutionMode.SEQUENTIAL
        or self.mode == ExecutionMode.PARALLEL
    )
    if not is_known_mode:
        raise ValueError(f"Unknown execution mode:  {self.mode}")

    # The sync version has no true parallelism: PARALLEL groups are run one
    # task after another, exactly like SEQUENTIAL ones. For real parallel
    # execution, use the asyncio version.
    current = context
    for member in self.tasks:
        if not member.should_execute(current):
            continue
        current = member.execute(current)
    return current
should_execute
should_execute(_context: ContextT) -> bool

Always execute task groups.

Source code in inferflow/workflow/decorators.py
def should_execute(self, _context: ContextT) -> bool:
    """Report that the group should run; the context argument is ignored.

    Task groups are never skipped as a whole, so this is unconditionally
    ``True``.
    """
    return True

Workflow

Workflow(*tasks: TaskNode[ContextT])

Bases: WorkflowExecutor[ContextT]

Workflow executor for decorator-based tasks (sync version).

Source code in inferflow/workflow/decorators.py
def __init__(self, *tasks: TaskNode[ContextT]):
    """Build the workflow's root node from the given tasks.

    A single task becomes the root as-is; two or more tasks are wrapped in
    a sequential group via ``sequence``.

    Raises:
        ValueError: If no tasks are given.
    """
    # Fail early with a clear message instead of the bare IndexError that
    # ``tasks[0]`` would raise on an empty argument tuple.
    if not tasks:
        raise ValueError("Workflow requires at least one task")
    self.root = sequence(*tasks) if len(tasks) > 1 else tasks[0]
Attributes
root instance-attribute
root = sequence(*tasks) if len(tasks) > 1 else tasks[0]
Functions
run
run(context: ContextT) -> ContextT

Execute the workflow (sync).

Source code in inferflow/workflow/decorators.py
def run(self, context: ContextT) -> ContextT:
    """Run the workflow (sync) by executing its root node on ``context``.

    Returns whatever the root node's ``execute`` returns.
    """
    root_node = self.root
    return root_node.execute(context)
__enter__
__enter__() -> Self

Context manager entry.

Source code in inferflow/workflow/decorators.py
def __enter__(self) -> t.Self:
    """Enter the ``with`` block and hand back this workflow itself."""
    workflow = self
    return workflow
__exit__
__exit__(*args: Any) -> None

Context manager exit.

Source code in inferflow/workflow/decorators.py
def __exit__(self, *args: t.Any) -> None:
    """Leave the ``with`` block; no cleanup is performed.

    The exception details passed in ``args`` are ignored. Returning ``None``
    means an exception raised inside the block is not suppressed.
    """
    return None

Functions

task

task(name: str | None = None, description: str | None = None, condition: Callable[[Any], bool] | None = None, timeout: float | None = None, retry: int = 0, skip_on_error: bool = False) -> Callable[[Callable[[ContextT], ContextT]], DecoratedTask[ContextT]]

Decorator to create a workflow task (sync version).

Source code in inferflow/workflow/decorators.py
def task(
    name: str | None = None,
    description: str | None = None,
    condition: t.Callable[[t.Any], bool] | None = None,
    timeout: float | None = None,
    retry: int = 0,
    skip_on_error: bool = False,
) -> t.Callable[[t.Callable[[ContextT], ContextT]], DecoratedTask[ContextT]]:
    """Build a decorator that turns a function into a ``DecoratedTask`` (sync).

    Args:
        name: Task name; when falsy, the decorated function's ``__name__``
            is used.
        description: Task description; when falsy, the decorated function's
            ``__doc__`` is used.
        condition: Optional predicate stored on the task as its ``condition``.
        timeout: Stored in the task's ``TaskMetadata``.
        retry: Stored in the task's ``TaskMetadata``.
        skip_on_error: Stored in the task's ``TaskMetadata``.

    Returns:
        A decorator that accepts the task function and returns the
        ``DecoratedTask`` wrapping it.
    """

    def decorator(func: t.Callable[[ContextT], ContextT]) -> DecoratedTask[ContextT]:
        # Explicit arguments win; otherwise fall back to the function's own
        # name and docstring.
        resolved_name = name if name else func.__name__
        resolved_description = description if description else func.__doc__

        return DecoratedTask(
            func=func,
            _metadata=TaskMetadata(
                name=resolved_name,
                description=resolved_description,
                timeout=timeout,
                retry=retry,
                skip_on_error=skip_on_error,
            ),
            condition=condition,
        )

    return decorator

parallel

parallel(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]

Create a parallel task group (sync version).

Source code in inferflow/workflow/decorators.py
def parallel(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]:
    """Bundle ``tasks`` into a ``TaskGroup`` marked ``ExecutionMode.PARALLEL`` (sync).

    The tasks are copied into a new list, in the order given.
    """
    members = list(tasks)
    return TaskGroup(tasks=members, mode=ExecutionMode.PARALLEL)

sequence

sequence(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]

Create a sequential task group (sync version).

Source code in inferflow/workflow/decorators.py
def sequence(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]:
    """Bundle ``tasks`` into a ``TaskGroup`` marked ``ExecutionMode.SEQUENTIAL`` (sync).

    The tasks are copied into a new list, in the order given.
    """
    members = list(tasks)
    return TaskGroup(tasks=members, mode=ExecutionMode.SEQUENTIAL)