Skip to content

decorators

decorators

Attributes

__doctitle__ module-attribute

__doctitle__ = 'Decorator-based Workflow (Async)'

__all__ module-attribute

__all__ = ['DecoratedTask', 'TaskGroup', 'task', 'parallel', 'sequence', 'Workflow']

Classes

DecoratedTask dataclass

DecoratedTask(func: Callable[[ContextT], Awaitable[ContextT]], _metadata: TaskMetadata = (lambda: TaskMetadata(name='task'))(), condition: Callable[[ContextT], bool] | None = None)

Bases: Generic[ContextT]

Task created by @task decorator (async version).

Attributes
func instance-attribute
func: Callable[[ContextT], Awaitable[ContextT]]
condition class-attribute instance-attribute
condition: Callable[[ContextT], bool] | None = None
metadata property
metadata: TaskMetadata
Functions
execute async
execute(context: ContextT) -> ContextT
Source code in inferflow/asyncio/workflow/decorators.py
async def execute(self, context: ContextT) -> ContextT:
    """Run the wrapped callable with ``context`` and return the resulting context.

    The callable is invoked first and its return value is awaited only when
    it is actually awaitable. This handles coroutine functions, plain
    synchronous callables, and synchronous wrappers or callable objects that
    hand back a coroutine. The last kind is not recognised by
    ``iscoroutinefunction`` and would otherwise leak an un-awaited coroutine
    as the returned "context".

    Args:
        context: The workflow context passed to the wrapped callable.

    Returns:
        The value produced by the wrapped callable, awaited when needed.
    """
    import inspect  # local import keeps this method self-contained

    result = self.func(context)
    if inspect.isawaitable(result):
        return await result
    return result  # type: ignore
should_execute
should_execute(context: ContextT) -> bool
Source code in inferflow/asyncio/workflow/decorators.py
def should_execute(self, context: ContextT) -> bool:
    """Decide whether this task should run for ``context``.

    A task without a condition always runs; otherwise the condition's result
    for ``context`` is returned.
    """
    predicate = self.condition
    return True if predicate is None else predicate(context)

TaskGroup dataclass

TaskGroup(tasks: list[TaskNode[ContextT]], mode: ExecutionMode, _metadata: TaskMetadata = (lambda: TaskMetadata(name='task_group'))())

Bases: Generic[ContextT]

Group of tasks with execution mode (async version).

Attributes
tasks instance-attribute
tasks: list[TaskNode[ContextT]]
mode instance-attribute
mode: ExecutionMode
metadata property
metadata: TaskMetadata
Functions
execute async
execute(context: ContextT) -> ContextT
Source code in inferflow/asyncio/workflow/decorators.py
async def execute(self, context: ContextT) -> ContextT:
    """Run the group's tasks against ``context`` according to ``self.mode``.

    Sequential mode threads the context through each task in order, skipping
    tasks whose ``should_execute`` is false. Parallel mode hands the same
    incoming context to every eligible task via ``asyncio.gather`` and returns
    the last gathered result, or the incoming context when nothing ran.

    Raises:
        ValueError: If ``self.mode`` is neither sequential nor parallel.
    """
    if self.mode == ExecutionMode.SEQUENTIAL:
        current = context
        for node in self.tasks:
            if not node.should_execute(current):
                continue
            current = await node.execute(current)
        return current

    if self.mode == ExecutionMode.PARALLEL:
        pending = [node.execute(context) for node in self.tasks if node.should_execute(context)]
        outcomes = await asyncio.gather(*pending)
        # gather preserves input order, so the last eligible task's result wins.
        return outcomes[-1] if outcomes else context

    raise ValueError(f"Unknown execution mode:  {self.mode}")
should_execute
should_execute(_context: ContextT) -> bool
Source code in inferflow/asyncio/workflow/decorators.py
def should_execute(self, _context: ContextT) -> bool:
    """Report that the group always runs; ``_context`` is not consulted."""
    always_run = True
    return always_run

Workflow

Workflow(*tasks: TaskNode[ContextT])

