Skip to content

Variant annotation

gentropy.dataset.variant_annotation.VariantAnnotation dataclass

Bases: Dataset

Dataset with variant-level annotations.

Source code in src/gentropy/dataset/variant_annotation.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@dataclass
class VariantAnnotation(Dataset):
    """Dataset with variant-level annotations."""

    @classmethod
    def get_schema(cls: type[VariantAnnotation]) -> StructType:
        """Provides the schema for the VariantAnnotation dataset.

        Returns:
            StructType: Schema for the VariantAnnotation dataset
        """
        return parse_spark_schema("variant_annotation.json")

    def max_maf(self: VariantAnnotation) -> Column:
        """Maximum minor allele frequency across all populations.

        Every population frequency above 0.5 is folded to `1 - frequency` so that
        it refers to the minor allele; the maximum of the folded values is returned.

        Returns:
            Column: Maximum minor allele frequency across all populations.
        """
        return f.array_max(
            f.transform(
                self.df.alleleFrequencies,
                lambda af: f.when(
                    af.alleleFrequency > 0.5, 1 - af.alleleFrequency
                ).otherwise(af.alleleFrequency),
            )
        )

    def filter_by_variant_df(
        self: VariantAnnotation, df: DataFrame
    ) -> VariantAnnotation:
        """Filter variant annotation dataset by a variant dataframe.

        The dataset is modified in place: its dataframe is replaced by the inner
        join with the provided variants, and the same instance is returned.

        Args:
            df (DataFrame): A dataframe of variants. Only its `variantId` and `chromosome` columns are used.

        Returns:
            VariantAnnotation: A filtered variant annotation dataset
        """
        self.df = self._df.join(
            f.broadcast(df.select("variantId", "chromosome")),
            on=["variantId", "chromosome"],
            how="inner",
        )
        return self

    def get_transcript_consequence_df(
        self: VariantAnnotation, gene_index: GeneIndex | None = None
    ) -> DataFrame:
        """Dataframe of exploded transcript consequences.

        Optionally the transcript consequences can be reduced to the universe of a gene index.

        Args:
            gene_index (GeneIndex | None): A gene index. Defaults to None.

        Returns:
            DataFrame: A persisted dataframe exploded by transcript consequences with the columns variantId, chromosome, position, transcriptConsequence and geneId. When a gene index is provided, the columns of its dataframe are joined in as well.
        """
        # exploding the array removes records without VEP annotation
        transcript_consequences = self.df.withColumn(
            "transcriptConsequence", f.explode("vep.transcriptConsequences")
        ).select(
            "variantId",
            "chromosome",
            "position",
            "transcriptConsequence",
            f.col("transcriptConsequence.geneId").alias("geneId"),
        )
        # Explicit None check: the argument is optional, and relying on the
        # truthiness of a dataset object would be fragile.
        if gene_index is not None:
            transcript_consequences = transcript_consequences.join(
                f.broadcast(gene_index.df),
                on=["chromosome", "geneId"],
            )
        return transcript_consequences.persist()

    def get_most_severe_vep_v2g(
        self: VariantAnnotation,
        vep_consequences: DataFrame,
        gene_index: GeneIndex,
    ) -> V2G:
        """Creates a dataset with variant to gene assignments based on VEP's predicted consequence of the transcript.

        The transcript consequences are reduced to the universe of the gene index.
        Consequence terms are matched to `vep_consequences` on the `label` column
        and assignments with a `score` of 0 are discarded.

        Args:
            vep_consequences (DataFrame): A dataframe of VEP consequences. It is joined on `label` and must provide a `score` column.
            gene_index (GeneIndex): A gene index to filter by.

        Returns:
            V2G: High and medium severity variant to gene assignments
        """
        return V2G(
            _df=self.get_transcript_consequence_df(gene_index)
            .select(
                "variantId",
                "chromosome",
                f.col("transcriptConsequence.geneId").alias("geneId"),
                f.explode("transcriptConsequence.consequenceTerms").alias("label"),
                f.lit("vep").alias("datatypeId"),
                f.lit("variantConsequence").alias("datasourceId"),
            )
            .join(
                f.broadcast(vep_consequences),
                on="label",
                how="inner",
            )
            .drop("label")
            .filter(f.col("score") != 0)
            # A variant can have multiple predicted consequences on a transcript, the most severe one is selected
            .transform(
                lambda df: get_record_with_maximum_value(
                    df, ["variantId", "geneId"], "score"
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_plof_v2g(self: VariantAnnotation, gene_index: GeneIndex) -> V2G:
        """Creates a dataset with variant to gene assignments with a flag indicating if the variant is predicted to be a loss-of-function variant by the LOFTEE algorithm.

        The transcript consequences are reduced to the universe of the gene index
        and only consequences with a non-null `lof` value are kept. `HC` maps to
        `isHighQualityPlof = True` with score 1.0 and `LC` maps to `False` with
        score 0; any other `lof` value leaves both the flag and the score null.

        Args:
            gene_index (GeneIndex): A gene index to filter by.

        Returns:
            V2G: variant to gene assignments from the LOFTEE algorithm
        """
        return V2G(
            _df=(
                self.get_transcript_consequence_df(gene_index)
                .filter(f.col("transcriptConsequence.lof").isNotNull())
                .withColumn(
                    "isHighQualityPlof",
                    f.when(f.col("transcriptConsequence.lof") == "HC", True).when(
                        f.col("transcriptConsequence.lof") == "LC", False
                    ),
                )
                .withColumn(
                    "score",
                    f.when(f.col("isHighQualityPlof"), 1.0).when(
                        ~f.col("isHighQualityPlof"), 0
                    ),
                )
                .select(
                    "variantId",
                    "chromosome",
                    "geneId",
                    "isHighQualityPlof",
                    f.col("score"),
                    f.lit("vep").alias("datatypeId"),
                    f.lit("loftee").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_distance_to_tss(
        self: VariantAnnotation,
        gene_index: GeneIndex,
        max_distance: int = 500_000,
    ) -> V2G:
        """Extracts variant to gene assignments for variants falling within a window of a gene's TSS.

        Variants are paired with every gene on the same chromosome whose TSS lies
        at most `max_distance` away. The score is derived from
        `max_distance - distance` via `normalise_column`.

        Args:
            gene_index (GeneIndex): A gene index to filter by.
            max_distance (int): The maximum distance from the TSS to consider. Defaults to 500_000.

        Returns:
            V2G: variant to gene assignments with their distance to the TSS
        """
        return V2G(
            _df=(
                self.df.alias("variant")
                .join(
                    f.broadcast(gene_index.locations_lut()).alias("gene"),
                    on=[
                        f.col("variant.chromosome") == f.col("gene.chromosome"),
                        f.abs(f.col("variant.position") - f.col("gene.tss"))
                        <= max_distance,
                    ],
                    how="inner",
                )
                .withColumn(
                    "distance", f.abs(f.col("variant.position") - f.col("gene.tss"))
                )
                .withColumn(
                    "inverse_distance",
                    max_distance - f.col("distance"),
                )
                .transform(lambda df: normalise_column(df, "inverse_distance", "score"))
                .select(
                    "variantId",
                    f.col("variant.chromosome").alias("chromosome"),
                    "distance",
                    "geneId",
                    "score",
                    f.lit("distance").alias("datatypeId"),
                    f.lit("canonical_tss").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

filter_by_variant_df(df: DataFrame) -> VariantAnnotation

Filter variant annotation dataset by a variant dataframe.

Parameters:

Name Type Description Default
df DataFrame

A dataframe of variants

required

Returns:

Name Type Description
VariantAnnotation VariantAnnotation

A filtered variant annotation dataset

Source code in src/gentropy/dataset/variant_annotation.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def filter_by_variant_df(
    self: VariantAnnotation, df: DataFrame
) -> VariantAnnotation:
    """Restrict the variant annotation dataset to the variants found in a dataframe.

    The dataset's dataframe is replaced in place by the inner join and the same
    instance is handed back.

    Args:
        df (DataFrame): A dataframe of variants; only `variantId` and `chromosome` are read.

    Returns:
        VariantAnnotation: A filtered variant annotation dataset
    """
    join_keys = ["variantId", "chromosome"]
    # The variant list is broadcast to every executor for the join.
    variants_to_keep = f.broadcast(df.select(*join_keys))
    self.df = self._df.join(variants_to_keep, on=join_keys, how="inner")
    return self

get_distance_to_tss(gene_index: GeneIndex, max_distance: int = 500000) -> V2G

Extracts variant to gene assignments for variants falling within a window of a gene's TSS.

Parameters:

Name Type Description Default
gene_index GeneIndex

A gene index to filter by.

required
max_distance int

The maximum distance from the TSS to consider. Defaults to 500_000.

500000

Returns:

Name Type Description
V2G V2G

variant to gene assignments with their distance to the TSS

Source code in src/gentropy/dataset/variant_annotation.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def get_distance_to_tss(
    self: VariantAnnotation,
    gene_index: GeneIndex,
    max_distance: int = 500_000,
) -> V2G:
    """Build variant to gene assignments for variants located within a window around a gene's TSS.

    Args:
        gene_index (GeneIndex): A gene index to filter by.
        max_distance (int): The maximum distance from the TSS to consider. Defaults to 500_000.

    Returns:
        V2G: variant to gene assignments with their distance to the TSS
    """
    # Absolute distance between the variant and the TSS; used both as the
    # join window and as the reported `distance` column.
    distance_to_tss = f.abs(f.col("variant.position") - f.col("gene.tss"))
    on_same_chromosome = f.col("variant.chromosome") == f.col("gene.chromosome")

    gene_locations = f.broadcast(gene_index.locations_lut()).alias("gene")
    variants_in_window = self.df.alias("variant").join(
        gene_locations,
        on=[on_same_chromosome, distance_to_tss <= max_distance],
        how="inner",
    )

    scored = (
        variants_in_window.withColumn("distance", distance_to_tss)
        # Closer variants get a larger value before normalisation.
        .withColumn("inverse_distance", max_distance - f.col("distance"))
        .transform(lambda df: normalise_column(df, "inverse_distance", "score"))
    )

    assignments = scored.select(
        "variantId",
        f.col("variant.chromosome").alias("chromosome"),
        "distance",
        "geneId",
        "score",
        f.lit("distance").alias("datatypeId"),
        f.lit("canonical_tss").alias("datasourceId"),
    )
    return V2G(_df=assignments, _schema=V2G.get_schema())

get_most_severe_vep_v2g(vep_consequences: DataFrame, gene_index: GeneIndex) -> V2G

Creates a dataset with variant to gene assignments based on VEP's predicted consequence of the transcript.

The transcript consequences are reduced to the universe of the gene index.

Parameters:

Name Type Description Default
vep_consequences DataFrame

A dataframe of VEP consequences

required
gene_index GeneIndex

A gene index to filter by.

required

Returns:

Name Type Description
V2G V2G

High and medium severity variant to gene assignments

Source code in src/gentropy/dataset/variant_annotation.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def get_most_severe_vep_v2g(
    self: VariantAnnotation,
    vep_consequences: DataFrame,
    gene_index: GeneIndex,
) -> V2G:
    """Build variant to gene assignments from VEP's predicted transcript consequences.

    The transcript consequences are reduced to the universe of the gene index.

    Args:
        vep_consequences (DataFrame): A dataframe of VEP consequences, joined on `label` and providing `score`.
        gene_index (GeneIndex): A gene index to filter by.

    Returns:
        V2G: High and medium severity variant to gene assignments
    """
    # One row per variant, gene and consequence term.
    consequence_terms = self.get_transcript_consequence_df(gene_index).select(
        "variantId",
        "chromosome",
        f.col("transcriptConsequence.geneId").alias("geneId"),
        f.explode("transcriptConsequence.consequenceTerms").alias("label"),
        f.lit("vep").alias("datatypeId"),
        f.lit("variantConsequence").alias("datasourceId"),
    )

    # Attach the consequence scores and discard zero-scored terms.
    scored_terms = (
        consequence_terms.join(
            f.broadcast(vep_consequences),
            on="label",
            how="inner",
        )
        .drop("label")
        .filter(f.col("score") != 0)
    )

    # A variant can have multiple predicted consequences on a transcript, the most severe one is selected
    most_severe = scored_terms.transform(
        lambda df: get_record_with_maximum_value(
            df, ["variantId", "geneId"], "score"
        )
    )
    return V2G(_df=most_severe, _schema=V2G.get_schema())

get_plof_v2g(gene_index: GeneIndex) -> V2G

Creates a dataset with variant to gene assignments with a flag indicating if the variant is predicted to be a loss-of-function variant by the LOFTEE algorithm.

The transcript consequences are reduced to the universe of the gene index.

Parameters:

Name Type Description Default
gene_index GeneIndex

A gene index to filter by.

required

Returns:

Name Type Description
V2G V2G

variant to gene assignments from the LOFTEE algorithm

Source code in src/gentropy/dataset/variant_annotation.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def get_plof_v2g(self: VariantAnnotation, gene_index: GeneIndex) -> V2G:
    """Build variant to gene assignments flagged with LOFTEE's loss-of-function prediction.

    The transcript consequences are reduced to the universe of the gene index and
    only those with a non-null `lof` value are kept.

    Args:
        gene_index (GeneIndex): A gene index to filter by.

    Returns:
        V2G: variant to gene assignments from the LOFTEE algorithm
    """
    lof_call = f.col("transcriptConsequence.lof")
    high_quality = f.col("isHighQualityPlof")

    # "HC" -> True, "LC" -> False; there is no fallback branch for other values.
    high_quality_flag = f.when(lof_call == "HC", True).when(lof_call == "LC", False)
    # The score mirrors the flag: 1.0 when it is true, 0 when it is false.
    plof_score = f.when(high_quality, 1.0).when(~high_quality, 0)

    flagged = (
        self.get_transcript_consequence_df(gene_index)
        .filter(lof_call.isNotNull())
        .withColumn("isHighQualityPlof", high_quality_flag)
        .withColumn("score", plof_score)
    )
    assignments = flagged.select(
        "variantId",
        "chromosome",
        "geneId",
        "isHighQualityPlof",
        f.col("score"),
        f.lit("vep").alias("datatypeId"),
        f.lit("loftee").alias("datasourceId"),
    )
    return V2G(_df=assignments, _schema=V2G.get_schema())

get_schema() -> StructType classmethod

Provides the schema for the VariantAnnotation dataset.

Returns:

Name Type Description
StructType StructType

Schema for the VariantAnnotation dataset

Source code in src/gentropy/dataset/variant_annotation.py
28
29
30
31
32
33
34
35
@classmethod
def get_schema(cls: type[VariantAnnotation]) -> StructType:
    """Return the Spark schema of the VariantAnnotation dataset.

    Returns:
        StructType: Schema for the VariantAnnotation dataset
    """
    # Name of the schema definition handed to the schema parser.
    schema_file = "variant_annotation.json"
    return parse_spark_schema(schema_file)

get_transcript_consequence_df(gene_index: GeneIndex | None = None) -> DataFrame

Dataframe of exploded transcript consequences.

Optionally the transcript consequences can be reduced to the universe of a gene index.

Parameters:

Name Type Description Default
gene_index GeneIndex | None

A gene index. Defaults to None.

None

Returns:

Name Type Description
DataFrame DataFrame

A persisted dataframe exploded by transcript consequences with the columns variantId, chromosome, position, transcriptConsequence and geneId (plus the gene index columns when a gene index is supplied)

Source code in src/gentropy/dataset/variant_annotation.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_transcript_consequence_df(
    self: VariantAnnotation, gene_index: GeneIndex | None = None
) -> DataFrame:
    """Dataframe of exploded transcript consequences.

    Optionally the transcript consequences can be reduced to the universe of a gene index.

    Args:
        gene_index (GeneIndex | None): A gene index. Defaults to None.

    Returns:
        DataFrame: A persisted dataframe exploded by transcript consequences with the columns variantId, chromosome, position, transcriptConsequence and geneId. When a gene index is provided, the columns of its dataframe are joined in as well.
    """
    # exploding the array removes records without VEP annotation
    transcript_consequences = self.df.withColumn(
        "transcriptConsequence", f.explode("vep.transcriptConsequences")
    ).select(
        "variantId",
        "chromosome",
        "position",
        "transcriptConsequence",
        f.col("transcriptConsequence.geneId").alias("geneId"),
    )
    # Explicit None check: the argument is optional, and relying on the
    # truthiness of a dataset object would be fragile.
    if gene_index is not None:
        transcript_consequences = transcript_consequences.join(
            f.broadcast(gene_index.df),
            on=["chromosome", "geneId"],
        )
    return transcript_consequences.persist()

max_maf() -> Column

Maximum minor allele frequency across all populations.

Returns:

Name Type Description
Column Column

Maximum minor allele frequency across all populations.

Source code in src/gentropy/dataset/variant_annotation.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def max_maf(self: VariantAnnotation) -> Column:
    """Maximum minor allele frequency across all populations.

    Returns:
        Column: Maximum minor allele frequency across all populations.
    """

    def _to_minor_allele_frequency(af: Column) -> Column:
        # Frequencies above 0.5 are folded to 1 - frequency.
        frequency = af.alleleFrequency
        return f.when(frequency > 0.5, 1 - frequency).otherwise(frequency)

    minor_frequencies = f.transform(
        self.df.alleleFrequencies, _to_minor_allele_frequency
    )
    return f.array_max(minor_frequencies)

Schema

root
 |-- variantId: string (nullable = false)
 |-- chromosome: string (nullable = false)
 |-- position: integer (nullable = false)
 |-- referenceAllele: string (nullable = false)
 |-- alternateAllele: string (nullable = false)
 |-- chromosomeB37: string (nullable = true)
 |-- positionB37: integer (nullable = true)
 |-- alleleType: string (nullable = true)
 |-- rsIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- alleleFrequencies: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- populationName: string (nullable = true)
 |    |    |-- alleleFrequency: double (nullable = true)
 |-- inSilicoPredictors: struct (nullable = false)
 |    |-- cadd: struct (nullable = true)
 |    |    |-- raw: float (nullable = true)
 |    |    |-- phred: float (nullable = true)
 |    |-- revelMax: double (nullable = true)
 |    |-- spliceaiDsMax: float (nullable = true)
 |    |-- pangolinLargestDs: double (nullable = true)
 |    |-- phylop: double (nullable = true)
 |    |-- siftMax: double (nullable = true)
 |    |-- polyphenMax: double (nullable = true)
 |-- vep: struct (nullable = false)
 |    |-- mostSevereConsequence: string (nullable = true)
 |    |-- transcriptConsequences: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- aminoAcids: string (nullable = true)
 |    |    |    |-- consequenceTerms: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- geneId: string (nullable = true)
 |    |    |    |-- lof: string (nullable = true)