Skip to content

variant_index

gentropy.variant_index.VariantIndexStep

Generate variant index based on a VEP output in json format.

The variant index is a dataset that contains variant annotations extracted from VEP output. It is expected that all variants in the VEP output are present in the variant index. Optionally, extra variant annotations can be provided and added to the variant index, e.g. allele frequencies from gnomAD.

Source code in src/gentropy/variant_index.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class VariantIndexStep:
    """Step that builds the variant index from VEP output in json format.

    The variant index is a dataset of variant annotations extracted from VEP output; all variants in the VEP output are expected to be present in it.
    Optionally, extra variant annotations (e.g. allele frequencies from gnomAD) and amino-acid based annotations can be merged into the index.
    """

    def __init__(
        self: VariantIndexStep,
        session: Session,
        vep_output_json_path: str,
        variant_index_path: str,
        hash_threshold: int,
        variant_annotations_path: list[str] | None = None,
        amino_acid_change_annotations: list[str] | None = None,
    ) -> None:
        """Build the variant index, enrich it with optional annotations and write it as parquet.

        Args:
            session (Session): Session object.
            vep_output_json_path (str): Variant effect predictor output path (in json format).
            variant_index_path (str): Variant index dataset path to save resulting data.
            hash_threshold (int): Hash threshold for variant identifier length.
            variant_annotations_path (list[str] | None): List of paths to extra variant annotation datasets.
            amino_acid_change_annotations (list[str] | None): List of paths to amino-acid based variant annotations.
        """
        # Columns used both for range partitioning and for ordering rows inside each partition.
        sort_columns = ("chromosome", "position")

        # Start from the annotations parsed out of the VEP output.
        index = VariantEffectPredictorParser.extract_variant_index_from_vep(
            session.spark, vep_output_json_path, hash_threshold
        )

        # Fold in every extra variant annotation dataset; a missing or empty list means nothing to add.
        for extra_path in variant_annotations_path or ():
            extra_annotations = VariantIndex.from_parquet(
                session=session,
                path=extra_path,
                recursiveFileLookup=True,
                id_threshold=hash_threshold,
            )
            index = index.add_annotation(extra_annotations)

        # Same for the amino-acid based annotation datasets.
        for amino_acid_path in amino_acid_change_annotations or ():
            amino_acid_variants = AminoAcidVariants.from_parquet(
                session, amino_acid_path
            )
            index = index.annotate_with_amino_acid_consequences(amino_acid_variants)

        # Range-partition, order within partitions, then write to the output path.
        partitioned = index.df.repartitionByRange(
            session.output_partitions, *sort_columns
        )
        ordered = partitioned.sortWithinPartitions(*sort_columns)
        ordered.write.mode(session.write_mode).parquet(variant_index_path)

__init__(session: Session, vep_output_json_path: str, variant_index_path: str, hash_threshold: int, variant_annotations_path: list[str] | None = None, amino_acid_change_annotations: list[str] | None = None) -> None

Run VariantIndex step.

Parameters:

Name Type Description Default
session Session

Session object.

required
vep_output_json_path str

Variant effect predictor output path (in json format).

required
variant_index_path str

Variant index dataset path to save resulting data.

required
hash_threshold int

Hash threshold for variant identifier length.

required
variant_annotations_path list[str] | None

List of paths to extra variant annotation datasets.

None
amino_acid_change_annotations list[str] | None

List of paths to amino-acid based variant annotations.

None
Source code in src/gentropy/variant_index.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def __init__(
    self: VariantIndexStep,
    session: Session,
    vep_output_json_path: str,
    variant_index_path: str,
    hash_threshold: int,
    variant_annotations_path: list[str] | None = None,
    amino_acid_change_annotations: list[str] | None = None,
) -> None:
    """Build the variant index, enrich it with optional annotations and write it as parquet.

    Args:
        session (Session): Session object.
        vep_output_json_path (str): Variant effect predictor output path (in json format).
        variant_index_path (str): Variant index dataset path to save resulting data.
        hash_threshold (int): Hash threshold for variant identifier length.
        variant_annotations_path (list[str] | None): List of paths to extra variant annotation datasets.
        amino_acid_change_annotations (list[str] | None): List of paths to amino-acid based variant annotations.
    """
    # Columns used both for range partitioning and for ordering rows inside each partition.
    sort_columns = ("chromosome", "position")

    # Start from the annotations parsed out of the VEP output.
    index = VariantEffectPredictorParser.extract_variant_index_from_vep(
        session.spark, vep_output_json_path, hash_threshold
    )

    # Fold in every extra variant annotation dataset; a missing or empty list means nothing to add.
    for extra_path in variant_annotations_path or ():
        extra_annotations = VariantIndex.from_parquet(
            session=session,
            path=extra_path,
            recursiveFileLookup=True,
            id_threshold=hash_threshold,
        )
        index = index.add_annotation(extra_annotations)

    # Same for the amino-acid based annotation datasets.
    for amino_acid_path in amino_acid_change_annotations or ():
        amino_acid_variants = AminoAcidVariants.from_parquet(
            session, amino_acid_path
        )
        index = index.annotate_with_amino_acid_consequences(amino_acid_variants)

    # Range-partition, order within partitions, then write to the output path.
    partitioned = index.df.repartitionByRange(
        session.output_partitions, *sort_columns
    )
    ordered = partitioned.sortWithinPartitions(*sort_columns)
    ordered.write.mode(session.write_mode).parquet(variant_index_path)