Skip to content

Variant annotation

Bases: Dataset

Dataset with variant-level annotations derived from GnomAD.

Source code in src/otg/dataset/variant_annotation.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
@dataclass
class VariantAnnotation(Dataset):
    """Dataset with variant-level annotations derived from GnomAD."""

    @classmethod
    def get_schema(cls: type[VariantAnnotation]) -> StructType:
        """Provides the schema for the VariantAnnotation dataset."""
        return parse_spark_schema("variant_annotation.json")

    @classmethod
    def from_gnomad(
        cls: type[VariantAnnotation],
        gnomad_file: str,
        grch38_to_grch37_chain: str,
        populations: list,
    ) -> VariantAnnotation:
        """Generate variant annotation dataset from gnomAD.

        Some relevant modifications to the original dataset are:

        1. The transcript consequences features provided by VEP are filtered to only refer to the Ensembl canonical transcript.
        2. Genome coordinates are liftovered from GRCh38 to GRCh37 to keep as annotation.
        3. Field names are converted to camel case to follow the convention.

        Args:
            gnomad_file (str): Path to `gnomad.genomes.vX.X.X.sites.ht` gnomAD dataset
            grch38_to_grch37_chain (str): Path to chain file for liftover
            populations (list): List of populations to include in the dataset

        Returns:
            VariantAnnotation: Variant annotation dataset
        """
        # Load variants dataset
        ht = hl.read_table(
            gnomad_file,
            _load_refs=False,
        )

        # Liftover
        grch37 = hl.get_reference("GRCh37")
        grch38 = hl.get_reference("GRCh38")
        grch38.add_liftover(grch38_to_grch37_chain, grch37)

        # Drop non biallelic variants
        ht = ht.filter(ht.alleles.length() == 2)
        # Liftover
        ht = ht.annotate(locus_GRCh37=hl.liftover(ht.locus, "GRCh37"))
        # Select relevant fields and nested records to create class
        return cls(
            _df=(
                ht.select(
                    gnomad3VariantId=hl.str("-").join(
                        [
                            ht.locus.contig.replace("chr", ""),
                            hl.str(ht.locus.position),
                            ht.alleles[0],
                            ht.alleles[1],
                        ]
                    ),
                    chromosome=ht.locus.contig.replace("chr", ""),
                    position=convert_gnomad_position_to_ensembl_hail(
                        ht.locus.position, ht.alleles[0], ht.alleles[1]
                    ),
                    variantId=hl.str("_").join(
                        [
                            ht.locus.contig.replace("chr", ""),
                            hl.str(
                                convert_gnomad_position_to_ensembl_hail(
                                    ht.locus.position, ht.alleles[0], ht.alleles[1]
                                )
                            ),
                            ht.alleles[0],
                            ht.alleles[1],
                        ]
                    ),
                    chromosomeB37=ht.locus_GRCh37.contig.replace("chr", ""),
                    positionB37=ht.locus_GRCh37.position,
                    referenceAllele=ht.alleles[0],
                    alternateAllele=ht.alleles[1],
                    rsIds=ht.rsid,
                    alleleType=ht.allele_info.allele_type,
                    cadd=hl.struct(
                        phred=ht.cadd.phred,
                        raw=ht.cadd.raw_score,
                    ),
                    alleleFrequencies=hl.set([f"{pop}-adj" for pop in populations]).map(
                        lambda p: hl.struct(
                            populationName=p,
                            alleleFrequency=ht.freq[ht.globals.freq_index_dict[p]].AF,
                        )
                    ),
                    vep=hl.struct(
                        mostSevereConsequence=ht.vep.most_severe_consequence,
                        transcriptConsequences=hl.map(
                            lambda x: hl.struct(
                                aminoAcids=x.amino_acids,
                                consequenceTerms=x.consequence_terms,
                                geneId=x.gene_id,
                                lof=x.lof,
                                polyphenScore=x.polyphen_score,
                                polyphenPrediction=x.polyphen_prediction,
                                siftScore=x.sift_score,
                                siftPrediction=x.sift_prediction,
                            ),
                            # Only keeping canonical transcripts
                            ht.vep.transcript_consequences.filter(
                                lambda x: (x.canonical == 1)
                                & (x.gene_symbol_source == "HGNC")
                            ),
                        ),
                    ),
                )
                .key_by("chromosome", "position")
                .drop("locus", "alleles")
                .select_globals()
                .to_spark(flatten=False)
            ),
            _schema=cls.get_schema(),
        )

    def max_maf(self: VariantAnnotation) -> Column:
        """Maximum minor allele frequency accross all populations.

        Returns:
            Column: Maximum minor allele frequency accross all populations.
        """
        return f.array_max(
            f.transform(
                self.df.alleleFrequencies,
                lambda af: f.when(
                    af.alleleFrequency > 0.5, 1 - af.alleleFrequency
                ).otherwise(af.alleleFrequency),
            )
        )

    def filter_by_variant_df(
        self: VariantAnnotation, df: DataFrame, cols: list[str]
    ) -> VariantAnnotation:
        """Filter variant annotation dataset by a variant dataframe.

        Args:
            df (DataFrame): A dataframe of variants
            cols (List[str]): A list of columns to join on

        Returns:
            VariantAnnotation: A filtered variant annotation dataset
        """
        self.df = self._df.join(f.broadcast(df.select(cols)), on=cols, how="inner")
        return self

    def get_transcript_consequence_df(
        self: VariantAnnotation, filter_by: Optional[GeneIndex] = None
    ) -> DataFrame:
        """Dataframe of exploded transcript consequences.

        Optionally the trancript consequences can be reduced to the universe of a gene index.

        Args:
            filter_by (GeneIndex): A gene index. Defaults to None.

        Returns:
            DataFrame: A dataframe exploded by transcript consequences with the columns variantId, chromosome, transcriptConsequence
        """
        # exploding the array removes records without VEP annotation
        transript_consequences = self.df.withColumn(
            "transcriptConsequence", f.explode("vep.transcriptConsequences")
        ).select(
            "variantId",
            "chromosome",
            "position",
            "transcriptConsequence",
            f.col("transcriptConsequence.geneId").alias("geneId"),
        )
        if filter_by:
            transript_consequences = transript_consequences.join(
                f.broadcast(filter_by.df),
                on=["chromosome", "geneId"],
            )
        return transript_consequences.persist()

    def get_most_severe_vep_v2g(
        self: VariantAnnotation,
        vep_consequences: DataFrame,
        filter_by: GeneIndex,
    ) -> V2G:
        """Creates a dataset with variant to gene assignments based on VEP's predicted consequence on the transcript.

        Optionally the trancript consequences can be reduced to the universe of a gene index.

        Args:
            vep_consequences (DataFrame): A dataframe of VEP consequences
            filter_by (GeneIndex): A gene index to filter by. Defaults to None.

        Returns:
            V2G: High and medium severity variant to gene assignments
        """
        vep_lut = vep_consequences.select(
            f.element_at(f.split("Accession", r"/"), -1).alias(
                "variantFunctionalConsequenceId"
            ),
            f.col("Term").alias("label"),
            f.col("v2g_score").cast("double").alias("score"),
        )

        return V2G(
            _df=self.get_transcript_consequence_df(filter_by).select(
                "variantId",
                "chromosome",
                "position",
                f.col("transcriptConsequence.geneId").alias("geneId"),
                f.explode("transcriptConsequence.consequenceTerms").alias("label"),
                f.lit("vep").alias("datatypeId"),
                f.lit("variantConsequence").alias("datasourceId"),
            )
            # A variant can have multiple predicted consequences on a transcript, the most severe one is selected
            .join(
                f.broadcast(vep_lut),
                on="label",
                how="inner",
            )
            .filter(f.col("score") != 0)
            .transform(
                lambda df: get_record_with_maximum_value(
                    df, ["variantId", "geneId"], "score"
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_polyphen_v2g(
        self: VariantAnnotation, filter_by: Optional[GeneIndex] = None
    ) -> V2G:
        """Creates a dataset with variant to gene assignments with a PolyPhen's predicted score on the transcript.

        Polyphen informs about the probability that a substitution is damaging. Optionally the trancript consequences can be reduced to the universe of a gene index.

        Args:
            filter_by (GeneIndex): A gene index to filter by. Defaults to None.

        Returns:
            V2G: variant to gene assignments with their polyphen scores
        """
        return V2G(
            _df=(
                self.get_transcript_consequence_df(filter_by)
                .filter(f.col("transcriptConsequence.polyphenScore").isNotNull())
                .select(
                    "variantId",
                    "chromosome",
                    "position",
                    "geneId",
                    f.col("transcriptConsequence.polyphenScore").alias("score"),
                    f.col("transcriptConsequence.polyphenPrediction").alias("label"),
                    f.lit("vep").alias("datatypeId"),
                    f.lit("polyphen").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_sift_v2g(self: VariantAnnotation, filter_by: GeneIndex) -> V2G:
        """Creates a dataset with variant to gene assignments with a SIFT's predicted score on the transcript.

        SIFT informs about the probability that a substitution is tolerated so scores nearer zero are more likely to be deleterious.
        Optionally the trancript consequences can be reduced to the universe of a gene index.

        Args:
            filter_by (GeneIndex): A gene index to filter by.

        Returns:
            V2G: variant to gene assignments with their SIFT scores
        """
        return V2G(
            _df=(
                self.get_transcript_consequence_df(filter_by)
                .filter(f.col("transcriptConsequence.siftScore").isNotNull())
                .select(
                    "variantId",
                    "chromosome",
                    "position",
                    "geneId",
                    f.expr("1 - transcriptConsequence.siftScore").alias("score"),
                    f.col("transcriptConsequence.siftPrediction").alias("label"),
                    f.lit("vep").alias("datatypeId"),
                    f.lit("sift").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_plof_v2g(self: VariantAnnotation, filter_by: GeneIndex) -> V2G:
        """Creates a dataset with variant to gene assignments with a flag indicating if the variant is predicted to be a loss-of-function variant by the LOFTEE algorithm.

        Optionally the trancript consequences can be reduced to the universe of a gene index.

        Args:
            filter_by (GeneIndex): A gene index to filter by.

        Returns:
            V2G: variant to gene assignments from the LOFTEE algorithm
        """
        return V2G(
            _df=(
                self.get_transcript_consequence_df(filter_by)
                .filter(f.col("transcriptConsequence.lof").isNotNull())
                .withColumn(
                    "isHighQualityPlof",
                    f.when(f.col("transcriptConsequence.lof") == "HC", True).when(
                        f.col("transcriptConsequence.lof") == "LC", False
                    ),
                )
                .withColumn(
                    "score",
                    f.when(f.col("isHighQualityPlof"), 1.0).when(
                        ~f.col("isHighQualityPlof"), 0
                    ),
                )
                .select(
                    "variantId",
                    "chromosome",
                    "position",
                    "geneId",
                    "isHighQualityPlof",
                    f.col("score"),
                    f.lit("vep").alias("datatypeId"),
                    f.lit("loftee").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_distance_to_tss(
        self: VariantAnnotation,
        filter_by: GeneIndex,
        max_distance: int = 500_000,
    ) -> V2G:
        """Extracts variant to gene assignments for variants falling within a window of a gene's TSS.

        Args:
            filter_by (GeneIndex): A gene index to filter by.
            max_distance (int): The maximum distance from the TSS to consider. Defaults to 500_000.

        Returns:
            V2G: variant to gene assignments with their distance to the TSS
        """
        return V2G(
            _df=(
                self.df.alias("variant")
                .join(
                    f.broadcast(filter_by.locations_lut()).alias("gene"),
                    on=[
                        f.col("variant.chromosome") == f.col("gene.chromosome"),
                        f.abs(f.col("variant.position") - f.col("gene.tss"))
                        <= max_distance,
                    ],
                    how="inner",
                )
                .withColumn(
                    "inverse_distance",
                    max_distance - f.abs(f.col("variant.position") - f.col("gene.tss")),
                )
                .transform(lambda df: normalise_column(df, "inverse_distance", "score"))
                .select(
                    "variantId",
                    f.col("variant.chromosome").alias("chromosome"),
                    "position",
                    "geneId",
                    "score",
                    f.lit("distance").alias("datatypeId"),
                    f.lit("canonical_tss").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

filter_by_variant_df(df, cols)

Filter variant annotation dataset by a variant dataframe.

Parameters:

Name Type Description Default
df DataFrame

A dataframe of variants

required
cols List[str]

A list of columns to join on

required

Returns:

Name Type Description
VariantAnnotation VariantAnnotation

A filtered variant annotation dataset

Source code in src/otg/dataset/variant_annotation.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def filter_by_variant_df(
    self: VariantAnnotation, df: DataFrame, cols: list[str]
) -> VariantAnnotation:
    """Filter variant annotation dataset by a variant dataframe.

    Args:
        df (DataFrame): A dataframe of variants
        cols (List[str]): A list of columns to join on

    Returns:
        VariantAnnotation: A filtered variant annotation dataset
    """
    self.df = self._df.join(f.broadcast(df.select(cols)), on=cols, how="inner")
    return self

from_gnomad(gnomad_file, grch38_to_grch37_chain, populations) classmethod

Generate variant annotation dataset from gnomAD.

Some relevant modifications to the original dataset are:

  1. The transcript consequences features provided by VEP are filtered to only refer to the Ensembl canonical transcript.
  2. Genome coordinates are liftovered from GRCh38 to GRCh37 to keep as annotation.
  3. Field names are converted to camel case to follow the convention.

Parameters:

Name Type Description Default
gnomad_file str

Path to gnomad.genomes.vX.X.X.sites.ht gnomAD dataset

required
grch38_to_grch37_chain str

Path to chain file for liftover

required
populations list

List of populations to include in the dataset

required

Returns:

Name Type Description
VariantAnnotation VariantAnnotation

Variant annotation dataset

Source code in src/otg/dataset/variant_annotation.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@classmethod
def from_gnomad(
    cls: type[VariantAnnotation],
    gnomad_file: str,
    grch38_to_grch37_chain: str,
    populations: list,
) -> VariantAnnotation:
    """Generate variant annotation dataset from gnomAD.

    Some relevant modifications to the original dataset are:

    1. The transcript consequences features provided by VEP are filtered to only refer to the Ensembl canonical transcript.
    2. Genome coordinates are liftovered from GRCh38 to GRCh37 to keep as annotation.
    3. Field names are converted to camel case to follow the convention.

    Args:
        gnomad_file (str): Path to `gnomad.genomes.vX.X.X.sites.ht` gnomAD dataset
        grch38_to_grch37_chain (str): Path to chain file for liftover
        populations (list): List of populations to include in the dataset

    Returns:
        VariantAnnotation: Variant annotation dataset
    """
    # Load variants dataset
    ht = hl.read_table(
        gnomad_file,
        _load_refs=False,
    )

    # Liftover
    grch37 = hl.get_reference("GRCh37")
    grch38 = hl.get_reference("GRCh38")
    grch38.add_liftover(grch38_to_grch37_chain, grch37)

    # Drop non biallelic variants
    ht = ht.filter(ht.alleles.length() == 2)
    # Liftover
    ht = ht.annotate(locus_GRCh37=hl.liftover(ht.locus, "GRCh37"))
    # Select relevant fields and nested records to create class
    return cls(
        _df=(
            ht.select(
                gnomad3VariantId=hl.str("-").join(
                    [
                        ht.locus.contig.replace("chr", ""),
                        hl.str(ht.locus.position),
                        ht.alleles[0],
                        ht.alleles[1],
                    ]
                ),
                chromosome=ht.locus.contig.replace("chr", ""),
                position=convert_gnomad_position_to_ensembl_hail(
                    ht.locus.position, ht.alleles[0], ht.alleles[1]
                ),
                variantId=hl.str("_").join(
                    [
                        ht.locus.contig.replace("chr", ""),
                        hl.str(
                            convert_gnomad_position_to_ensembl_hail(
                                ht.locus.position, ht.alleles[0], ht.alleles[1]
                            )
                        ),
                        ht.alleles[0],
                        ht.alleles[1],
                    ]
                ),
                chromosomeB37=ht.locus_GRCh37.contig.replace("chr", ""),
                positionB37=ht.locus_GRCh37.position,
                referenceAllele=ht.alleles[0],
                alternateAllele=ht.alleles[1],
                rsIds=ht.rsid,
                alleleType=ht.allele_info.allele_type,
                cadd=hl.struct(
                    phred=ht.cadd.phred,
                    raw=ht.cadd.raw_score,
                ),
                alleleFrequencies=hl.set([f"{pop}-adj" for pop in populations]).map(
                    lambda p: hl.struct(
                        populationName=p,
                        alleleFrequency=ht.freq[ht.globals.freq_index_dict[p]].AF,
                    )
                ),
                vep=hl.struct(
                    mostSevereConsequence=ht.vep.most_severe_consequence,
                    transcriptConsequences=hl.map(
                        lambda x: hl.struct(
                            aminoAcids=x.amino_acids,
                            consequenceTerms=x.consequence_terms,
                            geneId=x.gene_id,
                            lof=x.lof,
                            polyphenScore=x.polyphen_score,
                            polyphenPrediction=x.polyphen_prediction,
                            siftScore=x.sift_score,
                            siftPrediction=x.sift_prediction,
                        ),
                        # Only keeping canonical transcripts
                        ht.vep.transcript_consequences.filter(
                            lambda x: (x.canonical == 1)
                            & (x.gene_symbol_source == "HGNC")
                        ),
                    ),
                ),
            )
            .key_by("chromosome", "position")
            .drop("locus", "alleles")
            .select_globals()
            .to_spark(flatten=False)
        ),
        _schema=cls.get_schema(),
    )

get_distance_to_tss(filter_by, max_distance=500000)

Extracts variant to gene assignments for variants falling within a window of a gene's TSS.

Parameters:

Name Type Description Default
filter_by GeneIndex

A gene index to filter by.

required
max_distance int

The maximum distance from the TSS to consider. Defaults to 500_000.

500000

Returns:

Name Type Description
V2G V2G

variant to gene assignments with their distance to the TSS

Source code in src/otg/dataset/variant_annotation.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def get_distance_to_tss(
    self: VariantAnnotation,
    filter_by: GeneIndex,
    max_distance: int = 500_000,
) -> V2G:
    """Extracts variant to gene assignments for variants falling within a window of a gene's TSS.

    Args:
        filter_by (GeneIndex): A gene index to filter by.
        max_distance (int): The maximum distance from the TSS to consider. Defaults to 500_000.

    Returns:
        V2G: variant to gene assignments with their distance to the TSS
    """
    return V2G(
        _df=(
            self.df.alias("variant")
            .join(
                f.broadcast(filter_by.locations_lut()).alias("gene"),
                on=[
                    f.col("variant.chromosome") == f.col("gene.chromosome"),
                    f.abs(f.col("variant.position") - f.col("gene.tss"))
                    <= max_distance,
                ],
                how="inner",
            )
            .withColumn(
                "inverse_distance",
                max_distance - f.abs(f.col("variant.position") - f.col("gene.tss")),
            )
            .transform(lambda df: normalise_column(df, "inverse_distance", "score"))
            .select(
                "variantId",
                f.col("variant.chromosome").alias("chromosome"),
                "position",
                "geneId",
                "score",
                f.lit("distance").alias("datatypeId"),
                f.lit("canonical_tss").alias("datasourceId"),
            )
        ),
        _schema=V2G.get_schema(),
    )

get_most_severe_vep_v2g(vep_consequences, filter_by)

Creates a dataset with variant to gene assignments based on VEP's predicted consequence on the transcript.

Optionally the trancript consequences can be reduced to the universe of a gene index.

Parameters:

Name Type Description Default
vep_consequences DataFrame

A dataframe of VEP consequences

required
filter_by GeneIndex

A gene index to filter by. Defaults to None.

required

Returns:

Name Type Description
V2G V2G

High and medium severity variant to gene assignments

Source code in src/otg/dataset/variant_annotation.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def get_most_severe_vep_v2g(
    self: VariantAnnotation,
    vep_consequences: DataFrame,
    filter_by: GeneIndex,
) -> V2G:
    """Creates a dataset with variant to gene assignments based on VEP's predicted consequence on the transcript.

    Optionally the trancript consequences can be reduced to the universe of a gene index.

    Args:
        vep_consequences (DataFrame): A dataframe of VEP consequences
        filter_by (GeneIndex): A gene index to filter by. Defaults to None.

    Returns:
        V2G: High and medium severity variant to gene assignments
    """
    vep_lut = vep_consequences.select(
        f.element_at(f.split("Accession", r"/"), -1).alias(
            "variantFunctionalConsequenceId"
        ),
        f.col("Term").alias("label"),
        f.col("v2g_score").cast("double").alias("score"),
    )

    return V2G(
        _df=self.get_transcript_consequence_df(filter_by).select(
            "variantId",
            "chromosome",
            "position",
            f.col("transcriptConsequence.geneId").alias("geneId"),
            f.explode("transcriptConsequence.consequenceTerms").alias("label"),
            f.lit("vep").alias("datatypeId"),
            f.lit("variantConsequence").alias("datasourceId"),
        )
        # A variant can have multiple predicted consequences on a transcript, the most severe one is selected
        .join(
            f.broadcast(vep_lut),
            on="label",
            how="inner",
        )
        .filter(f.col("score") != 0)
        .transform(
            lambda df: get_record_with_maximum_value(
                df, ["variantId", "geneId"], "score"
            )
        ),
        _schema=V2G.get_schema(),
    )

get_plof_v2g(filter_by)

Creates a dataset with variant to gene assignments with a flag indicating if the variant is predicted to be a loss-of-function variant by the LOFTEE algorithm.

Optionally the trancript consequences can be reduced to the universe of a gene index.

Parameters:

Name Type Description Default
filter_by GeneIndex

A gene index to filter by.

required

Returns:

Name Type Description
V2G V2G

variant to gene assignments from the LOFTEE algorithm

Source code in src/otg/dataset/variant_annotation.py
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def get_plof_v2g(self: VariantAnnotation, filter_by: GeneIndex) -> V2G:
    """Creates a dataset with variant to gene assignments with a flag indicating if the variant is predicted to be a loss-of-function variant by the LOFTEE algorithm.

    Optionally the trancript consequences can be reduced to the universe of a gene index.

    Args:
        filter_by (GeneIndex): A gene index to filter by.

    Returns:
        V2G: variant to gene assignments from the LOFTEE algorithm
    """
    return V2G(
        _df=(
            self.get_transcript_consequence_df(filter_by)
            .filter(f.col("transcriptConsequence.lof").isNotNull())
            .withColumn(
                "isHighQualityPlof",
                f.when(f.col("transcriptConsequence.lof") == "HC", True).when(
                    f.col("transcriptConsequence.lof") == "LC", False
                ),
            )
            .withColumn(
                "score",
                f.when(f.col("isHighQualityPlof"), 1.0).when(
                    ~f.col("isHighQualityPlof"), 0
                ),
            )
            .select(
                "variantId",
                "chromosome",
                "position",
                "geneId",
                "isHighQualityPlof",
                f.col("score"),
                f.lit("vep").alias("datatypeId"),
                f.lit("loftee").alias("datasourceId"),
            )
        ),
        _schema=V2G.get_schema(),
    )

get_polyphen_v2g(filter_by=None)

Creates a dataset with variant to gene assignments with a PolyPhen's predicted score on the transcript.

Polyphen informs about the probability that a substitution is damaging. Optionally the trancript consequences can be reduced to the universe of a gene index.

Parameters:

Name Type Description Default
filter_by GeneIndex

A gene index to filter by. Defaults to None.

None

Returns:

Name Type Description
V2G V2G

variant to gene assignments with their polyphen scores

Source code in src/otg/dataset/variant_annotation.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def get_polyphen_v2g(
    self: VariantAnnotation, filter_by: Optional[GeneIndex] = None
) -> V2G:
    """Creates a dataset with variant to gene assignments with a PolyPhen's predicted score on the transcript.

    Polyphen informs about the probability that a substitution is damaging. Optionally the trancript consequences can be reduced to the universe of a gene index.

    Args:
        filter_by (GeneIndex): A gene index to filter by. Defaults to None.

    Returns:
        V2G: variant to gene assignments with their polyphen scores
    """
    return V2G(
        _df=(
            self.get_transcript_consequence_df(filter_by)
            .filter(f.col("transcriptConsequence.polyphenScore").isNotNull())
            .select(
                "variantId",
                "chromosome",
                "position",
                "geneId",
                f.col("transcriptConsequence.polyphenScore").alias("score"),
                f.col("transcriptConsequence.polyphenPrediction").alias("label"),
                f.lit("vep").alias("datatypeId"),
                f.lit("polyphen").alias("datasourceId"),
            )
        ),
        _schema=V2G.get_schema(),
    )

get_schema() classmethod

Provides the schema for the VariantAnnotation dataset.

Source code in src/otg/dataset/variant_annotation.py
27
28
29
30
@classmethod
def get_schema(cls: type[VariantAnnotation]) -> StructType:
    """Provides the schema for the VariantAnnotation dataset."""
    return parse_spark_schema("variant_annotation.json")

get_sift_v2g(filter_by)

Creates a dataset with variant to gene assignments with a SIFT's predicted score on the transcript.

SIFT informs about the probability that a substitution is tolerated so scores nearer zero are more likely to be deleterious. Optionally the trancript consequences can be reduced to the universe of a gene index.

Parameters:

Name Type Description Default
filter_by GeneIndex

A gene index to filter by.

required

Returns:

Name Type Description
V2G V2G

variant to gene assignments with their SIFT scores

Source code in src/otg/dataset/variant_annotation.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
def get_sift_v2g(self: VariantAnnotation, filter_by: GeneIndex) -> V2G:
    """Creates a dataset with variant to gene assignments with a SIFT's predicted score on the transcript.

    SIFT informs about the probability that a substitution is tolerated so scores nearer zero are more likely to be deleterious.
    Optionally the trancript consequences can be reduced to the universe of a gene index.

    Args:
        filter_by (GeneIndex): A gene index to filter by.

    Returns:
        V2G: variant to gene assignments with their SIFT scores
    """
    return V2G(
        _df=(
            self.get_transcript_consequence_df(filter_by)
            .filter(f.col("transcriptConsequence.siftScore").isNotNull())
            .select(
                "variantId",
                "chromosome",
                "position",
                "geneId",
                f.expr("1 - transcriptConsequence.siftScore").alias("score"),
                f.col("transcriptConsequence.siftPrediction").alias("label"),
                f.lit("vep").alias("datatypeId"),
                f.lit("sift").alias("datasourceId"),
            )
        ),
        _schema=V2G.get_schema(),
    )

get_transcript_consequence_df(filter_by=None)

Dataframe of exploded transcript consequences.

Optionally the trancript consequences can be reduced to the universe of a gene index.

Parameters:

Name Type Description Default
filter_by GeneIndex

A gene index. Defaults to None.

None

Returns:

Name Type Description
DataFrame DataFrame

A dataframe exploded by transcript consequences with the columns variantId, chromosome, transcriptConsequence

Source code in src/otg/dataset/variant_annotation.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def get_transcript_consequence_df(
    self: VariantAnnotation, filter_by: Optional[GeneIndex] = None
) -> DataFrame:
    """Dataframe of exploded transcript consequences.

    Optionally the trancript consequences can be reduced to the universe of a gene index.

    Args:
        filter_by (GeneIndex): A gene index. Defaults to None.

    Returns:
        DataFrame: A dataframe exploded by transcript consequences with the columns variantId, chromosome, transcriptConsequence
    """
    # exploding the array removes records without VEP annotation
    transript_consequences = self.df.withColumn(
        "transcriptConsequence", f.explode("vep.transcriptConsequences")
    ).select(
        "variantId",
        "chromosome",
        "position",
        "transcriptConsequence",
        f.col("transcriptConsequence.geneId").alias("geneId"),
    )
    if filter_by:
        transript_consequences = transript_consequences.join(
            f.broadcast(filter_by.df),
            on=["chromosome", "geneId"],
        )
    return transript_consequences.persist()

max_maf()

Maximum minor allele frequency accross all populations.

Returns:

Name Type Description
Column Column

Maximum minor allele frequency accross all populations.

Source code in src/otg/dataset/variant_annotation.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def max_maf(self: VariantAnnotation) -> Column:
    """Maximum minor allele frequency accross all populations.

    Returns:
        Column: Maximum minor allele frequency accross all populations.
    """
    return f.array_max(
        f.transform(
            self.df.alleleFrequencies,
            lambda af: f.when(
                af.alleleFrequency > 0.5, 1 - af.alleleFrequency
            ).otherwise(af.alleleFrequency),
        )
    )

Schema

root
 |-- variantId: string (nullable = false)
 |-- chromosome: string (nullable = false)
 |-- position: integer (nullable = false)
 |-- gnomad3VariantId: string (nullable = false)
 |-- referenceAllele: string (nullable = false)
 |-- alternateAllele: string (nullable = false)
 |-- chromosomeB37: string (nullable = true)
 |-- positionB37: integer (nullable = true)
 |-- alleleType: string (nullable = true)
 |-- rsIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- alleleFrequencies: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- populationName: string (nullable = true)
 |    |    |-- alleleFrequency: double (nullable = true)
 |-- cadd: struct (nullable = true)
 |    |-- phred: float (nullable = true)
 |    |-- raw: float (nullable = true)
 |-- vep: struct (nullable = false)
 |    |-- mostSevereConsequence: string (nullable = true)
 |    |-- transcriptConsequences: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- aminoAcids: string (nullable = true)
 |    |    |    |-- consequenceTerms: array (nullable = true)
 |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |-- geneId: string (nullable = true)
 |    |    |    |-- lof: string (nullable = true)
 |    |    |    |-- polyphenScore: double (nullable = true)
 |    |    |    |-- polyphenPrediction: string (nullable = true)
 |    |    |    |-- siftScore: double (nullable = true)
 |    |    |    |-- siftPrediction: string (nullable = true)