tasks package

This package contains the tasks that are run by pis. Any new tasks should be added to this package.

tasks.download module

Simple download task.

class pis.tasks.download.Download(definition: TaskDefinition)

Bases: Task

Simple download task.

Downloads a file from a URL to a local destination.

run(*, abort: Event) → Self

Download a file from the source URL to the destination path.

validate(*, abort: Event) → Self

Check that the downloaded file exists and has a valid size.
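The run/validate pair described above can be sketched with the standard library. This is a minimal illustration, not the pis implementation: the `MiniDownload` class, its `source`/`destination` attributes, the chunk size, and reading "valid size" as "non-empty" are all assumptions.

```python
import threading
import urllib.request
from pathlib import Path


class MiniDownload:
    """Hypothetical stand-in for a download task (not the pis API)."""

    CHUNK_SIZE = 64 * 1024  # assumed chunk size

    def __init__(self, source: str, destination: Path) -> None:
        self.source = source
        self.destination = Path(destination)

    def run(self, *, abort: threading.Event) -> "MiniDownload":
        self.destination.parent.mkdir(parents=True, exist_ok=True)
        with urllib.request.urlopen(self.source) as response, \
                open(self.destination, "wb") as out:
            while True:
                # Stop between chunks if another task has failed.
                if abort.is_set():
                    raise RuntimeError("download aborted")
                chunk = response.read(self.CHUNK_SIZE)
                if not chunk:
                    break
                out.write(chunk)
        return self  # tasks return themselves

    def validate(self, *, abort: threading.Event) -> "MiniDownload":
        # Assumed rule: the file must exist and be non-empty.
        if not self.destination.is_file():
            raise FileNotFoundError(self.destination)
        if self.destination.stat().st_size == 0:
            raise ValueError(f"{self.destination} is empty")
        return self
```

Because `urlopen` also accepts `file://` URLs, the sketch can be tried without network access by pointing `source` at a local file's `Path.as_uri()`.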

pydantic model pis.tasks.download.DownloadDefinition

Bases: TaskDefinition

Configuration fields for the download task.

This task has the following custom configuration fields:
  • source (str): The URL of the file to download.

JSON schema:
{
   "title": "DownloadDefinition",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "destination": {
         "format": "path",
         "title": "Destination",
         "type": "string"
      },
      "source": {
         "title": "Source",
         "type": "string"
      }
   },
   "additionalProperties": true,
   "required": [
      "name",
      "destination",
      "source"
   ]
}

Config:
  • extra: str = allow

Fields:
field source: str [Required]
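As an illustration of the schema above, a download task definition might look like the Python mapping below. The name, path, and URL are placeholders, and `check_required` is a hypothetical helper that mimics what the pydantic model enforces: the three required keys must be present, while extra keys are kept because `extra` is set to `allow`.

```python
from typing import Any

# Taken from the JSON schema's "required" list above.
REQUIRED = ("name", "destination", "source")


def check_required(definition: dict[str, Any]) -> dict[str, Any]:
    """Raise if a required key is missing; keep any extra keys (extra=allow)."""
    missing = [key for key in REQUIRED if key not in definition]
    if missing:
        raise ValueError(f"missing required fields: {', '.join(missing)}")
    return dict(definition)


example = {
    "name": "download example file",  # placeholder
    "destination": "input/example.json",  # placeholder path
    "source": "https://example.com/example.json",  # placeholder URL
    "note": "extra keys are allowed",  # kept because extra=allow
}
```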

tasks.download_latest module

Download the file with the latest modification date among those in a prefix URI.

class pis.tasks.download_latest.DownloadLatest(definition: TaskDefinition)

Bases: Task

Download the file with the latest modification date among those in a prefix URI.
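The selection step can be sketched as picking the newest entry from a listing. The `Entry` type and the assumption that the listing under the prefix has already been fetched are hypothetical; how pis lists remote objects is not described on this page.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Entry:
    """Hypothetical listing entry: an object URI and its modification time."""
    uri: str
    modified: datetime


def latest(entries: list[Entry]) -> Entry:
    """Return the entry with the most recent modification time."""
    if not entries:
        raise ValueError("no files found under the prefix")
    return max(entries, key=lambda entry: entry.modified)
```

If two entries share the newest timestamp, `max` returns the first one it encounters, so the listing order decides ties.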

run(*, abort: Event) → Self

Run the task.

This method contains the actual work of the task. All tasks must implement run, and it must download or generate a resource in the path stored in the destination field of the task definition.

Optionally, an abort event can be watched to stop the task if another task has failed. This is useful for long-running work that can be stopped midway once the run is deemed to be a failure.

Parameters:

abort (Event) – The event that will be set if another task has failed.

Returns:

The task instance itself must be returned.

Return type:

Self
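The abort contract described above can be sketched as a loop that checks the event between units of work and returns the instance at the end. `LongTask`, its `steps` attribute, and the `TaskAborted` exception are hypothetical; the real base class may react to an abort differently.

```python
import threading
import time


class TaskAborted(Exception):
    """Hypothetical exception raised when the abort event is set."""


class LongTask:
    def __init__(self, steps: int) -> None:
        self.steps = steps
        self.completed = 0

    def run(self, *, abort: threading.Event) -> "LongTask":
        for _ in range(self.steps):
            # Check between units of work so a failure elsewhere stops us quickly.
            if abort.is_set():
                raise TaskAborted(f"aborted after {self.completed} steps")
            time.sleep(0.01)  # stand-in for real work
            self.completed += 1
        return self  # run must return the task instance
```

When tasks run in threads that share one event, setting it from the thread whose task failed makes this loop exit at its next check.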

pydantic model pis.tasks.download_latest.DownloadLatestDefinition

Bases: TaskDefinition

Configuration fields for the download_latest task.

This task has the following custom configuration fields:
  • source (str): The prefix from which the file with the latest modification date will be downloaded.

  • pattern (str | None): Optional. The pattern to match files against. The pattern is a plain substring match; prefix it with an exclamation mark to exclude matching files instead. For example, ‘foo’ matches all files containing ‘foo’, while ‘!foo’ excludes all files containing ‘foo’.
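The pattern rule above translates into a small predicate. The function name is hypothetical, and treating a missing pattern as "match everything" is an assumption based on the field being optional.

```python
from __future__ import annotations


def matches(name: str, pattern: str | None) -> bool:
    """Apply a download_latest-style pattern to a file name (hypothetical helper)."""
    if not pattern:
        return True  # assumed: no pattern means no filtering
    if pattern.startswith("!"):
        return pattern[1:] not in name  # exclusion: drop names containing the rest
    return pattern in name  # inclusion: keep names containing the pattern
```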

JSON schema:
{
   "title": "DownloadLatestDefinition",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "destination": {
         "format": "path",
         "title": "Destination",
         "type": "string"
      },
      "source": {
         "title": "Source",
         "type": "string"
      },
      "pattern": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Pattern"
      }
   },
   "additionalProperties": true,
   "required": [
      "name",
      "destination",
      "source"
   ]
}

Config:
  • extra: str = allow

Fields:
field pattern: str | None = None
field source: str [Required]

tasks.elasticsearch module

Download selected fields from all documents in a series of Elasticsearch indexes.

class pis.tasks.elasticsearch.Elasticsearch(definition: TaskDefinition)

Bases: Task

Download selected fields from all documents in a series of Elasticsearch indexes.

This task scans an Elasticsearch index and writes the selected fields from each document to a file.
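The write step can be sketched independently of any client library. The sketch assumes each scanned hit has the usual Elasticsearch shape, with the document under `_source`, and that output is written as JSON Lines; neither the output format nor `write_selected_fields` is confirmed by this page.

```python
import json
from typing import Any, Iterable, TextIO


def write_selected_fields(
    hits: Iterable[dict[str, Any]], fields: list[str], out: TextIO
) -> int:
    """Write one JSON object per hit, keeping only the requested top-level fields."""
    count = 0
    for hit in hits:
        source = hit.get("_source", {})
        # Fields missing from a document are omitted rather than written as null.
        selected = {field: source[field] for field in fields if field in source}
        out.write(json.dumps(selected) + "\n")
        count += 1
    return count
```

With the official `elasticsearch` Python client, the `hits` iterable could come from the `elasticsearch.helpers.scan` helper; this page does not say which client pis uses.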

run(*, abort: Event) → Self

Run the task.

This method contains the actual work of the task. All tasks must implement run, and it must download or generate a resource in the path stored in the destination field of the task definition.

Optionally, an abort event can be watched to stop the task if another task has failed. This is useful for long-running work that can be stopped midway once the run is deemed to be a failure.

Parameters:

abort (Event) – The event that will be set if another task has failed.

Returns:

The task instance itself must be returned.

Return type:

Self

validate(*, abort: Event) → Self

Validate the task.

This method should be implemented by the task subclass to perform validation. If not implemented, the task resource will always be considered valid.

The validate method should use the v() function from the validators module to invoke a series of validators. See pis.validators.v().

Parameters:

abort (Event) – The event that will be set if another task has failed.

Returns:

The task instance itself must be returned.

Return type:

Self
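The signature of `pis.validators.v()` is not documented on this page, so the following is only a generic sketch of running "a series of validators" in order, with hypothetical names (`run_validators`, `file_exists`, `not_empty`). Each validator takes the resource path and raises on failure; the first failure stops the chain.

```python
import threading
from pathlib import Path
from typing import Callable

# Hypothetical validator type: takes the resource path, raises on failure.
Validator = Callable[[Path], None]


def file_exists(path: Path) -> None:
    if not path.is_file():
        raise FileNotFoundError(path)


def not_empty(path: Path) -> None:
    if path.stat().st_size == 0:
        raise ValueError(f"{path} is empty")


def run_validators(
    path: Path, validators: list[Validator], *, abort: threading.Event
) -> None:
    """Run validators in order; the first failure propagates to the caller."""
    for validator in validators:
        if abort.is_set():
            # Another task failed; stop without declaring the resource valid.
            raise RuntimeError("validation aborted")
        validator(path)
```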

pydantic model pis.tasks.elasticsearch.ElasticsearchDefinition

Bases: TaskDefinition

Configuration fields for the elasticsearch task.

This task has the following custom configuration fields:
  • url (str): The URL of the Elasticsearch instance.

  • destination (Path): The path to write the documents to.

  • index (str): The index to scan.

  • fields (list[str]): The fields to include from each document.

JSON schema:
{
   "title": "ElasticsearchDefinition",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "destination": {
         "format": "path",
         "title": "Destination",
         "type": "string"
      },
      "url": {
         "title": "Url",
         "type": "string"
      },
      "index": {
         "title": "Index",
         "type": "string"
      },
      "fields": {
         "items": {
            "type": "string"
         },
         "title": "Fields",
         "type": "array"
      }
   },
   "additionalProperties": true,
   "required": [
      "name",
      "destination",
      "url",
      "index",
      "fields"
   ]
}

Config:
  • extra: str = allow

Fields:
field destination: Path [Required]
field fields: list[str] [Required]
field index: str [Required]
field url: str [Required]

exception pis.tasks.elasticsearch.ElasticsearchError

Bases: Exception

Base class for Elasticsearch errors.

tasks.explode module

Pretask: explode tasks based on a list of dictionaries.

class pis.tasks.explode.Explode(definition: TaskDefinition)

Bases: Pretask

Pretask: explode tasks based on a list of dictionaries.

This pretask duplicates the tasks in the do list once for each item in the foreach list. The foreach list is a list of dictionaries, and each dictionary supplies the values that replace the variables in the tasks in the do list.

If the foreach list is not set, the foreach_function will be called to get the list of dictionaries. The function must return a list of dictionaries.
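The expansion can be sketched as a nested loop over the foreach items and the do tasks. The placeholder syntax is not specified on this page; this sketch assumes `${name}` placeholders and uses `string.Template.safe_substitute`, recursing into nested lists and dictionaries. `explode` and `_substitute` are hypothetical helpers.

```python
from string import Template
from typing import Any


def _substitute(value: Any, variables: dict[str, str]) -> Any:
    """Recursively substitute ${var} placeholders in strings, lists, and dicts."""
    if isinstance(value, str):
        return Template(value).safe_substitute(variables)
    if isinstance(value, list):
        return [_substitute(item, variables) for item in value]
    if isinstance(value, dict):
        return {key: _substitute(item, variables) for key, item in value.items()}
    return value  # numbers, booleans, None are left unchanged


def explode(do: list[dict], foreach: list[dict[str, str]]) -> list[dict]:
    """Return one copy of every task in `do` for each item in `foreach`."""
    return [_substitute(task, item) for item in foreach for task in do]
```

Building new dictionaries instead of mutating `do` keeps the template tasks intact, so the same do list can be exploded more than once.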

run(*, abort: Event) → Self

Run the task.

This method contains the actual work of the task. All tasks must implement run, and it must download or generate a resource in the path stored in the destination field of the task definition.

Optionally, an abort event can be watched to stop the task if another task has failed. This is useful for long-running work that can be stopped midway once the run is deemed to be a failure.

Parameters:

abort (Event) – The event that will be set if another task has failed.

Returns:

The task instance itself must be returned.

Return type:

Self

pydantic model pis.tasks.explode.ExplodeDefinition

Bases: PretaskDefinition

Configuration fields for the explode pretask.

This pretask has the following custom configuration fields:
  • do (list[dict]): The tasks to explode. Each task in the list will be duplicated for each iteration of the foreach list.

  • foreach (list[dict[str, str]] | None): The list of dictionaries to iterate over. Each item in the list will be used to replace the variables in the tasks in the do list.

  • foreach_function (str | None): If set, this function will be called to get the foreach list. The function must return a list of dictionaries.

  • foreach_function_args (dict[str, Any] | None): Arguments to pass to the foreach function.
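When foreach is unset, the list comes from foreach_function called with foreach_function_args. The sketch below resolves the function name through an explicit registry; the registry, `resolve_foreach`, and the stand-in function in the usage are hypothetical, and how pis actually looks the function up is not stated here. It also assumes that leaving both fields unset is an error.

```python
from typing import Any, Callable

ForeachFunction = Callable[..., list[dict[str, str]]]


def resolve_foreach(
    definition: dict[str, Any], registry: dict[str, ForeachFunction]
) -> list[dict[str, str]]:
    """Return the explicit foreach list, or call the named foreach function."""
    if definition.get("foreach") is not None:
        return definition["foreach"]
    name = definition.get("foreach_function")
    if name is None:
        raise ValueError("either foreach or foreach_function must be set")
    args = definition.get("foreach_function_args") or {}
    result = registry[name](**args)
    # Enforce the documented contract: a list of dictionaries.
    if not isinstance(result, list) or not all(isinstance(item, dict) for item in result):
        raise TypeError(f"{name} must return a list of dictionaries")
    return result
```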

JSON schema:
{
   "title": "ExplodeDefinition",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "do": {
         "items": {
            "type": "object"
         },
         "title": "Do",
         "type": "array"
      },
      "foreach": {
         "anyOf": [
            {
               "items": {
                  "additionalProperties": {
                     "type": "string"
                  },
                  "type": "object"
               },
               "type": "array"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Foreach"
      },
      "foreach_function": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Foreach Function"
      },
      "foreach_function_args": {
         "anyOf": [
            {
               "type": "object"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Foreach Function Args"
      }
   },
   "additionalProperties": true,
   "required": [
      "name",
      "do"
   ]
}

Config:
  • extra: str = allow

Fields:
field do: list[dict] [Required]
field foreach: list[dict[str, str]] | None = None
field foreach_function: str | None = None
field foreach_function_args: dict[str, Any] | None = None
pis.tasks.explode.urls_from_json(source: str, destination: str, json_path: str, prefix: str | None) → list[dict[str, str]]

Get a list of URLs from a JSON file.

This function downloads a JSON file from a URL, extracts a list of URLs from it with a jq query, and returns a list of dictionaries with the source and destination URLs.

Parameters:
  • source (str) – The URL of the JSON file to download.

  • destination (str) – The local path to save the downloaded JSON file to.

  • json_path (str) – The jq query used to extract the URLs from the JSON file.

  • prefix (str | None) – If set, this prefix will be removed from the source URLs to generate the destination file names.

Returns:

A list of dictionaries with the source and destination URLs.

Return type:

list[dict[str, str]]
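The pairing step of urls_from_json can be sketched as follows. jq is not part of the Python standard library, so this sketch starts from URLs that have already been extracted; the output keys `source` and `destination`, and the fallback to the last path segment when no prefix is given, are assumptions. `pair_urls` is a hypothetical helper.

```python
from __future__ import annotations

import posixpath
from urllib.parse import urlparse


def pair_urls(urls: list[str], prefix: str | None) -> list[dict[str, str]]:
    """Map each source URL to a destination name (hypothetical helper)."""
    pairs = []
    for url in urls:
        if prefix and url.startswith(prefix):
            # Strip the prefix so the remaining path becomes the file name.
            destination = url[len(prefix):].lstrip("/")
        else:
            # Assumed fallback: use the last path segment of the URL.
            destination = posixpath.basename(urlparse(url).path)
        pairs.append({"source": url, "destination": destination})
    return pairs
```

The resulting list has the shape the foreach field expects, which is presumably why this function lives in the explode module.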

tasks.hello_world module

Simple hello world task.

class pis.tasks.hello_world.HelloWorld(definition: TaskDefinition)

Bases: Task

Simple hello world task.

run(*, abort: Event) → Self

Run the task.

This method contains the actual work of the task. All tasks must implement run, and it must download or generate a resource in the path stored in the destination field of the task definition.

Optionally, an abort event can be watched to stop the task if another task has failed. This is useful for long-running work that can be stopped midway once the run is deemed to be a failure.

Parameters:

abort (Event) – The event that will be set if another task has failed.

Returns:

The task instance itself must be returned.

Return type:

Self

validate(*, abort: Event) → Self

Validate the task.

This method should be implemented by the task subclass to perform validation. If not implemented, the task resource will always be considered valid.

The validate method should use the v() function from the validators module to invoke a series of validators. See pis.validators.v().

Parameters:

abort (Event) – The event that will be set if another task has failed.

Returns:

The task instance itself must be returned.

Return type:

Self

pydantic model pis.tasks.hello_world.HelloWorldDefinition

Bases: TaskDefinition

Configuration fields for the hello_world task.

This task has the following custom configuration fields:
  • who (str): The person to greet in the output file. Defaults to ‘world’.

JSON schema:
{
   "title": "HelloWorldDefinition",
   "type": "object",
   "properties": {
      "name": {
         "title": "Name",
         "type": "string"
      },
      "destination": {
         "format": "path",
         "title": "Destination",
         "type": "string"
      },
      "who": {
         "default": "world",
         "title": "Who",
         "type": "string"
      }
   },
   "additionalProperties": true,
   "required": [
      "name",
      "destination"
   ]
}

Config:
  • extra: str = allow

Fields:
field who: str = 'world'
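A hello world task only has to write a greeting to its destination and check it afterwards. This sketch mirrors that shape with a hypothetical `MiniHelloWorld` class; the exact greeting text and the validation rule are assumptions, not the pis implementation.

```python
import threading
from pathlib import Path


class MiniHelloWorld:
    """Hypothetical stand-in for the hello_world task (not the pis API)."""

    def __init__(self, destination: Path, who: str = "world") -> None:
        self.destination = Path(destination)
        self.who = who  # same default as HelloWorldDefinition.who

    def run(self, *, abort: threading.Event) -> "MiniHelloWorld":
        self.destination.parent.mkdir(parents=True, exist_ok=True)
        self.destination.write_text(f"Hello, {self.who}!\n")  # assumed greeting format
        return self

    def validate(self, *, abort: threading.Event) -> "MiniHelloWorld":
        # Assumed check: the file exists (read_text raises otherwise)
        # and mentions the greeted name.
        if self.who not in self.destination.read_text():
            raise ValueError(f"greeting for {self.who!r} not found")
        return self
```

Because both methods return the instance, calls can be chained as `task.run(abort=ev).validate(abort=ev)`.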

Module contents

Task definitions.

This module contains the task and pretask classes.