Skip to content

tasks

tasks

Attributes

__doctitle__ module-attribute

__doctitle__ = 'Object-oriented Workflow'

InputT module-attribute

InputT = TypeVar('InputT')

OutputT module-attribute

OutputT = TypeVar('OutputT')

ContextT module-attribute

ContextT = TypeVar('ContextT')

__all__ module-attribute

__all__ = ['Task', 'PipelineTask', 'FunctionTask', 'TaskChain', 'ParallelTasks', 'TypedWorkflow']

Classes

Task

Task(name: str | None = None, description: str | None = None, timeout: float | None = None, retry: int = 0, skip_on_error: bool = False)

Bases: ABC, Generic[InputT, OutputT]

Abstract task with typed input and output (sync version).

Source code in inferflow/workflow/tasks.py
def __init__(
    self,
    name: str | None = None,
    description: str | None = None,
    timeout: float | None = None,
    retry: int = 0,
    skip_on_error: bool = False,
):
    """Store the task's settings in ``self.metadata``.

    Args:
        name: Task name. A falsy value (``None`` or ``""``) falls back to
            the name of the instance's class.
        description: Task description. A falsy value falls back to the
            docstring of the instance's class (which may itself be ``None``).
        timeout: Passed through to ``TaskMetadata`` unchanged.
        retry: Passed through to ``TaskMetadata`` unchanged.
        skip_on_error: Passed through to ``TaskMetadata`` unchanged.
    """
    cls = self.__class__
    # ``or`` (not ``is None``) is deliberate here: empty strings also
    # trigger the class-derived fallbacks.
    resolved_name = name or cls.__name__
    resolved_description = description or cls.__doc__
    self.metadata = TaskMetadata(
        name=resolved_name,
        description=resolved_description,
        timeout=timeout,
        retry=retry,
        skip_on_error=skip_on_error,
    )
Attributes
metadata instance-attribute
metadata = TaskMetadata(name=name or self.__class__.__name__, description=description or self.__class__.__doc__, timeout=timeout, retry=retry, skip_on_error=skip_on_error)
Functions
execute abstractmethod
execute(input: InputT) -> OutputT

Execute the task (sync).

Source code in inferflow/workflow/tasks.py
@abc.abstractmethod
def execute(self, input: InputT) -> OutputT:
    """Run the task synchronously on ``input`` and return its output.

    Abstract: this base definition has no body beyond the docstring, so
    concrete tasks must provide their own implementation.
    """
should_execute
should_execute(_input: InputT) -> bool

Check if task should execute.

Source code in inferflow/workflow/tasks.py
def should_execute(self, _input: InputT) -> bool:
    """Report whether the task should run for the given input.

    This default implementation ignores ``_input`` and always answers
    ``True``.
    """
    return True

PipelineTask dataclass

PipelineTask(pipeline: Any, name: str = '')

Bases: Task[bytes, Any]

Task wrapper for InferFlow pipelines (sync version).

Attributes
pipeline instance-attribute
pipeline: Any

InferFlow pipeline instance.

name class-attribute instance-attribute
name: str = ''

Task name.

Functions
execute
execute(input: bytes) -> Any

Execute pipeline inference (sync).

Source code in inferflow/workflow/tasks.py
def execute(self, input: bytes) -> t.Any:
    """Run the wrapped pipeline on ``input`` (sync).

    Calls ``self.pipeline`` with the raw bytes and returns whatever the
    pipeline returns, unmodified.
    """
    pipeline = self.pipeline
    return pipeline(input)

FunctionTask dataclass

FunctionTask(func: Callable[[InputT], OutputT], name: str = '')

Bases: Task[InputT, OutputT]

Task wrapper for functions (sync version).

Attributes
func instance-attribute
func: Callable[[InputT], OutputT]

Task function.

name class-attribute instance-attribute
name: str = ''

Task name.

Functions
execute
execute(input: InputT) -> OutputT

Execute function (sync).

Source code in inferflow/workflow/tasks.py
def execute(self, input: InputT) -> OutputT:
    """Apply the wrapped function to ``input`` (sync).

    Calls ``self.func`` with ``input`` and returns its result unchanged.
    """
    wrapped = self.func
    return wrapped(input)

TaskChain

TaskChain()

Bases: Generic[InputT, OutputT]

Chain of tasks with automatic type flow (sync version).

