Task

The otter.task package contains the classes used to define tasks: their specs, state, registry and context, plus the task reporter, which maintains the task part of the manifest.

task.model module

Models for tasks.

pydantic model otter.task.model.Spec[source]

Bases: BaseModel

Task Spec model.

A Spec describes the properties and types for the config of a Task. Specs are generated from the config file by otter.task.load_specs().

This is the base on which task Specs are built. Specific Tasks extend this class to add custom attributes.

The first word in name determines the task_type. This is used to identify the Task in the otter.task.TaskRegistry and in the config file.

For example, for a DoSomething class defining a Task, the task_type will be do_something, and in the configuration file, it could be used inside a Step like this:

steps:
    - do_something to create an example resource:
        some_field: some_value
        another_field: another_value

JSON schema:
{
   "title": "Spec",
   "description": "Task Spec model.\n\nA `Spec` describes the properties and types for the config of a :py:class:`Task`.\n`Specs` are generated from the config file in :py:meth:`otter.task.load_specs`.\n\nThis is the base on which task `Specs` are built. Specific `Tasks` extend this\nclass to add custom attributes.\n\nThe first word in :py:attr:`name` determines the :py:attr:`task_type`. This is\nused to identify the :py:class:`Task` in the :py:class:`otter.task.TaskRegistry`\nand in the config file.\n\nFor example, for a ``DoSomething`` class defining a `Task`, the `task_type`\nwill be ``do_something``, and in the configuration file, it could be used\ninside a `Step` like this:\n\n.. code-block:: yaml\n\n    steps:\n        - do_something to create an example resource:\n            some_field: some_value\n            another_field: another_value",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "requires": {
         "default": [],
         "items": {
            "type": "string"
         },
         "title": "Requires",
         "type": "array"
      },
      "scratchpad_ignore_missing": {
         "default": false,
         "title": "Scratchpad Ignore Missing",
         "type": "boolean"
      }
   },
   "additionalProperties": true,
   "required": [
      "name"
   ]
}

Config:
  • extra: str = allow

Fields:
  • name (str)
  • requires (list[str])
  • scratchpad_ignore_missing (bool)

Validators:
  • _name_has_description » name

field name: str [Required]

The name of the task. It is used to identify the task in the manifest and in the configuration file.

Validated by:
  • _name_has_description

field requires: list[str] = []

A list of task names that this task depends on. The task will only run when all the prerequisites are completed.

field scratchpad_ignore_missing: bool = False

Whether to ignore missing keys in the scratchpad when replacing placeholders.

This is useful for tasks that use their own placeholders, which are replaced later, after the spec has been instantiated and the placeholders from the global scratchpad have been replaced.

Defaults to False.
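
The difference between strict and lenient placeholder replacement can be illustrated with the standard library. This is only an analogy: Otter's placeholder syntax and replacement code are not shown in this section, so the ${...} syntax, string.Template, and the keys release and task_local are all assumptions made for the sketch.

```python
from string import Template

# Hypothetical scratchpad contents and a value that also uses a
# task-local placeholder the global scratchpad knows nothing about.
scratchpad = {"release": "25.03"}
text = "gs://bucket/${release}/${task_local}"

# Strict replacement (comparable to scratchpad_ignore_missing = False):
# a missing key is an error.
try:
    Template(text).substitute(scratchpad)
    missing_key = None
except KeyError as error:
    missing_key = error.args[0]

# Lenient replacement (comparable to scratchpad_ignore_missing = True):
# unknown placeholders are left untouched for a later pass.
partial = Template(text).safe_substitute(scratchpad)

print(missing_key)
print(partial)
```

With the lenient variant, the task itself can fill in the remaining placeholder once it has the value.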

property task_type: str

The task type, used to identify it in the task registry.

Determined by the first word in the task name.
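
As a rough illustration of the rule above, the following sketch derives a task type from a task name. It is a stand-in, not Otter's implementation: task_type_from_name is a hypothetical helper, and treating the "first word" as everything before the first whitespace is an assumption.

```python
def task_type_from_name(name: str) -> str:
    """Return the first whitespace-separated word of a task name."""
    # Assumption: words are separated by whitespace.
    return name.split(maxsplit=1)[0]


name = "do_something to create an example resource"
print(task_type_from_name(name))
```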

class otter.task.model.State(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Enumeration of possible states for an otter.task.model.Task.

PENDING_RUN = 0
RUNNING = 1
PENDING_VALIDATION = 2
VALIDATING = 3
DONE = 4

next() → State[source]

Get the next state.
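
A minimal sketch of how such a progression could work, using the member names and values listed above. The body of next() is an assumption: in particular, what the real method does when called on DONE is not documented here, so the sketch simply keeps DONE as a terminal state.

```python
from enum import Enum


class State(Enum):
    """Stand-in mirroring the documented members of otter.task.model.State."""

    PENDING_RUN = 0
    RUNNING = 1
    PENDING_VALIDATION = 2
    VALIDATING = 3
    DONE = 4

    def next(self) -> "State":
        # Assumption: DONE is terminal and maps to itself.
        if self is State.DONE:
            return self
        # Members are numbered consecutively, so the next state is value + 1.
        return State(self.value + 1)


state = State.PENDING_RUN
path = [state]
while state is not State.DONE:
    state = state.next()
    path.append(state)
print([s.name for s in path])
```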

otter.task.model.DEP_READY_STATES = [State.PENDING_VALIDATION, State.VALIDATING, State.DONE]

States in which a dependency can be considered ready for the tasks that depend on it.

otter.task.model.READY_STATES = [State.PENDING_RUN, State.PENDING_VALIDATION]

States in which a task can be considered ready to be sent to a worker.
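
The two lists can be read together as a dispatch condition. The sketch below is one plausible way to combine them, not the scheduler's actual logic: can_dispatch is a hypothetical helper, and requiring both conditions at once is an assumption.

```python
from enum import Enum

# Stand-in for otter.task.model.State with the documented members and values.
State = Enum(
    "State",
    ["PENDING_RUN", "RUNNING", "PENDING_VALIDATION", "VALIDATING", "DONE"],
    start=0,
)

DEP_READY_STATES = [State.PENDING_VALIDATION, State.VALIDATING, State.DONE]
READY_STATES = [State.PENDING_RUN, State.PENDING_VALIDATION]


def can_dispatch(state, dependency_states):
    """Hypothetical check: may a task in `state` be sent to a worker?"""
    # Assumption: the task itself must be waiting, and every dependency
    # must have at least finished running.
    own_ready = state in READY_STATES
    deps_ready = all(dep in DEP_READY_STATES for dep in dependency_states)
    return own_ready and deps_ready


print(can_dispatch(State.PENDING_RUN, [State.DONE, State.VALIDATING]))
print(can_dispatch(State.PENDING_RUN, [State.RUNNING]))
```

Note that a dependency counts as ready as soon as its run has finished, even if its validation is still pending.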

class otter.task.model.TaskContext(config: Config, scratchpad: Scratchpad)[source]

Bases: object

Task context.

state: State

The state of the task. See otter.task.model.State.

abort: Event

An event that is set when another task fails. The abort event is assigned to the task context when the task is sent to run.

config: Config

The configuration object. See otter.config.model.Config.

scratchpad: Scratchpad

The scratchpad object. See otter.scratchpad.model.Scratchpad.

property specs: list[Spec]

The list of generated specs.

add_specs(specs: Sequence[Spec]) → None[source]

Add specs to the context.

This method can be called from inside a task and passed a list of specs. As soon as the task is finished, the specs will be instantiated into new tasks and added to the queue.

This enables tasks to dynamically generate new tasks based on the result of the current task.

Warning

Adding requirements to these specs can cause cycles in the graph. This can only be checked at runtime, and can cause long-running steps to fail halfway through.

Parameters:

specs (Sequence[Spec]) – The list of specs to add.
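
The fan-out pattern described above might look like the following sketch. To stay runnable without Otter installed, it uses small stand-ins for Spec and TaskContext that only mirror the documented specs property and add_specs method; run_list_files and the "copy file" task names are hypothetical.

```python
from collections.abc import Sequence
from dataclasses import dataclass, field


@dataclass
class Spec:
    """Stand-in for otter.task.model.Spec (the real one is a pydantic model)."""

    name: str
    requires: list[str] = field(default_factory=list)


class TaskContext:
    """Stand-in exposing only the documented specs / add_specs surface."""

    def __init__(self) -> None:
        self._specs: list[Spec] = []

    @property
    def specs(self) -> list[Spec]:
        return self._specs

    def add_specs(self, specs: Sequence[Spec]) -> None:
        self._specs.extend(specs)


def run_list_files(context: TaskContext, files: list[str]) -> None:
    """Body of a hypothetical task that creates one follow-up spec per file."""
    context.add_specs([Spec(name=f"copy file {path}") for path in files])


context = TaskContext()
run_list_files(context, ["a.csv", "b.csv"])
print([spec.name for spec in context.specs])
```

The new specs here carry no requires entries, which avoids the cycle risk mentioned in the warning.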

class otter.task.model.Task(spec: Spec, context: TaskContext)[source]

Bases: TaskReporter

Base class for tasks.

Tasks are the main building blocks of a Step and the main unit of work in Otter.

The config for a Task is contained in an otter.task.model.Spec object.

A Task can optionally have a list of otter.manifest.model.Artifact objects, which contain metadata about its input and output and are added to the step manifest.

Tasks subclass otter.task.model.TaskReporter to provide automatic logging, tracking and error handling.

To implement a new Task:
1. Create a new class that extends Spec with the required config fields.
2. Create a subclass of Task and implement the run and, optionally, validate methods.
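
The two steps above can be sketched as follows. To keep the example runnable without Otter installed, it defines minimal stand-ins for Spec and Task; in a real project these would be imported from otter.task.model, where Spec is a pydantic model rather than a dataclass. WriteGreeting, WriteGreetingSpec and their fields are hypothetical names.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Spec:
    """Stand-in for otter.task.model.Spec."""

    name: str


class Task(ABC):
    """Stand-in for otter.task.model.Task (reporting features omitted)."""

    def __init__(self, spec: Spec, context: object) -> None:
        self.spec = spec
        self.context = context

    @abstractmethod
    def run(self) -> "Task":
        """Do the work and return the task itself."""

    def validate(self) -> "Task":
        # Default: a task without validation is always considered valid.
        return self


# Step 1: extend Spec with the config fields the task needs.
@dataclass
class WriteGreetingSpec(Spec):
    greeting: str = "hello"
    recipient: str = "world"


# Step 2: subclass Task, implement run and, optionally, validate.
class WriteGreeting(Task):
    def __init__(self, spec: WriteGreetingSpec, context: object) -> None:
        super().__init__(spec, context)
        self.result = None

    def run(self) -> "WriteGreeting":
        self.result = f"{self.spec.greeting}, {self.spec.recipient}"
        return self

    def validate(self) -> "WriteGreeting":
        if not self.result:
            raise ValueError("greeting was not produced")
        return self


spec = WriteGreetingSpec(name="write_greeting to say hi")
task = WriteGreeting(spec, context=None)
task.run().validate()
print(task.result)
```

Both methods return the task itself, which is what allows the chained call in the usage lines.
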

final get_state_execution_method() → Callable[[...], Task][source]

Get the method to execute based on the task state.

Returns:

The method to execute.

Return type:

Callable[…, Task]

abstract run() → Self[source]

Run the task.

This method contains the actual work of a Task. All tasks must implement run.

Optionally, a list of otter.manifest.model.Artifact objects can be assigned to self.artifacts in the body of the method. These will be added to the step manifest.

Optionally, an abort event can be watched to stop the task if another task fails. This is useful for long-running work that can be stopped midway once the run is deemed a failure.
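
A minimal sketch of watching the abort event inside long-running work. The event type is only documented as Event, so using threading.Event here is an assumption; process_chunks is a hypothetical helper standing in for the body of run.

```python
import threading


def process_chunks(chunks, abort):
    """Process chunks one by one, stopping early once abort is set."""
    processed = []
    for chunk in chunks:
        # Check between units of work so the task can stop midway.
        if abort.is_set():
            break
        processed.append(chunk.upper())
    return processed


abort = threading.Event()
before_abort = process_chunks(["a", "b", "c"], abort)

abort.set()  # simulates another task failing
after_abort = process_chunks(["a", "b", "c"], abort)

print(before_abort)
print(after_abort)
```

Checking the event between small units of work keeps the delay between a failure elsewhere and this task stopping short.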

Returns:

The Task instance itself must be returned.

Return type:

Self

validate() → Self[source]

Validate the task result.

This method should be implemented if the task requires validation. If not implemented, the task will always be considered valid.

The validate method should use the v function from the validators module to invoke a series of validators. See otter.validators.v().

Returns:

The Task instance itself must be returned.

Return type:

Self

task.task_registry module

Registry of task classes, used to instantiate tasks from their spec.

class otter.task.task_registry.TaskRegistry(config: Config, scratchpad: Scratchpad)[source]

Bases: object

Task types are registered here.

The registry is where a Task is instantiated from when the Step is run. It maps each task_type to its Task class and its Spec class.

Note

The otter.scratchpad.model.Scratchpad placeholders are substituted into the Spec here, right before the Task is instantiated.

register(package_name: str) → None[source]

Register tasks in a package into the registry.

Parameters:

package_name (str) – The name of the package to register tasks from.

Raises:

SystemExit – If the package is not found, modules are missing the expected class, or the class is not found in the module.

instantiate(spec: Spec) → Task[source]

Instantiate a Task.

Template replacement is performed here, right before initializing the Task.

Parameters:

spec (Spec) – The spec to instantiate the Task from.
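
The lookup performed by a registry of this kind can be sketched as below. This is a simplified stand-in, not Otter's TaskRegistry: MiniRegistry, its add method, and the SayHello classes are hypothetical, placeholder replacement is left out, and rebuilding the generic spec as the task-specific spec class is an assumption about how the two mapped classes are used.

```python
from dataclasses import dataclass


@dataclass
class Spec:
    """Stand-in for otter.task.model.Spec."""

    name: str

    @property
    def task_type(self) -> str:
        # First word of the name, as described for Spec.task_type.
        return self.name.split(maxsplit=1)[0]


class Task:
    """Stand-in for otter.task.model.Task."""

    def __init__(self, spec, context):
        self.spec = spec
        self.context = context


@dataclass
class SayHelloSpec(Spec):
    recipient: str = "world"


class SayHello(Task):
    pass


class MiniRegistry:
    """Maps a task_type to its (Task class, Spec class) pair."""

    def __init__(self):
        self._types = {}

    def add(self, task_type, task_cls, spec_cls):
        self._types[task_type] = (task_cls, spec_cls)

    def instantiate(self, spec, context=None):
        task_cls, spec_cls = self._types[spec.task_type]
        # Assumption: the generic spec is rebuilt as the specific spec class.
        specific_spec = spec_cls(**vars(spec))
        return task_cls(specific_spec, context)


registry = MiniRegistry()
registry.add("say_hello", SayHello, SayHelloSpec)
task = registry.instantiate(Spec(name="say_hello to greet the reader"))
print(type(task).__name__, task.spec.recipient)
```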

task.task_reporter module

TaskReporter class and report decorator for logging and updating tasks in the manifest.

class otter.task.task_reporter.TaskReporter(name: str)[source]

Bases: object

Class for logging and updating tasks in the manifest.

property artifacts: list[Artifact] | None

Return the Artifacts associated with the Task.

start_run() → None[source]

Update a task that has started running.

finish_run() → None[source]

Update a task that has finished running.

start_validation() → None[source]

Update a task that has started validation.

finish_validation() → None[source]

Update a task that has finished validation.

abort() → None[source]

Update a task that has been aborted.

fail(error: Exception, where: str) → None[source]

Update a task that has failed running or validation.

otter.task.task_reporter.report(func: Callable[..., Task]) → Callable[..., Task][source]

Decorator for logging and updating tasks in the manifest.
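
A decorator of this kind typically wraps a task method with calls to the reporter methods listed above. The sketch below shows the general pattern for the run path only. It is not Otter's implementation: the Reporter stand-in records events in a list instead of updating a manifest, and re-raising the exception after calling fail is an assumption.

```python
import functools


class Reporter:
    """Stand-in for TaskReporter that records calls instead of a manifest."""

    def __init__(self, name):
        self.name = name
        self.events = []

    def start_run(self):
        self.events.append("start_run")

    def finish_run(self):
        self.events.append("finish_run")

    def fail(self, error, where):
        self.events.append(f"fail:{where}")


def report(func):
    """Wrap a task method with start, finish and failure reporting."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self.start_run()
        try:
            result = func(self, *args, **kwargs)
        except Exception as error:
            self.fail(error, where=func.__name__)
            raise  # assumption: the error still propagates to the caller
        self.finish_run()
        return result

    return wrapper


class GoodTask(Reporter):
    @report
    def run(self):
        return self


class BadTask(Reporter):
    @report
    def run(self):
        raise RuntimeError("boom")


good = GoodTask("good")
good.run()

bad = BadTask("bad")
try:
    bad.run()
except RuntimeError:
    pass

print(good.events)
print(bad.events)
```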

Module contents

Task module.

otter.task.load_specs(config_path: Path, step_name: str) → list[Spec][source]

Load Specs for a Step.

Returns:

The task specs.

Return type:

list[Spec]