Built-in tasks

The otter.tasks package contains the built-in task types.

tasks.hello_world module

Simple hello world example.

class otter.tasks.hello_world.HelloWorldSpec(*, name: str, requires: list[str] = [], scratchpad_ignore_missing: bool = False, who: str | None = 'world', **extra_data: Any)

Bases: Spec

Configuration fields for the hello_world task.

who: str | None

The person to greet.

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow'}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.

class otter.tasks.hello_world.HelloWorld(spec: HelloWorldSpec, context: TaskContext)

Bases: Task

Simple hello world example.

spec: HelloWorldSpec

async run() → Self

Say hello, then create an artifact about it.

async validate() → Self

Dummy validation step.

tasks.copy module

Copy a file.

class otter.tasks.copy.CopySpec(*, name: str, requires: list[str] = [], scratchpad_ignore_missing: bool = False, source: str, destination: str, **extra_data: Any)

Bases: Spec

Configuration fields for the copy task.

source: str

The source URI of the file to copy. Must be absolute.

destination: str

The destination for the file, relative to the release root.

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow'}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.

class otter.tasks.copy.Copy(spec: CopySpec, context: TaskContext)

Bases: Task

Copy a file.

Copies a file from an external source to a destination inside the release. If no release_uri is provided in the configuration, the file will be downloaded to the local work_path.

Note

source must be absolute. This task is intended for external resources.

Note

destination will be prefixed with the value of either the otter.config.model.Config.release_uri or the otter.config.model.Config.work_path config field.
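
The prefixing rule in the notes above can be pictured with a small sketch. This is not otter's implementation: the helper name resolve_destination and the example values are hypothetical, and the sketch assumes that release_uri takes precedence whenever it is set, as the class description suggests.

```python
from __future__ import annotations


def resolve_destination(destination: str, work_path: str, release_uri: str | None = None) -> str:
    """Prefix a relative destination with release_uri, or work_path if unset.

    Hypothetical helper for illustration only; not part of otter's API.
    """
    base = release_uri if release_uri else work_path
    # Join with exactly one slash, whatever slashes the inputs carry.
    return base.rstrip('/') + '/' + destination.lstrip('/')


# With a release configured, the file lands inside the release.
print(resolve_destination('genes/file.tsv', '/tmp/work', 'gs://release-25'))
# Without one, it lands under the local work path.
print(resolve_destination('genes/file.tsv', '/tmp/work'))
```

Because the destination is always relative, the same task spec can be reused unchanged between a local run and a run that writes to a remote release.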

spec: CopySpec

async validate() → Self

Check that the copied file exists and has a valid size.

tasks.download module

Download a file.

class otter.tasks.download.DownloadSpec(*, name: str, requires: list[str] = [], scratchpad_ignore_missing: bool = False, source: str, **extra_data: Any)

Bases: Spec

Configuration fields for the download task.

source: str

The source location, relative to the release, of the file to download.

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow'}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.

class otter.tasks.download.Download(spec: DownloadSpec, context: TaskContext)

Bases: Task

Download a file.

Downloads a file from source to a local destination. The source must be relative to the release root. Use this task only after a copy task has put the artifact into the release: an external resource can change over time, which would break reproducibility.

The destination will be the same as the source, but relative to the local work_path.

This task should only be used when a later task needs an artifact to exist locally for some reason. In most cases, downloading files should be avoided. Instead:

  • For copying artifacts into a release, use the copy task.

  • For transforming data, whenever possible, open files already copied and work with them directly.

Note

This task will not generate an artifact, as the downloaded file stays local only. It is the responsibility of subsequent tasks to put the file in the release and generate the corresponding artifact. This is easy because the relative part of the path, once downloaded, is the same as the one in the release.
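
The path mapping described above can be sketched as follows. The helper download_locations and the example values are hypothetical and only illustrate how one relative source yields both the remote and the local location; otter's own code may differ.

```python
from pathlib import PurePosixPath


def download_locations(source: str, release_uri: str, work_path: str) -> tuple[str, str]:
    """Return (remote_uri, local_path) for a release-relative source.

    Hypothetical helper for illustration only; not part of otter's API.
    """
    relative = source.lstrip('/')
    remote = release_uri.rstrip('/') + '/' + relative
    local = str(PurePosixPath(work_path) / relative)
    return remote, local


remote, local = download_locations('input/items/chair.json', 'gs://release-25', '/tmp/work')
print(remote)
print(local)
```

Both results end in the same relative part, which is what lets a later task upload the local file back to the matching location in the release.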

spec: DownloadSpec

async validate() → Self

Check that the downloaded file exists and has a valid size.

tasks.explode module

Generate more tasks based on a list.

class otter.tasks.explode.ExplodeSpec(*, name: str, requires: list[str] = [], scratchpad_ignore_missing: bool = False, do: list[Spec], foreach: list[str], each_placeholder: str = 'each', **extra_data: Any)

Bases: Spec

Configuration fields for the explode task.

do: list[Spec]

The tasks to explode. Each task in the list will be duplicated for each iteration of the foreach list.

foreach: list[str]

The list of values to iterate over.

each_placeholder: str

The placeholder string to use for the current iteration value. The value of this field, e.g. each, will be replaced by each of the entries in the foreach list.

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow'}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.

class otter.tasks.explode.Explode(spec: ExplodeSpec, context: TaskContext)

Bases: Task

Generate more tasks based on a list.

This task will duplicate the specs in the do list for each entry in the foreach list.

Inside the specs in the do list, the value of each_placeholder can be used as a sentinel to refer to the current iteration value.

Warning

The ${each_placeholder} placeholder MUST be present in the otter.task.model.Spec.name of the new specs that are defined inside do, as otherwise all of them will have the same name, and name must be unique.

If you do a nested explode, the inner explode will have spec names that are identical to those of its sibling specs spawned during the outer explode. Since the spec names need to be unique, you should also include the outer explode's placeholder in the inner explode's spec names to avoid conflicts. For example, if you have an outer explode with each_placeholder: outer and an inner explode with each_placeholder: inner, you might name a spec in the inner explode name: process ${outer} and ${inner} data to ensure uniqueness.

Example:

steps:
    - name: explode species
      foreach:
          - homo_sapiens
          - mus_musculus
          - drosophila_melanogaster
      each_placeholder: explode_each
      do:
          - name: copy ${explode_each} genes
            source: https://example.com/genes/${explode_each}/file.tsv
            destination: genes-${explode_each}.tsv
          - name: copy ${explode_each} proteins
            source: https://example.com/proteins/${explode_each}/file.tsv
            destination: proteins-${explode_each}.tsv

Keep in mind this replacement of explode_each will only be done in strings, not lists or sub-objects.
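
The expansion described above can be pictured with a minimal sketch. It is not otter's code: the explode function here is a hypothetical stand-in that works on plain dictionaries, and the use of string.Template is an assumption made only because its ${name} syntax matches the placeholders shown in the example.

```python
from string import Template
from typing import Any


def explode(do: list[dict[str, Any]], foreach: list[str], each_placeholder: str = 'each') -> list[dict[str, Any]]:
    """Duplicate every spec in `do` once per entry in `foreach`.

    Hypothetical stand-in for the behaviour described above. Only top-level
    string values are substituted; lists and sub-objects are left untouched.
    """
    new_specs = []
    for value in foreach:
        for spec in do:
            new_spec = {}
            for key, field in spec.items():
                if isinstance(field, str):
                    new_spec[key] = Template(field).safe_substitute({each_placeholder: value})
                else:
                    new_spec[key] = field
            new_specs.append(new_spec)
    # Names must be unique, so a missing placeholder in `name` is an error.
    names = [spec['name'] for spec in new_specs]
    if len(names) != len(set(names)):
        raise ValueError('spec names must be unique; include the placeholder in name')
    return new_specs


specs = explode(
    do=[{
        'name': 'copy ${explode_each} genes',
        'source': 'https://example.com/genes/${explode_each}/file.tsv',
        'destination': 'genes-${explode_each}.tsv',
    }],
    foreach=['homo_sapiens', 'mus_musculus'],
    each_placeholder='explode_each',
)
for spec in specs:
    print(spec['name'], '->', spec['destination'])
```

Leaving the placeholder out of name makes every generated spec share one name, which is the situation the warning above describes; the sketch raises an error in that case.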

spec: ExplodeSpec

tasks.explode_glob module

Generate more tasks based on a glob.

class otter.tasks.explode_glob.ExplodeGlobSpec(*, name: str, requires: list[str] = [], scratchpad_ignore_missing: bool = False, glob: str, do: list[Spec], **extra_data: Any)

Bases: Spec

Configuration fields for the explode_glob task.

glob: str

The glob expression.

do: list[Spec]

The tasks to explode. Each task in the list will be duplicated for each path matched by the glob expression.

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow'}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.

class otter.tasks.explode_glob.ExplodeGlob(spec: ExplodeGlobSpec, context: TaskContext)

Bases: Task

Generate more tasks based on a glob.

This task will duplicate the specs in the do list for each entry in a list coming from a glob expression.

The task will add the following keys to a local scratchpad:

  • match_prefix: the path up to the glob pattern and relative to otter.config.model.Config.release_uri or otter.config.model.Config.work_path if the source is a relative location.

  • match_path: the part of the path that the glob matched without the file name.

  • match_stem: the file name of the matched file without the extension.

  • match_ext: the file extension of the matched file, without the dot.

  • uri: ${match_prefix}/${match_path}/${match_stem}.${match_ext}

  • uuid: a UUID4, in case it is needed to generate unique names.

Note

${uri} will be either an absolute URL or a path relative to either otter.config.model.Config.release_uri or otter.config.model.Config.work_path depending on whether the source itself is absolute or relative.

Note

Forming a path with ${match_prefix}/${match_path}/${match_stem} when match_path is empty would cause double slashes to be introduced. These are automatically removed. GCS paths like gs://bucket/////file are not supported by this task.

- name: explode_glob items
  glob: 'gs://release-25/input/items/**/*.json'
  do:
    - name: transform ${match_stem} into parquet
      source: ${uri}
      destination: intermediate/${match_path}/${match_stem}.parquet

For a bucket containing two files:

gs://release-25/input/items/furniture/chair.json
gs://release-25/input/items/furniture/table.json

and release_uri set to gs://release-25, the values will be:

Scratchpad values for the first task

key            value
match_prefix   input/items/
match_path     furniture
match_stem     chair
match_ext      json
uri            input/items/furniture/chair.json
uuid           <uuid>

The task in the do list will be duplicated once per matched file, giving the following specs:

- name: transform chair into parquet
  source: input/items/furniture/chair.json
  destination: intermediate/furniture/chair.parquet
- name: transform table into parquet
  source: input/items/furniture/table.json
  destination: intermediate/furniture/table.parquet
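
How the scratchpad keys relate to a matched path can be sketched in a few lines. This is an illustration rather than otter's implementation: scratchpad_for_match is a hypothetical helper, it assumes the glob and the matched path are both expressed relative to the same root (so no gs:// scheme is present), and it assumes the prefix ends just before the first path segment containing a wildcard.

```python
import re
import uuid
from pathlib import PurePosixPath
from string import Template


def scratchpad_for_match(glob: str, matched: str) -> dict[str, str]:
    """Derive the match_* keys for one matched relative path.

    Hypothetical helper for illustration only; not part of otter's API.
    """
    segments = glob.split('/')
    # Index of the first segment with a glob metacharacter; if there is none,
    # treat the final segment as the file pattern.
    first_wild = next(
        (i for i, s in enumerate(segments) if re.search(r'[*?\[]', s)),
        len(segments) - 1,
    )
    prefix = '/'.join(segments[:first_wild])
    rest = PurePosixPath(matched[len(prefix):].lstrip('/'))
    match_path = '' if str(rest.parent) == '.' else str(rest.parent)
    stem, ext = rest.stem, rest.suffix.lstrip('.')
    # An empty match_path would leave a double slash, so collapse repeats.
    uri = re.sub(r'/{2,}', '/', f'{prefix}/{match_path}/{stem}.{ext}')
    return {
        'match_prefix': prefix + '/',
        'match_path': match_path,
        'match_stem': stem,
        'match_ext': ext,
        'uri': uri,
        'uuid': str(uuid.uuid4()),
    }


pad = scratchpad_for_match('input/items/**/*.json', 'input/items/furniture/chair.json')
destination = Template('intermediate/${match_path}/${match_stem}.parquet').safe_substitute(pad)
print(pad['uri'], '->', destination)
```

For a file sitting directly under input/items, match_path is empty and the collapsed uri is input/items/chair.json, which is the double-slash case mentioned in the note above.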

spec: ExplodeGlobSpec

scratchpad

Internal scratchpad used to replace values in subtask specs.

tasks.find_latest module

Find the last-modified file among those in a prefix URI.

class otter.tasks.find_latest.FindLatestSpec(*, name: str, requires: list[str] = [], scratchpad_ignore_missing: bool = False, source: str, scratchpad_key: str | None = None, **extra_data: Any)

Bases: Spec

Configuration fields for the find_latest task.

source: str

The prefix from where the file with the latest modification date will be found. It can include glob patterns.

scratchpad_key: str | None

The scratchpad key where the path of the latest file will be stored. Defaults to the task name.

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow'}

Configuration for the model. It should be a dictionary conforming to pydantic.config.ConfigDict.

class otter.tasks.find_latest.FindLatest(spec: FindLatestSpec, context: TaskContext)

Bases: Task

Find the last-modified file among those in a prefix URI.
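
A minimal local-filesystem sketch of the idea follows. It is not otter's implementation, which works on prefix URIs: the find_latest function, the file names and the task name used as the scratchpad key are all hypothetical.

```python
import os
import tempfile
from pathlib import Path


def find_latest(root: Path, pattern: str) -> Path:
    """Return the most recently modified file under root matching pattern.

    Hypothetical local sketch for illustration only; not part of otter's API.
    """
    candidates = [p for p in root.glob(pattern) if p.is_file()]
    if not candidates:
        raise FileNotFoundError(f'no files match {pattern!r} under {root}')
    return max(candidates, key=lambda p: p.stat().st_mtime)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    for i, name in enumerate(['2024-01.tsv', '2024-02.tsv', '2024-03.tsv']):
        path = root / name
        path.write_text('data')
        # Set explicit modification times so the result is deterministic.
        os.utime(path, (1_700_000_000 + i, 1_700_000_000 + i))
    latest = find_latest(root, '*.tsv')
    # When scratchpad_key is not given, the key defaults to the task name.
    scratchpad = {'find_latest dumps': str(latest)}
    print(latest.name)
```

Later tasks could then refer to the stored path through the scratchpad key instead of hard-coding a file name that changes with every upstream update.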

spec: FindLatestSpec

Module contents

Built-in tasks.