Source code in inferflow/workflow/tasks.py
def __init__(self):
    """Create a chain with no tasks; tasks are kept in ``self.tasks``."""
    # A fresh list per instance, so chains never share task storage.
    self.tasks: list[Task[t.Any, t.Any]] = []
Attributes
tasks instance-attribute
tasks: list[Task[Any, Any]] = []
Functions
then
then(task: Task[Any, OutputT]) -> TaskChain[InputT, OutputT]

Add a task to the chain.

Source code in inferflow/workflow/tasks.py
def then(self, task: Task[t.Any, OutputT]) -> TaskChain[InputT, OutputT]:
    """Append ``task`` to the end of the chain.

    Returns this same chain object (not a copy), so calls can be strung
    together fluently.
    """
    self.tasks.append(task)
    # The very same instance is handed back; the ignore presumably covers
    # the change of the chain's declared output type for the type checker.
    return self  # type: ignore
execute
execute(input: InputT) -> OutputT

Execute all tasks in sequence (sync).

Source code in inferflow/workflow/tasks.py
def execute(self, input: InputT) -> OutputT:
    """Run the chained tasks in order, piping each result into the next (sync).

    Each task is first asked ``should_execute`` with the current value; a
    task that declines is skipped and the value passes through unchanged.
    The value left after the last task is returned (the original ``input``
    if the chain is empty or every task declined).
    """
    value: t.Any = input

    for step in self.tasks:
        if not step.should_execute(value):
            continue
        value = step.execute(value)

    return value  # type: ignore

ParallelTasks

ParallelTasks(tasks: list[Task[InputT, Any]])

Bases: Generic[InputT]

Execute multiple tasks in parallel with same input (sync version).

Note: In the sync version, tasks are executed sequentially. Use the asyncio version for true parallelism.

Source code in inferflow/workflow/tasks.py
def __init__(self, tasks: list[Task[InputT, t.Any]]):
    """Keep a reference to ``tasks`` as ``self.tasks``.

    The list is stored as given (no copy is made), so later changes the
    caller makes to that list are visible through this object.
    """
    self.tasks = tasks
Attributes
tasks instance-attribute
tasks = tasks
Functions
execute
execute(input: InputT) -> list[Any]

Execute all tasks (sync - sequential).

Source code in inferflow/workflow/tasks.py
def execute(self, input: InputT) -> list[t.Any]:
    """Run every task against the same ``input``, one after another (sync).

    Tasks whose ``should_execute(input)`` is falsy are skipped and
    contribute nothing, so the returned list may be shorter than
    ``self.tasks``. Results appear in task order.
    """
    return [
        task.execute(input)
        for task in self.tasks
        if task.should_execute(input)
    ]

TypedWorkflow dataclass

TypedWorkflow(input_builder: Callable[[Any], ContextT], tasks: list[Task[ContextT, ContextT]], output_builder: Callable[[ContextT], Any] | None = None)

Bases: WorkflowExecutor[ContextT]

Typed workflow with explicit context transformation (sync version).

Attributes
input_builder instance-attribute
input_builder: Callable[[Any], ContextT]

Build initial context from input.

tasks instance-attribute
tasks: list[Task[ContextT, ContextT]]

Tasks to execute.

output_builder class-attribute instance-attribute
output_builder: Callable[[ContextT], Any] | None = None

Extract output from final context.

Functions
run
run(input_data: Any) -> Any

Execute workflow (sync).

Source code in inferflow/workflow/tasks.py
def run(self, input_data: t.Any) -> t.Any:
    """Execute workflow (sync).

    Builds the initial context with ``self.input_builder``, threads it
    through ``self.tasks`` in order, and optionally post-processes the
    final context with ``self.output_builder``.

    Args:
        input_data: Raw input handed to ``self.input_builder``.

    Returns:
        ``self.output_builder(context)`` when an output builder is set,
        otherwise the final context itself.
    """
    context = self.input_builder(input_data)

    for task in self.tasks:
        # Tasks that decline leave the context untouched.
        if task.should_execute(context):
            context = task.execute(context)

    # Compare against None instead of testing truthiness: a callable object
    # that happens to be falsy (e.g. one defining ``__len__`` returning 0)
    # is still a configured builder and must be invoked.
    if self.output_builder is not None:
        return self.output_builder(context)
    return context