Skip to content

tasks

tasks

Attributes

__doctitle__ module-attribute

__doctitle__ = 'Object-oriented Workflow (Async)'

InputT module-attribute

InputT = TypeVar('InputT')

OutputT module-attribute

OutputT = TypeVar('OutputT')

ContextT module-attribute

ContextT = TypeVar('ContextT')

__all__ module-attribute

__all__ = ['Task', 'PipelineTask', 'FunctionTask', 'TaskChain', 'ParallelTasks', 'TypedWorkflow']

Classes

Task

Task(name: str | None = None, description: str | None = None, timeout: float | None = None, retry: int = 0, skip_on_error: bool = False)

Bases: ABC, Generic[InputT, OutputT]

Abstract task with typed input and output (async version).

Source code in inferflow/asyncio/workflow/tasks.py
def __init__(
    self,
    name: str | None = None,
    description: str | None = None,
    timeout: float | None = None,
    retry: int = 0,
    skip_on_error: bool = False,
):
    """Collect the task's settings into ``self.metadata``.

    Args:
        name: Task name. A falsy value (``None`` or ``""``) falls back to
            the name of the concrete class.
        description: Task description. A falsy value falls back to the
            concrete class's docstring (which may itself be ``None``).
        timeout: Passed through to ``TaskMetadata`` unchanged.
        retry: Passed through to ``TaskMetadata`` unchanged.
        skip_on_error: Passed through to ``TaskMetadata`` unchanged.
    """
    owner = self.__class__
    settings = {
        # ``or`` (not ``is None``) so empty strings also use the class default.
        "name": name or owner.__name__,
        "description": description or owner.__doc__,
        "timeout": timeout,
        "retry": retry,
        "skip_on_error": skip_on_error,
    }
    self.metadata = TaskMetadata(**settings)
Attributes
metadata instance-attribute
metadata = TaskMetadata(name=name or self.__class__.__name__, description=description or self.__class__.__doc__, timeout=timeout, retry=retry, skip_on_error=skip_on_error)
Functions
execute abstractmethod async
execute(input: InputT) -> OutputT

Execute the task.

Source code in inferflow/asyncio/workflow/tasks.py
@abc.abstractmethod
async def execute(self, input: InputT) -> OutputT:
    """Execute the task.

    Abstract coroutine: concrete tasks must override it.

    Args:
        input: The value the task operates on.

    Returns:
        The task's result.
    """
should_execute
should_execute(_input: InputT) -> bool
Source code in inferflow/asyncio/workflow/tasks.py
def should_execute(self, _input: InputT) -> bool:
    """Report whether this task should run for ``_input``.

    This base implementation ignores its argument and always returns
    ``True``.
    """
    return True

PipelineTask dataclass

PipelineTask(pipeline: Any, name: str = '')

Bases: Task[bytes, Any]

Task wrapper for InferFlow pipelines (async version).

Attributes
pipeline instance-attribute
pipeline: Any
name class-attribute instance-attribute
name: str = ''
Functions
execute async
execute(input: bytes) -> Any
Source code in inferflow/asyncio/workflow/tasks.py
async def execute(self, input: bytes) -> t.Any:
    """Call the wrapped pipeline with ``input`` and return its awaited result."""
    outcome = await self.pipeline(input)
    return outcome

FunctionTask dataclass

FunctionTask(func: Callable[[InputT], Awaitable[OutputT] | OutputT], name: str = '')

Bases: Task[InputT, OutputT]

Task wrapper for functions (async version).

Attributes
func instance-attribute
func: Callable[[InputT], Awaitable[OutputT] | OutputT]
name class-attribute instance-attribute
name: str = ''
Functions
execute async
execute(input: InputT) -> OutputT
Source code in inferflow/asyncio/workflow/tasks.py
async def execute(self, input: InputT) -> OutputT:
    """Call the wrapped function and return its (possibly awaited) result.

    The function is invoked first and its *return value* is inspected,
    rather than inspecting the function object. This also covers callables
    that hand back an awaitable without being coroutine functions, such as
    a lambda wrapping an async call or an object with an async ``__call__``.

    Args:
        input: Value forwarded to ``self.func``.

    Returns:
        The function's result; if the function returned an awaitable, the
        value produced by awaiting it.
    """
    import inspect  # local import keeps this block self-contained

    outcome = self.func(input)
    if inspect.isawaitable(outcome):
        return await outcome
    return outcome  # type: ignore

TaskChain

