Task

The otter.task package contains the classes used to define tasks: their specs, state, context and registry, plus the task reporter, which maintains the task part of the manifest.

task.model module

Models for tasks.

class otter.task.model.Spec(*, name: str, requires: list[str] = [], scratchpad_ignore_missing: bool = False, **extra_data: Any)

Bases: BaseModel

Task Spec model.

A Spec describes the properties and types for the config of a Task. Specs are generated from the config file in otter.task.load_specs().

This is the base on which task Specs are built. Specific Tasks extend this class to add custom attributes.

The first word in name determines the task_type. This is used to identify the Task in the otter.task.TaskRegistry and in the config file.

For example, for a DoSomething class defining a Task, the task_type will be do_something, and in the configuration file, it could be used inside a Step like this:

steps:
    - do_something to create an example resource:
        some_field: some_value
        another_field: another_value

name: str

The name of the task. It is used to identify the task in the manifest and in the configuration file.

requires: list[str]

A list of tasks that this one depends on. The task will only run when all the prerequisites are completed.

scratchpad_ignore_missing: bool

Whether to ignore missing sentinels from the scratchpad when building the task. This is useful for tasks that use their own placeholders and internal scratchpads when running, e.g. the explode task.

Defaults to False.

property task_type: str

The task type, used to identify it in the task registry.

Determined by the first word in the task name.
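
As a rough illustration of this rule, the sketch below takes the first whitespace-separated word of a task name. The helper name task_type_from_name is hypothetical and not part of Otter; the real property may be implemented differently.

```python
def task_type_from_name(name: str) -> str:
    """Return the first whitespace-separated word of a task name.

    Hypothetical helper for illustration only, not part of otter.
    Assumes the name contains at least one word.
    """
    return name.split(maxsplit=1)[0]


print(task_type_from_name("do_something to create an example resource"))
```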

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow'}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class otter.task.model.State(value)

Bases: Enum

Enumeration of possible states for an otter.task.model.Task.

PENDING_RUN = 0
RUNNING = 1
PENDING_VALIDATION = 2
VALIDATING = 3
WAITING_FOR_SUBTASKS = 4
DONE = 5

otter.task.model.READY_STATES = [State.PENDING_RUN, State.PENDING_VALIDATION]

States in which a task is considered ready to be sent to a worker.
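
A minimal sketch of how such a list might be used to pick tasks for dispatch. The State enum here is a stand-in that mirrors the documented values, and the helper is_ready is hypothetical, not part of Otter.

```python
from enum import Enum


class State(Enum):
    """Stand-in mirroring the documented otter.task.model.State values."""

    PENDING_RUN = 0
    RUNNING = 1
    PENDING_VALIDATION = 2
    VALIDATING = 3
    WAITING_FOR_SUBTASKS = 4
    DONE = 5


READY_STATES = [State.PENDING_RUN, State.PENDING_VALIDATION]


def is_ready(state: State) -> bool:
    """Hypothetical helper: True if a task in this state could go to a worker."""
    return state in READY_STATES


# List every state a worker could pick up.
print([state.name for state in State if is_ready(state)])
```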

class otter.task.model.TaskContext(config: Config, scratchpad: Scratchpad)

Bases: object

Task context.

state: State

The state of the task. See otter.task.model.State.

abort: Event

An event that will trigger if another task fails. The abort event is assigned to the task context when the task is sent to run.

config: Config

The configuration object. See otter.config.model.Config.

scratchpad: Scratchpad

The scratchpad object. See otter.scratchpad.model.Scratchpad.

specs: list[Spec]

The list of new specs to be added to the step.

add_specs(specs: Spec | Sequence[Spec]) → None

Add specs to the context.

The coordinator will take these new specs, build tasks from them and add them to the step’s queue.

This enables tasks to dynamically generate new tasks.

Warning

Adding requirements to these specs can cause cycles in the graph. This can only be checked at runtime, and can cause long-running steps to fail halfway through.

Parameters:

specs (Spec | Sequence[Spec]) – The spec or list of specs to add.
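
The following sketch shows one way a method with this signature could accept either a single spec or a sequence of specs. Both classes are simplified stand-ins (the real Spec is a pydantic model and the real TaskContext takes a config and a scratchpad), so treat this as an assumption about behaviour rather than Otter's implementation.

```python
from __future__ import annotations

from collections.abc import Sequence
from dataclasses import dataclass, field


@dataclass
class Spec:
    """Minimal stand-in for otter.task.model.Spec."""

    name: str
    requires: list[str] = field(default_factory=list)


class TaskContext:
    """Stand-in context that only tracks newly added specs."""

    def __init__(self) -> None:
        self.specs: list[Spec] = []

    def add_specs(self, specs: Spec | Sequence[Spec]) -> None:
        # Accept a single spec or a sequence of specs, as the signature suggests.
        if isinstance(specs, Spec):
            specs = [specs]
        self.specs.extend(specs)


context = TaskContext()
context.add_specs(Spec(name="copy file_a"))
context.add_specs([Spec(name="copy file_b"), Spec(name="copy file_c")])
print([spec.name for spec in context.specs])
```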

class otter.task.model.Task(spec: Spec, context: TaskContext)

Bases: TaskReporter

Base class for tasks.

Tasks are the main building blocks of a Step and the main unit of work in Otter.

The config for a Task is contained in an otter.task.model.Spec object.

A Task can optionally have a list of otter.manifest.model.Artifact objects, which contain metadata related to its input and output and are added to the step manifest.

Tasks subclass otter.task.task_reporter.TaskReporter to provide automatic logging, tracking and error handling.

To implement a new Task:
1. Create a new class that extends Spec with the required config fields.
2. Create a subclass of Task and implement the run() method. Optionally, implement the validate() method.
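
The two steps above can be sketched as follows. Spec and Task here are simplified stand-ins so the example runs without Otter installed (the real Spec is a pydantic model and the real Task also receives a TaskContext); HelloWorldSpec, HelloWorld and the greeting attribute are hypothetical names.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Spec:
    """Stand-in for otter.task.model.Spec."""

    name: str


class Task(ABC):
    """Stand-in for otter.task.model.Task, without context or reporting."""

    def __init__(self, spec: Spec) -> None:
        self.spec = spec

    @abstractmethod
    async def run(self) -> Task: ...


# Step 1: extend Spec with the config fields the task needs.
@dataclass
class HelloWorldSpec(Spec):
    who: str = "world"


# Step 2: subclass Task and implement run(), returning the task itself.
class HelloWorld(Task):
    def __init__(self, spec: HelloWorldSpec) -> None:
        super().__init__(spec)
        self.spec: HelloWorldSpec = spec
        self.greeting: str | None = None

    async def run(self) -> HelloWorld:
        self.greeting = f"Hello, {self.spec.who}!"
        return self


task = asyncio.run(HelloWorld(HelloWorldSpec(name="hello_world greet", who="otter")).run())
print(task.greeting)
```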

final has_validation() → bool

Determine if the task has validation.

Returns:

True if the task has validation, False otherwise.

Return type:

bool

final has_subtasks() → bool

Determine if the task has generated subtasks.

Returns:

True if the task has generated subtasks, False otherwise.

Return type:

bool

final get_execution_method() → Callable[[...], Self] | Callable[[...], Awaitable[Self]]

Get the method to execute based on the task state.

Returns:

The method to execute.

Return type:

Callable[…, Self] | Callable[…, Awaitable[Self]]

final get_next_state() → State

Get the next state.

Returns the immediately next state, except in these cases:
  • If the current state is RUNNING and the task does not implement validation, check for subtasks. If there are subtasks, return WAITING_FOR_SUBTASKS, else return DONE.

  • If the current state is VALIDATING, check for subtasks. If there are subtasks, return WAITING_FOR_SUBTASKS, else return DONE.

Returns:

The next state.

Return type:

State
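
The transition rules described above can be sketched as a free function. This is not Otter's implementation: the State enum is a stand-in mirroring the documented values, next_state and its parameters are hypothetical, and treating DONE as terminal is an assumption the documentation does not confirm.

```python
from enum import Enum


class State(Enum):
    """Stand-in mirroring the documented otter.task.model.State values."""

    PENDING_RUN = 0
    RUNNING = 1
    PENDING_VALIDATION = 2
    VALIDATING = 3
    WAITING_FOR_SUBTASKS = 4
    DONE = 5


def next_state(current: State, has_validation: bool, has_subtasks: bool) -> State:
    """Hypothetical free-function version of the documented transition rules."""
    # Where a task goes once its own work (run or validation) is over.
    after_work = State.WAITING_FOR_SUBTASKS if has_subtasks else State.DONE

    if current is State.RUNNING and not has_validation:
        return after_work
    if current is State.VALIDATING:
        return after_work
    if current is State.DONE:
        # Assumption: DONE is terminal; the documentation does not cover this case.
        return State.DONE
    # Every other case advances to the immediately next state.
    return State(current.value + 1)


print(next_state(State.RUNNING, has_validation=False, has_subtasks=True).name)
```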

abstractmethod async run() → Self

Run the task.

This method contains the actual work of a Task. Tasks must implement run.

Optionally, a list of otter.manifest.model.Artifact objects can be assigned to self.artifacts in the body of the method. These will be added to the step manifest.

Optionally, an abort event can be watched to stop the task if another fails. This is useful for long-running work that can be stopped midway once the run is deemed a failure.

Returns:

The Task instance itself must be returned.

Return type:

Self
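
A minimal sketch of watching an abort event during long-running work. It assumes the event behaves like threading.Event (the documentation only says Event), and long_running_work is a hypothetical function, not an Otter API.

```python
import asyncio
import threading


async def long_running_work(abort: threading.Event, chunks: int) -> int:
    """Process chunks one at a time, stopping early once abort is set."""
    processed = 0
    for _ in range(chunks):
        if abort.is_set():
            # Another task failed, so there is no point in continuing.
            break
        await asyncio.sleep(0)  # placeholder for the real work on one chunk
        processed += 1
    return processed


abort = threading.Event()
completed = asyncio.run(long_running_work(abort, chunks=3))

abort.set()
aborted = asyncio.run(long_running_work(abort, chunks=3))
print(completed, aborted)
```

Checking the event between chunks keeps the check cheap while bounding how much work is wasted after a failure elsewhere.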

async validate() → Self

Validate the task result.

This method should be implemented if the task requires validation. If not implemented, the task will always be considered valid.

The validate method should call validators from the otter.validators module to perform validation. The validators must always return None and raise otter.util.errors.TaskValidationError if the validation fails.

See also

otter.validators for some built-in validators that can be used as examples to implement your own.

Returns:

The Task instance itself must be returned.

Return type:

Self
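
The validator contract described above (return None on success, raise on failure) can be sketched like this. TaskValidationError is a local stand-in for otter.util.errors.TaskValidationError, and counts_match is a hypothetical validator, not one shipped in otter.validators.

```python
class TaskValidationError(Exception):
    """Stand-in for otter.util.errors.TaskValidationError."""


def counts_match(expected: int, actual: int) -> None:
    """Hypothetical validator: return None on success, raise on failure."""
    if expected != actual:
        raise TaskValidationError(f"expected {expected} records, found {actual}")


# A passing check returns None and lets validation continue.
counts_match(3, 3)

# A failing check raises, which marks the validation as failed.
try:
    counts_match(3, 2)
except TaskValidationError as error:
    print(f"validation failed: {error}")
```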

task.task_registry module

Registry of task classes, used to build tasks from their spec.

class otter.task.task_registry.TaskRegistry(config: Config, scratchpad: Scratchpad)

Bases: object

Task types are registered here.

The registry is where a Task is built when the Step is run. It maps each task_type to its Task and Spec classes.

Note

The otter.scratchpad.model.Scratchpad placeholders are substituted into the Spec here, right before the Task is built.

register(package_name: str) → None

Register tasks in a package into the registry.

Parameters:

package_name (str) – The name of the package to register tasks from.

Raises:

SystemExit – If the package is not found, modules are missing the expected class, or the class is not found in the module.

build(spec: Spec) → Task

Build a Task.

Template replacement is performed here, right before initializing the Task.

Parameters:

spec (Spec) – The spec to build the Task from.

Raises:

SystemExit – If the task type is invalid or the spec is invalid.

Returns:

The built Task.

Return type:

Task
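
A simplified sketch of the lookup that build performs, under several assumptions: MiniRegistry, Copy and the register(task_type, task_class) signature are hypothetical (the real register takes a package name and discovers classes itself), Spec and Task are stand-ins, and scratchpad placeholder replacement is left out.

```python
from dataclasses import dataclass


@dataclass
class Spec:
    """Stand-in for otter.task.model.Spec."""

    name: str

    @property
    def task_type(self) -> str:
        # The first word of the name identifies the task type.
        return self.name.split(maxsplit=1)[0]


class Task:
    """Stand-in for otter.task.model.Task."""

    def __init__(self, spec: Spec) -> None:
        self.spec = spec


class Copy(Task):
    """Hypothetical task type registered as 'copy'."""


class MiniRegistry:
    """Hypothetical registry mapping a task_type to its Task class."""

    def __init__(self) -> None:
        self._tasks: dict[str, type[Task]] = {}

    def register(self, task_type: str, task_class: type[Task]) -> None:
        self._tasks[task_type] = task_class

    def build(self, spec: Spec) -> Task:
        task_class = self._tasks.get(spec.task_type)
        if task_class is None:
            # Mirrors the documented behaviour of exiting on an invalid task type.
            raise SystemExit(f"invalid task type: {spec.task_type}")
        return task_class(spec)


registry = MiniRegistry()
registry.register("copy", Copy)
built = registry.build(Spec(name="copy a file to the output"))
print(type(built).__name__)
```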

task.task_reporter module

TaskReporter class and report decorator for logging and updating tasks in the manifest.

class otter.task.task_reporter.TaskReporter(name: str)

Bases: object

Class for logging and updating tasks in the manifest.

property artifacts: list[Artifact] | None

Return the Artifacts associated with the Task.

start_run() → None

Update a task that has started running.

finish_run(done: bool = False) → None

Update a task that has finished running.

start_validation() → None

Update a task that has started validation.

finish_validation() → None

Update a task that has finished validation.

abort() → None

Update a task that has been aborted.

fail(error: Exception, where: str) → None

Update a task that has failed running or validation.

otter.task.task_reporter.report(func: Callable[..., Task] | Callable[..., Awaitable[Task]]) → Callable[..., Awaitable[Task]]

Decorator for logging and updating tasks in the manifest.
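
The signature suggests a decorator that accepts either a sync or an async callable and always returns an awaitable. A minimal sketch of that pattern follows; it prints instead of updating a manifest, and it is an assumption about the general shape, not Otter's implementation.

```python
import asyncio
import functools
import inspect
from collections.abc import Awaitable, Callable
from typing import Any


def report(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Hypothetical decorator: wrap a sync or async callable in a coroutine."""

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        print(f"starting {func.__name__}")
        try:
            result = func(*args, **kwargs)
            # Async callables return an awaitable that still has to be awaited.
            if inspect.isawaitable(result):
                result = await result
        except Exception as error:
            print(f"{func.__name__} failed: {error}")
            raise
        print(f"finished {func.__name__}")
        return result

    return wrapper


@report
def sync_work() -> str:
    return "sync"


@report
async def async_work() -> str:
    return "async"


sync_result = asyncio.run(sync_work())
async_result = asyncio.run(async_work())
```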

Module contents

Task module.

otter.task.load_specs(config_path: Path, step_name: str) → list[Spec]

Load Specs for a Step.

Returns:

The task specs.

Return type:

list[Spec]