Bases: WorkflowExecutor[ContextT]

Workflow executor for decorator-based tasks (async version).

Source code in inferflow/asyncio/workflow/decorators.py
def __init__(self, *tasks: TaskNode[ContextT]):
    """Build the workflow's root node from ``tasks``.

    A single task becomes the root as-is. Any other count, including zero,
    is wrapped with ``sequence``, so an empty workflow gets a group with no
    tasks instead of failing with ``IndexError`` on ``tasks[0]``.

    Args:
        *tasks: Task nodes to run, in order.
    """
    self.root = tasks[0] if len(tasks) == 1 else sequence(*tasks)
Attributes
root instance-attribute
root = sequence(*tasks) if len(tasks) > 1 else tasks[0]
Functions
run async
run(context: ContextT) -> ContextT
Source code in inferflow/asyncio/workflow/decorators.py
async def run(self, context: ContextT) -> ContextT:
    """Execute the root node against ``context`` and return the resulting context."""
    outcome = await self.root.execute(context)
    return outcome
__aenter__ async
__aenter__() -> Self
Source code in inferflow/asyncio/workflow/decorators.py
async def __aenter__(self) -> t.Self:
    """Enter the async context; the workflow itself is the managed object."""
    workflow = self
    return workflow
__aexit__ async
__aexit__(*args: Any) -> None
Source code in inferflow/asyncio/workflow/decorators.py
async def __aexit__(self, *args: t.Any) -> None:
    """Leave the async context; does nothing and never suppresses exceptions."""
    return None

Functions

task

task(name: str | None = None, description: str | None = None, condition: Callable[[Any], bool] | None = None, timeout: float | None = None, retry: int = 0, skip_on_error: bool = False) -> Callable[[Callable[[ContextT], Awaitable[ContextT]]], DecoratedTask[ContextT]]

Decorator to create a workflow task (async version).

Source code in inferflow/asyncio/workflow/decorators.py
def task(
    name: str | None = None,
    description: str | None = None,
    condition: t.Callable[[t.Any], bool] | None = None,
    timeout: float | None = None,
    retry: int = 0,
    skip_on_error: bool = False,
) -> t.Callable[[t.Callable[[ContextT], t.Awaitable[ContextT]]], DecoratedTask[ContextT]]:
    """Build a decorator that turns a callable into a ``DecoratedTask`` (async version).

    Args:
        name: Task name; when falsy, the decorated callable's ``__name__`` is used.
        description: Task description; when falsy, the callable's docstring is used.
        condition: Optional predicate stored on the resulting task.
        timeout: Stored in the task metadata.
        retry: Stored in the task metadata.
        skip_on_error: Stored in the task metadata.

    Returns:
        A decorator producing a ``DecoratedTask`` that wraps the decorated callable.
    """

    def wrap(func: t.Callable[[ContextT], t.Awaitable[ContextT]]) -> DecoratedTask[ContextT]:
        # Explicit arguments win; otherwise fall back to the callable's own
        # name and docstring.
        return DecoratedTask(
            func=func,
            _metadata=TaskMetadata(
                name=name or func.__name__,
                description=description or func.__doc__,
                timeout=timeout,
                retry=retry,
                skip_on_error=skip_on_error,
            ),
            condition=condition,
        )

    return wrap

parallel

parallel(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]

Create a parallel task group (async version).

Source code in inferflow/asyncio/workflow/decorators.py
def parallel(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]:
    """Bundle ``tasks`` into a task group that runs in parallel mode (async version)."""
    members = [*tasks]
    return TaskGroup(tasks=members, mode=ExecutionMode.PARALLEL)

sequence

sequence(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]

Create a sequential task group (async version).

Source code in inferflow/asyncio/workflow/decorators.py
def sequence(*tasks: TaskNode[ContextT]) -> TaskGroup[ContextT]:
    """Bundle ``tasks`` into a task group that runs in sequential mode (async version)."""
    members = [*tasks]
    return TaskGroup(tasks=members, mode=ExecutionMode.SEQUENTIAL)