Variant index

gentropy.dataset.variant_index.VariantIndex dataclass

Bases: Dataset

Dataset for representing variants and the methods applied to them.

Source code in src/gentropy/dataset/variant_index.py
@dataclass
class VariantIndex(Dataset):
    """Dataset for representing variants and methods applied on them."""

    def __post_init__(self: VariantIndex) -> None:
        """Forcing the presence of empty arrays even if the schema allows missing values.

        To bring in annotations from other sources, we use the `array_union()` function. However, it
        assumes both columns contain arrays (not just that the schema allows arrays!). If one of the
        arrays is null, the union is nullified. This needs to be avoided.
        """
        # Calling dataset's post init to validate schema:
        super().__post_init__()

        # Composing a list of expressions to replace nulls with empty arrays where the schema defines an array column:
        array_columns = {
            column.name: f.when(f.col(column.name).isNull(), f.array()).otherwise(
                f.col(column.name)
            )
            for column in self.df.schema
            if "ArrayType" in column.dataType.__str__()
        }

        # Not returning a new dataset; updating the dataframe in place:
        self.df = self.df.withColumns(array_columns)

    @classmethod
    def get_schema(cls: type[VariantIndex]) -> StructType:
        """Provides the schema for the variant index dataset.

        Returns:
            StructType: Schema for the VariantIndex dataset
        """
        return parse_spark_schema("variant_index.json")

    @classmethod
    def assign_variant_id(
        cls: type[VariantIndex],
    ) -> Column:
        """Creates a column with the variant ID that will be used to index the variant index.

        This is to ensure that the variant ID is unique and not too long.

        Returns:
            Column: Column with the variant ID, containing the hash if the variant ID is 100 characters or longer
        """
        return (
            f.when(
                f.length(f.col("variantId")) >= 100,
                f.concat(
                    f.lit("otvar_"),
                    f.xxhash64(f.col("variantId")).cast("string"),
                ),
            )
            .otherwise(f.col("variantId"))
            .alias("variantId")
        )

    @staticmethod
    def hash_long_variant_ids(
        variant_id: Column, chromosome: Column, position: Column, threshold: int = 100
    ) -> Column:
        """Hash long variant identifiers.

        Args:
            variant_id (Column): Column containing variant identifiers.
            chromosome (Column): Chromosome column.
            position (Column): Position column.
            threshold (int): Above this limit, a hash will be generated.

        Returns:
            Column: Hashed variant identifiers for long variants.

        Examples:
            >>> (
            ...    spark.createDataFrame([('v_short', 'x', 23),('v_looooooong', '23', 23), ('no_chrom', None, None), (None, None, None)], ['variantId', 'chromosome', 'position'])
            ...    .select('variantId', VariantIndex.hash_long_variant_ids(f.col('variantId'), f.col('chromosome'), f.col('position'), 10).alias('hashedVariantId'))
            ...    .show(truncate=False)
            ... )
            +------------+--------------------------------------------+
            |variantId   |hashedVariantId                             |
            +------------+--------------------------------------------+
            |v_short     |v_short                                     |
            |v_looooooong|OTVAR_23_23_3749d019d645894770c364992ae70a05|
            |no_chrom    |OTVAR_41acfcd7d4fd523b33600b504914ef25      |
            |null        |null                                        |
            +------------+--------------------------------------------+
            <BLANKLINE>
        """
        return (
            # If either the position or the chromosome is missing, we hash the identifier:
            f.when(
                chromosome.isNull() | position.isNull(),
                f.concat(
                    f.lit("OTVAR_"),
                    f.md5(variant_id).cast("string"),
                ),
            )
            # If chromosome and position are given, but alleles are too long, create hash:
            .when(
                f.length(variant_id) > threshold,
                f.concat_ws(
                    "_",
                    f.lit("OTVAR"),
                    chromosome,
                    position,
                    f.md5(variant_id).cast("string"),
                ),
            )
            # Missing and regular variant identifiers are left unchanged:
            .otherwise(variant_id)
        )

    def add_annotation(
        self: VariantIndex, annotation_source: VariantIndex
    ) -> VariantIndex:
        """Import annotation from an other variant index dataset.

        At this point the annotation can be extended with extra cross-references,
        in-silico predictions and allele frequencies.

        Args:
            annotation_source (VariantIndex): Annotation to add to the dataset

        Returns:
            VariantIndex: VariantIndex dataset with the annotation added
        """
        # Prefix for renaming columns:
        prefix = "annotation_"

        # Generate select expressions to merge and import columns from the annotation:
        select_expressions = []

        # Collect columns by iterating over the variant index schema:
        for field in VariantIndex.get_schema():
            column = field.name

            # If an annotation column can be found in both datasets:
            if (column in self.df.columns) and (column in annotation_source.df.columns):
                # Arrays are merged:
                if "ArrayType" in field.dataType.__str__():
                    select_expressions.append(
                        safe_array_union(
                            f.col(column), f.col(f"{prefix}{column}")
                        ).alias(column)
                    )
                # Non-array columns are coalesced:
                else:
                    select_expressions.append(
                        f.coalesce(f.col(column), f.col(f"{prefix}{column}")).alias(
                            column
                        )
                    )
            # If the column is only found in the annotation dataset rename it:
            elif column in annotation_source.df.columns:
                select_expressions.append(f.col(f"{prefix}{column}").alias(column))
            # If the column is only found in the main dataset:
            elif column in self.df.columns:
                select_expressions.append(f.col(column))
            # VariantIndex columns not found in either dataset are ignored.

        # Join the annotation to the dataset:
        return VariantIndex(
            _df=(
                f.broadcast(self.df)
                .join(
                    rename_all_columns(annotation_source.df, prefix),
                    on=[f.col("variantId") == f.col(f"{prefix}variantId")],
                    how="left",
                )
                .select(*select_expressions)
            ),
            _schema=self.schema,
        )

    def max_maf(self: VariantIndex) -> Column:
        """Maximum minor allele frequency accross all populations assuming all variants biallelic.

        Returns:
            Column: Maximum minor allele frequency across all populations.

        Raises:
            ValueError: Allele frequencies are not present in the dataset.
        """
        if "alleleFrequencies" not in self.df.columns:
            raise ValueError("Allele frequencies are not present in the dataset.")

        return f.array_max(
            f.transform(
                self.df.alleleFrequencies,
                lambda af: f.when(
                    af.alleleFrequency > 0.5, 1 - af.alleleFrequency
                ).otherwise(af.alleleFrequency),
            )
        )

    def filter_by_variant(self: VariantIndex, df: DataFrame) -> VariantIndex:
        """Filter variant annotation dataset by a variant dataframe.

        Args:
            df (DataFrame): A dataframe of variants

        Returns:
            VariantIndex: A filtered variant annotation dataset
        """
        join_columns = ["variantId", "chromosome"]

        assert all(
            col in df.columns for col in join_columns
        ), "The variant dataframe must contain the columns 'variantId' and 'chromosome'."

        return VariantIndex(
            _df=self._df.join(
                f.broadcast(df.select(*join_columns).distinct()),
                on=join_columns,
                how="inner",
            ),
            _schema=self.schema,
        )

    def get_transcript_consequence_df(
        self: VariantIndex, gene_index: GeneIndex | None = None
    ) -> DataFrame:
        """Dataframe of exploded transcript consequences.

        Optionally the transcript consequences can be reduced to the universe of a gene index.

        Args:
            gene_index (GeneIndex | None): A gene index. Defaults to None.

        Returns:
            DataFrame: A dataframe exploded by transcript consequences, with the columns variantId, chromosome, position, transcriptConsequence and geneId
        """
        # exploding the array removes records without VEP annotation
        transcript_consequences = self.df.withColumn(
            "transcriptConsequence", f.explode("transcriptConsequences")
        ).select(
            "variantId",
            "chromosome",
            "position",
            "transcriptConsequence",
            f.col("transcriptConsequence.targetId").alias("geneId"),
        )
        if gene_index:
            transcript_consequences = transcript_consequences.join(
                f.broadcast(gene_index.df),
                on=["chromosome", "geneId"],
            )
        return transcript_consequences

    def get_distance_to_tss(
        self: VariantIndex,
        gene_index: GeneIndex,
        max_distance: int = 500_000,
    ) -> V2G:
        """Extracts variant to gene assignments for variants falling within a window of a gene's TSS.

        Args:
            gene_index (GeneIndex): A gene index to filter by.
            max_distance (int): The maximum distance from the TSS to consider. Defaults to 500_000.

        Returns:
            V2G: variant to gene assignments with their distance to the TSS
        """
        return V2G(
            _df=(
                self.df.alias("variant")
                .join(
                    f.broadcast(gene_index.locations_lut()).alias("gene"),
                    on=[
                        f.col("variant.chromosome") == f.col("gene.chromosome"),
                        f.abs(f.col("variant.position") - f.col("gene.tss"))
                        <= max_distance,
                    ],
                    how="inner",
                )
                .withColumn(
                    "distance", f.abs(f.col("variant.position") - f.col("gene.tss"))
                )
                .withColumn(
                    "inverse_distance",
                    max_distance - f.col("distance"),
                )
                .transform(lambda df: normalise_column(df, "inverse_distance", "score"))
                .select(
                    "variantId",
                    f.col("variant.chromosome").alias("chromosome"),
                    "distance",
                    "geneId",
                    "score",
                    f.lit("distance").alias("datatypeId"),
                    f.lit("canonical_tss").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_plof_v2g(self: VariantIndex, gene_index: GeneIndex) -> V2G:
        """Creates a dataset with variant to gene assignments with a flag indicating if the variant is predicted to be a loss-of-function variant by the LOFTEE algorithm.

        Optionally the transcript consequences can be reduced to the universe of a gene index.

        Args:
            gene_index (GeneIndex): A gene index to filter by.

        Returns:
            V2G: variant to gene assignments from the LOFTEE algorithm
        """
        return V2G(
            _df=(
                self.get_transcript_consequence_df(gene_index)
                .filter(f.col("transcriptConsequence.lofteePrediction").isNotNull())
                .withColumn(
                    "isHighQualityPlof",
                    f.when(
                        f.col("transcriptConsequence.lofteePrediction") == "HC", True
                    ).when(
                        f.col("transcriptConsequence.lofteePrediction") == "LC", False
                    ),
                )
                .withColumn(
                    "score",
                    f.when(f.col("isHighQualityPlof"), 1.0).when(
                        ~f.col("isHighQualityPlof"), 0
                    ),
                )
                .select(
                    "variantId",
                    "chromosome",
                    "geneId",
                    "isHighQualityPlof",
                    f.col("score"),
                    f.lit("vep").alias("datatypeId"),
                    f.lit("loftee").alias("datasourceId"),
                )
            ),
            _schema=V2G.get_schema(),
        )

    def get_most_severe_transcript_consequence(
        self: VariantIndex,
        vep_consequences: DataFrame,
        gene_index: GeneIndex,
    ) -> V2G:
        """Creates a dataset with variant to gene assignments based on VEP's predicted consequence of the transcript.

        Optionally the transcript consequences can be reduced to the universe of a gene index.

        Args:
            vep_consequences (DataFrame): A dataframe of VEP consequences
            gene_index (GeneIndex): A gene index to filter by.

        Returns:
            V2G: High and medium severity variant to gene assignments
        """
        return V2G(
            _df=self.get_transcript_consequence_df(gene_index)
            .select(
                "variantId",
                "chromosome",
                f.col("transcriptConsequence.targetId").alias("geneId"),
                f.explode(
                    "transcriptConsequence.variantFunctionalConsequenceIds"
                ).alias("variantFunctionalConsequenceId"),
                f.lit("vep").alias("datatypeId"),
                f.lit("variantConsequence").alias("datasourceId"),
            )
            .join(
                f.broadcast(vep_consequences),
                on="variantFunctionalConsequenceId",
                how="inner",
            )
            .drop("label")
            .filter(f.col("score") != 0)
            # A variant can have multiple predicted consequences on a transcript, the most severe one is selected
            .transform(
                lambda df: get_record_with_maximum_value(
                    df, ["variantId", "geneId"], "score"
                )
            ),
            _schema=V2G.get_schema(),
        )
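
The null-to-empty-array step in `__post_init__` matters because Spark's `array_union` returns null when either input is null. A minimal sketch, not part of the module, with made-up data and column names:

# Illustrative sketch only; `left` and `right` are hypothetical columns.
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(["a"], ["b"]), (["a"], None)], ["left", "right"])

# The union is nullified when either side is null:
df.select(f.array_union("left", "right").alias("union")).show()

# Replacing nulls with empty arrays first (as __post_init__ does) keeps the union intact:
df.select(
    f.array_union("left", f.coalesce("right", f.array())).alias("union")
).show()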

add_annotation(annotation_source: VariantIndex) -> VariantIndex

Import annotation from another variant index dataset.

At this point the annotation can be extended with extra cross-references, in-silico predictions and allele frequencies.

Parameters:

    annotation_source (VariantIndex): Annotation to add to the dataset. Required.

Returns:

    VariantIndex: VariantIndex dataset with the annotation added.
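
A minimal usage sketch, assuming `variant_index` and `vep_annotation` are VariantIndex instances built elsewhere:

# Sketch only; both inputs are assumed to be prebuilt VariantIndex instances.
annotated = variant_index.add_annotation(vep_annotation)

# Array columns (e.g. dbXrefs) are merged, while scalar columns keep the
# existing value and fall back to the annotation via coalesce:
annotated.df.select("variantId", "mostSevereConsequenceId", "dbXrefs").show(5)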

assign_variant_id() -> Column classmethod

Creates a column with the variant ID that will be used to index the variant index.

This is to ensure that the variant ID is unique and not too long.

Returns:

    Column: Column with the variant ID, containing the hash if the variant ID is 100 characters or longer.
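
A minimal usage sketch, assuming `df` is a DataFrame that already has a `variantId` column:

from gentropy.dataset.variant_index import VariantIndex

# Identifiers of 100 or more characters are replaced by an otvar_-prefixed
# xxhash64 digest; shorter identifiers pass through unchanged.
df = df.withColumn("variantId", VariantIndex.assign_variant_id())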

filter_by_variant(df: DataFrame) -> VariantIndex

Filter variant annotation dataset by a variant dataframe.

Parameters:

    df (DataFrame): A dataframe of variants. Required.

Returns:

    VariantIndex: A filtered variant annotation dataset.
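
A minimal usage sketch; `variant_index` and `spark` are assumed to exist, and the variant identifier is made up:

# Keep only the variants referenced by a small lookup dataframe:
variants_of_interest = spark.createDataFrame(
    [("1_154453788_C_T", "1")], ["variantId", "chromosome"]
)
filtered = variant_index.filter_by_variant(variants_of_interest)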

get_distance_to_tss(gene_index: GeneIndex, max_distance: int = 500000) -> V2G

Extracts variant to gene assignments for variants falling within a window of a gene's TSS.

Parameters:

    gene_index (GeneIndex): A gene index to filter by. Required.
    max_distance (int): The maximum distance from the TSS to consider. Defaults to 500_000.

Returns:

    V2G: Variant to gene assignments with their distance to the TSS.
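
A minimal usage sketch, assuming prebuilt `variant_index` and `gene_index` datasets:

# Narrow the window to 250 kb; the score is the normalised inverse distance.
v2g_distance = variant_index.get_distance_to_tss(gene_index, max_distance=250_000)
v2g_distance.df.select("variantId", "geneId", "distance", "score").show(5)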

get_most_severe_transcript_consequence(vep_consequences: DataFrame, gene_index: GeneIndex) -> V2G

Creates a dataset with variant to gene assignments based on VEP's predicted consequence of the transcript.

Optionally the transcript consequences can be reduced to the universe of a gene index.

Parameters:

    vep_consequences (DataFrame): A dataframe of VEP consequences. Required.
    gene_index (GeneIndex): A gene index to filter by. Required.

Returns:

    V2G: High and medium severity variant to gene assignments.
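
A minimal usage sketch; `vep_consequences` is assumed to be a lookup dataframe carrying `variantFunctionalConsequenceId` and `score` columns, as the join in the source implies:

# One row per (variantId, geneId) pair, keeping the most severe consequence:
v2g_consequence = variant_index.get_most_severe_transcript_consequence(
    vep_consequences, gene_index
)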

get_plof_v2g(gene_index: GeneIndex) -> V2G

Creates a dataset of variant to gene assignments, with a flag indicating whether the variant is predicted to be loss-of-function by the LOFTEE algorithm.

Optionally the transcript consequences can be reduced to the universe of a gene index.

Parameters:

    gene_index (GeneIndex): A gene index to filter by. Required.

Returns:

    V2G: Variant to gene assignments from the LOFTEE algorithm.
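
A minimal usage sketch, assuming prebuilt `variant_index` and `gene_index` datasets:

# HC ("high confidence") LOFTEE calls score 1.0; LC calls score 0.0.
plof_v2g = variant_index.get_plof_v2g(gene_index)
plof_v2g.df.filter("isHighQualityPlof").show(5)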

get_schema() -> StructType classmethod

Provides the schema for the variant index dataset.

Returns:

    StructType: Schema for the VariantIndex dataset.
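
A quick sketch for inspecting the expected schema without instantiating the dataset:

from gentropy.dataset.variant_index import VariantIndex

# The schema is parsed from the variant_index.json definition shipped with the package.
print(VariantIndex.get_schema().simpleString())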

get_transcript_consequence_df(gene_index: GeneIndex | None = None) -> DataFrame

Dataframe of exploded transcript consequences.

Optionally the transcript consequences can be reduced to the universe of a gene index.

Parameters:

    gene_index (GeneIndex | None): A gene index. Defaults to None.

Returns:

    DataFrame: A dataframe exploded by transcript consequences, with the columns variantId, chromosome, position, transcriptConsequence and geneId.
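
A minimal usage sketch, assuming a prebuilt `variant_index` (the gene index argument is optional):

# Count the exploded transcript consequences per gene:
transcript_df = variant_index.get_transcript_consequence_df()
transcript_df.groupBy("geneId").count().show(5)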

hash_long_variant_ids(variant_id: Column, chromosome: Column, position: Column, threshold: int = 100) -> Column staticmethod

Hash long variant identifiers.

Parameters:

    variant_id (Column): Column containing variant identifiers. Required.
    chromosome (Column): Chromosome column. Required.
    position (Column): Position column. Required.
    threshold (int): Above this limit, a hash will be generated. Defaults to 100.

Returns:

    Column: Hashed variant identifiers for long variants.

Examples:

>>> (
...    spark.createDataFrame([('v_short', 'x', 23),('v_looooooong', '23', 23), ('no_chrom', None, None), (None, None, None)], ['variantId', 'chromosome', 'position'])
...    .select('variantId', VariantIndex.hash_long_variant_ids(f.col('variantId'), f.col('chromosome'), f.col('position'), 10).alias('hashedVariantId'))
...    .show(truncate=False)
... )
+------------+--------------------------------------------+
|variantId   |hashedVariantId                             |
+------------+--------------------------------------------+
|v_short     |v_short                                     |
|v_looooooong|OTVAR_23_23_3749d019d645894770c364992ae70a05|
|no_chrom    |OTVAR_41acfcd7d4fd523b33600b504914ef25      |
|null        |null                                        |
+------------+--------------------------------------------+

max_maf() -> Column

Maximum minor allele frequency across all populations, assuming all variants are biallelic.

Returns:

    Column: Maximum minor allele frequency across all populations.

Raises:

    ValueError: Allele frequencies are not present in the dataset.
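
A minimal usage sketch; the returned Column is bound to the index's own dataframe:

# Annotate each variant with its maximum minor allele frequency:
variant_index.df.select(
    "variantId", variant_index.max_maf().alias("maxMaf")
).show(5)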


Schema

root
 |-- variantId: string (nullable = false)
 |-- chromosome: string (nullable = false)
 |-- position: integer (nullable = false)
 |-- referenceAllele: string (nullable = false)
 |-- alternateAllele: string (nullable = false)
 |-- inSilicoPredictors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- method: string (nullable = true)
 |    |    |-- assessment: string (nullable = true)
 |    |    |-- score: float (nullable = true)
 |    |    |-- assessmentFlag: string (nullable = true)
 |    |    |-- targetId: string (nullable = true)
 |-- mostSevereConsequenceId: string (nullable = true)
 |-- transcriptConsequences: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- variantFunctionalConsequenceIds: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- aminoAcidChange: string (nullable = true)
 |    |    |-- uniprotAccessions: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- isEnsemblCanonical: boolean (nullable = false)
 |    |    |-- codons: string (nullable = true)
 |    |    |-- distance: long (nullable = true)
 |    |    |-- targetId: string (nullable = true)
 |    |    |-- impact: string (nullable = true)
 |    |    |-- lofteePrediction: string (nullable = true)
 |    |    |-- siftPrediction: float (nullable = true)
 |    |    |-- polyphenPrediction: float (nullable = true)
 |    |    |-- transcriptId: string (nullable = true)
 |-- rsIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- alleleFrequencies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- populationName: string (nullable = true)
 |    |    |-- alleleFrequency: double (nullable = true)
 |-- dbXrefs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- source: string (nullable = true)