TaskChain()

Bases: Generic[InputT, OutputT]

Chain of tasks with automatic type flow (async version).

Source code in inferflow/asyncio/workflow/tasks.py
def __init__(self):
    """Create an empty chain; tasks are stored in insertion order."""
    # Element types are erased to Any because each link may have different
    # input/output types.
    self.tasks: list[Task[t.Any, t.Any]] = []
Attributes
tasks instance-attribute
tasks: list[Task[Any, Any]] = []
Functions
then
then(task: Task[Any, OutputT]) -> TaskChain[InputT, OutputT]
Source code in inferflow/asyncio/workflow/tasks.py
def then(self, task: Task[t.Any, OutputT]) -> TaskChain[InputT, OutputT]:
    """Append ``task`` to the end of the chain and return the chain itself.

    The chain is mutated in place; the same instance is returned so calls
    can be written fluently (``chain.then(a).then(b)``).
    """
    self.tasks.append(task)
    # The returned object is ``self``; only its static output type changes.
    return self  # type: ignore
execute async
execute(input: InputT) -> OutputT
Source code in inferflow/asyncio/workflow/tasks.py
async def execute(self, input: InputT) -> OutputT:
    """Feed ``input`` through the chain, one task after another.

    Each task first gets to veto itself via ``should_execute``; a task that
    declines is skipped and the value passes on unchanged. Otherwise the
    task's awaited result becomes the value handed to the next task.

    Returns:
        The value left after the last task (``input`` itself if the chain
        is empty or every task was skipped).
    """
    value: t.Any = input
    for step in self.tasks:
        if not step.should_execute(value):
            continue
        value = await step.execute(value)
    return value  # type: ignore

ParallelTasks

ParallelTasks(tasks: list[Task[InputT, Any]])

Bases: Generic[InputT]

Execute multiple tasks in parallel with same input (async version).

Source code in inferflow/asyncio/workflow/tasks.py
def __init__(self, tasks: list[Task[InputT, t.Any]]):
    """Store the tasks to run; the list is kept by reference, not copied."""
    self.tasks = tasks
Attributes
tasks instance-attribute
tasks = tasks
Functions
execute async
execute(input: InputT) -> list[Any]
Source code in inferflow/asyncio/workflow/tasks.py
async def execute(self, input: InputT) -> list[t.Any]:
    """Run every eligible task on the same ``input`` concurrently.

    Tasks whose ``should_execute(input)`` is false are left out entirely,
    so the result list has one entry per task that actually ran, in task
    order. It is not positionally aligned with ``self.tasks`` when some
    tasks are skipped.

    Returns:
        The results collected by ``asyncio.gather``.
    """
    pending = []
    for candidate in self.tasks:
        if not candidate.should_execute(input):
            continue
        pending.append(candidate.execute(input))
    return await asyncio.gather(*pending)

TypedWorkflow dataclass

TypedWorkflow(input_builder: Callable[[Any], ContextT], tasks: list[Task[ContextT, ContextT]], output_builder: Callable[[ContextT], Any] | None = None)

Bases: WorkflowExecutor[ContextT]

Typed workflow with explicit context transformation (async version).

Attributes
input_builder instance-attribute
input_builder: Callable[[Any], ContextT]
tasks instance-attribute
tasks: list[Task[ContextT, ContextT]]
output_builder class-attribute instance-attribute
output_builder: Callable[[ContextT], Any] | None = None
Functions
run async
run(input_data: Any) -> Any
Source code in inferflow/asyncio/workflow/tasks.py
async def run(self, input_data: t.Any) -> t.Any:
    """Build a context, thread it through the tasks, and shape the output.

    Steps:
        1. ``input_builder`` turns ``input_data`` into the initial context.
        2. Each task, in order, may replace the context with its awaited
           result; tasks whose ``should_execute(context)`` is false are
           skipped and the context passes on unchanged.
        3. If an ``output_builder`` is configured, its return value is the
           result; otherwise the final context is returned as-is.

    Args:
        input_data: Raw input handed to ``input_builder``.

    Returns:
        ``output_builder(context)`` when a builder is set, else the final
        context.
    """
    context = self.input_builder(input_data)

    for task in self.tasks:
        if task.should_execute(context):
            context = await task.execute(context)

    # Compare against None explicitly: "no builder" is signalled by None,
    # and a callable object that happens to be falsy must still be invoked.
    if self.output_builder is not None:
        return self.output_builder(context)
    return context