Skip to content

processing

Common functions for gentropy dataset processing (data business logic)

gentropy.common.processing.parse_efos(efo_uri: Column) -> Column

Extracting EFO identifiers.

This function parses EFO identifiers from a comma-separated list of EFO URIs.

Parameters:

Name Type Description Default
efo_uri Column

column with a list of EFO URIs

required

Returns:

Name Type Description
Column Column

column with a sorted list of parsed EFO IDs

Examples:

>>> d = [("http://www.ebi.ac.uk/efo/EFO_0000001,http://www.ebi.ac.uk/efo/EFO_0000002",)]
>>> df = spark.createDataFrame(d).toDF("efos")
>>> df.withColumn("efos_parsed", parse_efos(f.col("efos"))).show(truncate=False)
+-------------------------------------------------------------------------+--------------------------+
|efos                                                                     |efos_parsed               |
+-------------------------------------------------------------------------+--------------------------+
|http://www.ebi.ac.uk/efo/EFO_0000001,http://www.ebi.ac.uk/efo/EFO_0000002|[EFO_0000001, EFO_0000002]|
+-------------------------------------------------------------------------+--------------------------+
Source code in src/gentropy/common/processing.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def parse_efos(efo_uri: Column) -> Column:
    """Extract EFO identifiers from a column of comma-separated EFO URIs.

    Each URI in the input string is scanned for an identifier of the form
    `LETTERS_DIGITS` (e.g. `EFO_0000001`); the matches are returned sorted.

    Args:
        efo_uri (Column): column with a list of EFO URIs

    Returns:
        Column: column with a sorted list of parsed EFO IDs

    Examples:
        >>> d = [("http://www.ebi.ac.uk/efo/EFO_0000001,http://www.ebi.ac.uk/efo/EFO_0000002",)]
        >>> df = spark.createDataFrame(d).toDF("efos")
        >>> df.withColumn("efos_parsed", parse_efos(f.col("efos"))).show(truncate=False)
        +-------------------------------------------------------------------------+--------------------------+
        |efos                                                                     |efos_parsed               |
        +-------------------------------------------------------------------------+--------------------------+
        |http://www.ebi.ac.uk/efo/EFO_0000001,http://www.ebi.ac.uk/efo/EFO_0000002|[EFO_0000001, EFO_0000002]|
        +-------------------------------------------------------------------------+--------------------------+
        <BLANKLINE>

    """
    column_name = extract_column_name(efo_uri)
    # regexp_extract_all is only available as a SQL expression, hence f.expr.
    extraction_sql = f"regexp_extract_all(`{column_name}`, '([A-Z]+_[0-9]+)')"
    return f.array_sort(f.expr(extraction_sql))

gentropy.common.processing.extract_chromosome(variant_id: Column) -> Column

Extract chromosome from variant ID.

This function extracts the chromosome from a variant ID. The variantId is expected to be in the format chromosome_position_ref_alt. The function does not convert the GENCODE to Ensembl chromosome notation. See https://genome.ucsc.edu/FAQ/FAQgenes.html#:~:text=maps%20only%20once.-,The%20differences,-Some%20of%20our

Parameters:

Name Type Description Default
variant_id Column

Variant ID

required

Returns:

Name Type Description
Column Column

Chromosome

Examples:

>>> d = [("chr1_12345_A_T",),("15_KI270850v1_alt_48777_C_T",),]
>>> df = spark.createDataFrame(d).toDF("variantId")
>>> df.withColumn("chromosome", extract_chromosome(f.col("variantId"))).show(truncate=False)
+---------------------------+-----------------+
|variantId                  |chromosome       |
+---------------------------+-----------------+
|chr1_12345_A_T             |chr1             |
|15_KI270850v1_alt_48777_C_T|15_KI270850v1_alt|
+---------------------------+-----------------+
Source code in src/gentropy/common/processing.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def extract_chromosome(variant_id: Column) -> Column:
    """Extract chromosome from variant ID.

    The variant ID is expected to follow the `chromosome_position_ref_alt`
    layout; everything before the numeric position segment is treated as the
    chromosome. No GENCODE-to-Ensembl chromosome notation conversion is done.
    See https://genome.ucsc.edu/FAQ/FAQgenes.html#:~:text=maps%20only%20once.-,The%20differences,-Some%20of%20our

    Args:
        variant_id (Column): Variant ID

    Returns:
        Column: Chromosome

    Examples:
        >>> d = [("chr1_12345_A_T",),("15_KI270850v1_alt_48777_C_T",),]
        >>> df = spark.createDataFrame(d).toDF("variantId")
        >>> df.withColumn("chromosome", extract_chromosome(f.col("variantId"))).show(truncate=False)
        +---------------------------+-----------------+
        |variantId                  |chromosome       |
        +---------------------------+-----------------+
        |chr1_12345_A_T             |chr1             |
        |15_KI270850v1_alt_48777_C_T|15_KI270850v1_alt|
        +---------------------------+-----------------+
        <BLANKLINE>

    """
    # Greedy prefix match: captures everything up to the last `_<digits>_` segment,
    # so alt-contig names containing underscores stay intact.
    chromosome_pattern = r"^(.*)_\d+_.*$"
    return f.regexp_extract(variant_id, chromosome_pattern, 1)

gentropy.common.processing.extract_position(variant_id: Column) -> Column

Extract position from variant ID.

This function extracts the position from a variant ID. The variantId is expected to be in the format chromosome_position_ref_alt.

Parameters:

Name Type Description Default
variant_id Column

Variant ID

required

Returns:

Name Type Description
Column Column

Position

Examples:

>>> d = [("chr1_12345_A_T",),("15_KI270850v1_alt_48777_C_T",),]
>>> df = spark.createDataFrame(d).toDF("variantId")
>>> df.withColumn("position", extract_position(f.col("variantId"))).show(truncate=False)
+---------------------------+--------+
|variantId                  |position|
+---------------------------+--------+
|chr1_12345_A_T             |12345   |
|15_KI270850v1_alt_48777_C_T|48777   |
+---------------------------+--------+
Source code in src/gentropy/common/processing.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def extract_position(variant_id: Column) -> Column:
    """Extract position from variant ID.

    The variant ID is expected to follow the `chromosome_position_ref_alt`
    layout; the last run of digits delimited by underscores is taken as the
    position.

    Args:
        variant_id (Column): Variant ID

    Returns:
        Column: Position

    Examples:
        >>> d = [("chr1_12345_A_T",),("15_KI270850v1_alt_48777_C_T",),]
        >>> df = spark.createDataFrame(d).toDF("variantId")
        >>> df.withColumn("position", extract_position(f.col("variantId"))).show(truncate=False)
        +---------------------------+--------+
        |variantId                  |position|
        +---------------------------+--------+
        |chr1_12345_A_T             |12345   |
        |15_KI270850v1_alt_48777_C_T|48777   |
        +---------------------------+--------+
        <BLANKLINE>

    """
    # Greedy `.*` prefix pushes the match to the LAST `_<digits>_` group,
    # which is what makes alt-contig IDs (with embedded digits) parse correctly.
    position_pattern = r"^.*_(\d+)_.*$"
    return f.regexp_extract(variant_id, position_pattern, 1)

Harmonisation

gentropy.common.processing.harmonise_summary_stats(spark: SparkSession, raw_summary_stats_path: str, tmp_variant_annotation_path: str, chromosome: str, colname_position: str, colname_allele0: str, colname_allele1: str, colname_a1freq: str | None, colname_info: str | None, colname_beta: str, colname_se: str, colname_mlog10p: str, colname_n: str | None) -> DataFrame

Ingest and harmonise the summary stats.

  1. Rename chromosome 23 to X.
  2. Filter out low INFO rows.
  3. Filter out low frequency rows.
  4. Assign variant types.
  5. Create variant ID for joining the variant annotation dataset.
  6. Join with the Variant Annotation dataset.
  7. Drop bad quality variants.

Parameters:

Name Type Description Default
spark SparkSession

Spark session object.

required
raw_summary_stats_path str

Input raw summary stats path.

required
tmp_variant_annotation_path str

Path to the Variant Annotation dataset which has been further prepared and processed by the per_chromosome module (previous PR in the chain) to speed up the joins in the harmonisation phase. It includes all variants in both the direct (A0/A1) and reverse (A1/A0) orientations, so that the direction of the variant can be easily determined on joining.

required
chromosome str

Which chromosome to process.

required
colname_position str

Column name for position.

required
colname_allele0 str

Column name for allele0.

required
colname_allele1 str

Column name for allele1.

required
colname_a1freq str | None

Column name for allele1 frequency (optional).

required
colname_info str | None

Column name for INFO, reflecting variant quality (optional).

required
colname_beta str

Column name for beta.

required
colname_se str

Column name for beta standard error.

required
colname_mlog10p str

Column name for -log10(p).

required
colname_n str | None

Column name for the number of samples (optional).

required

Returns:

Name Type Description
DataFrame DataFrame

A harmonised summary stats dataframe.

Source code in src/gentropy/common/processing.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def harmonise_summary_stats(
    spark: SparkSession,
    raw_summary_stats_path: str,
    tmp_variant_annotation_path: str,
    chromosome: str,
    colname_position: str,
    colname_allele0: str,
    colname_allele1: str,
    colname_a1freq: str | None,
    colname_info: str | None,
    colname_beta: str,
    colname_se: str,
    colname_mlog10p: str,
    colname_n: str | None,
) -> DataFrame:
    """Ingest and harmonise the summary stats.

    1. Rename chromosome 23 to X.
    2. Filter out low INFO rows.
    3. Filter out low frequency rows.
    4. Assign variant types.
    5. Create variant ID for joining the variant annotation dataset.
    6. Join with the Variant Annotation dataset.
    7. Drop bad quality variants.

    Args:
        spark (SparkSession): Spark session object.
        raw_summary_stats_path (str): Input raw summary stats path.
        tmp_variant_annotation_path (str): Path to the Variant Annotation dataset which has been further prepared and processed by the per_chromosome module (previous PR in the chain) to speed up the joins in the harmonisation phase. It includes all variants in both the direct (A0/A1) and reverse (A1/A0) orientations, so that the direction of the variant can be easily determined on joining.
        chromosome (str): Which chromosome to process.
        colname_position (str): Column name for position.
        colname_allele0 (str): Column name for allele0.
        colname_allele1 (str): Column name for allele1.
        colname_a1freq (str | None): Column name for allele1 frequency (optional).
        colname_info (str | None): Column name for INFO, reflecting variant quality (optional).
        colname_beta (str): Column name for beta.
        colname_se (str): Column name for beta standard error.
        colname_mlog10p (str): Column name for -log10(p).
        colname_n (str | None): Column name for the number of samples (optional).

    Returns:
        DataFrame: A harmonised summary stats dataframe.
    """
    # Read the precomputed variant annotation dataset.
    # The annotation uses "X" where the raw stats may use "23", so map before filtering.
    # Persisted because the filtered slice is reused by the join below.
    va_df = (
        spark.read.parquet(tmp_variant_annotation_path)
        .filter(f.col("vaChromosome") == ("X" if chromosome == "23" else chromosome))
        .persist()
    )

    # Read and process the summary stats dataset.
    df = (
        spark.read.parquet(raw_summary_stats_path)
        .filter(f.col("chromosome") == chromosome)
        # Harmonise, 1: Rename chromosome 23 to X.
        .withColumn(
            "chromosome",
            f.when(f.col("chromosome") == "23", "X").otherwise(f.col("chromosome")),
        )
    )
    if colname_info:
        # Harmonise, 2: Filter out low INFO rows (imputation quality below 0.8).
        df = df.filter(f.col(colname_info) >= 0.8)
    if colname_a1freq:
        # Harmonise, 3: Filter out low frequency rows.
        # MAF is the frequency of the less common allele; rows below 0.01% are dropped.
        df = (
            df.withColumn(
                "MAF",
                f.when(f.col(colname_a1freq) < 0.5, f.col(colname_a1freq)).otherwise(
                    1 - f.col(colname_a1freq)
                ),
            )
            .filter(f.col("MAF") >= 0.0001)
            .drop("MAF")
        )
    df = (
        df
        # Harmonise, 4: Assign variant types.
        # There are three possible variant types:
        # 1. `snp_c` means an SNP converting a base into its complementary base: A<>T or G><C.
        # 2. `snp_n` means any other SNP where the length of each allele is still exactly 1.
        # 3. `indel` means any other variant where the length of at least one allele is greater than 1.
        .withColumn(
            "variant_type",
            f.when(
                (f.length(colname_allele0) == 1) & (f.length(colname_allele1) == 1),
                f.when(
                    ((f.col(colname_allele0) == "A") & (f.col(colname_allele1) == "T"))
                    | (
                        (f.col(colname_allele0) == "T")
                        & (f.col(colname_allele1) == "A")
                    )
                    | (
                        (f.col(colname_allele0) == "G")
                        & (f.col(colname_allele1) == "C")
                    )
                    | (
                        (f.col(colname_allele0) == "C")
                        & (f.col(colname_allele1) == "G")
                    ),
                    "snp_c",
                ).otherwise("snp_n"),
            ).otherwise("indel"),
        )
        # Harmonise, 5: Create variant ID for joining the variant annotation dataset.
        # The ID matches the `chromosome_position_allele0_allele1` layout used by prepare_va.
        .withColumn(colname_position, f.col(colname_position).cast("integer"))
        .withColumn(
            "summary_stats_id",
            f.concat_ws(
                "_",
                f.col("chromosome"),
                f.col(colname_position),
                f.col(colname_allele0),
                f.col(colname_allele1),
            ),
        )
    )
    # Harmonise, 6: Join with the Variant Annotation dataset.
    # The annotation carries both allele orientations, so the inner join also
    # resolves whether this row matched in the "direct" or "flip" direction.
    df = (
        df.join(
            va_df,
            (df["chromosome"] == va_df["vaChromosome"])
            & (df["summary_stats_id"] == va_df["summary_stats_id"]),
            "inner",
        )
        .drop("vaChromosome", "summary_stats_id")
        # Beta keeps its sign for direct matches and is negated for flipped ones.
        .withColumn(
            "beta",
            f.when(
                f.col("direction") == "direct", f.col(colname_beta).cast("double")
            ).otherwise(-f.col(colname_beta).cast("double")),
        )
    )
    if colname_a1freq:
        # Allele frequency is likewise complemented (1 - freq) for flipped matches.
        df = df.withColumn(
            "effectAlleleFrequencyFromSource",
            f.when(
                f.col("direction") == "direct", f.col(colname_a1freq).cast("float")
            ).otherwise(1 - f.col(colname_a1freq).cast("float")),
        )
    df = (
        # Harmonise, 7: Drop bad quality variants.
        # A complementary-base SNP matched in the flipped orientation is ambiguous
        # (strand cannot be determined), so such rows are removed.
        df.filter(
            ~((f.col("variant_type") == "snp_c") & (f.col("direction") == "flip"))
        )
    )

    # Prepare the fields according to schema.
    select_expr = [
        f.col("studyId"),
        f.col("chromosome"),
        f.col("variantId"),
        f.col("beta"),
        f.col(colname_position).cast(t.IntegerType()).alias("position"),
        # Parse p-value into mantissa and exponent.
        *pvalue_from_neglogpval(f.col(colname_mlog10p).cast(t.DoubleType())),
        # Add standard error and sample size information.
        f.col(colname_se).cast("double").alias("standardError"),
    ]
    if colname_n:
        select_expr.append(f.col(colname_n).cast("integer").alias("sampleSize"))

    df = (
        df.select(*select_expr)
        # Dropping associations where no harmonized position is available:
        .filter(f.col("position").isNotNull())
        # We are not interested in associations with empty beta values:
        .filter(f.col("beta").isNotNull())
        # We are not interested in associations with zero effect:
        .filter(f.col("beta") != 0)
        # Drop rows which don't have proper position or beta value.
    )
    # NOTE! In case the standard error is empty, recompute it from p-value and beta.
    # If we leave the standard error empty for all fields, we will cause the sanity filter
    # to skip all rows.
    # Make sure the beta is non empty before computation.
    computed_chi2 = chi2_from_pvalue(f.col("pValueMantissa"), f.col("pValueExponent"))
    computed_stderr = stderr_from_chi2_and_effect_size(computed_chi2, f.col("beta"))
    df = df.withColumn(
        "standardError", f.coalesce(f.col("standardError"), computed_stderr)
    ).orderBy(f.col("chromosome"), f.col("position"))

    # Return the dataframe.
    return df

gentropy.common.processing.prepare_va(session: Session, variant_annotation_path: str, tmp_variant_annotation_path: str) -> None

Prepare the Variant Annotation dataset for efficient per-chromosome joins.

Parameters:

Name Type Description Default
session Session

The gentropy session wrapper to be used for reading and writing data.

required
variant_annotation_path str

The path to the input variant annotation dataset.

required
tmp_variant_annotation_path str

The path to store the temporary output for the repartitioned annotation dataset.

required
Source code in src/gentropy/common/processing.py
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
def prepare_va(session: Session, variant_annotation_path: str, tmp_variant_annotation_path: str) -> None:
    """Prepare the Variant Annotation dataset for efficient per-chromosome joins.

    Emits every variant twice — once with its alleles in the original
    (direct) order and once reversed (flip) — so downstream joins can
    detect the orientation of a match, then writes the result partitioned
    by chromosome.

    Args:
        session (Session): The gentropy session wrapper to be used for reading and writing data.
        variant_annotation_path (str): The path to the input variant annotation dataset.
        tmp_variant_annotation_path (str): The path to store the temporary output for the repartitioned annotation dataset.
    """
    annotation = session.spark.read.parquet(variant_annotation_path)

    def oriented(first_allele: str, second_allele: str, direction: str) -> DataFrame:
        # One orientation of the annotation: join key built from the alleles in the given order.
        return annotation.select(
            f.col("chromosome").alias("vaChromosome"),
            f.col("variantId"),
            f.concat_ws(
                "_",
                f.col("chromosome"),
                f.col("position"),
                f.col(first_allele),
                f.col(second_allele),
            ).alias("summary_stats_id"),
            f.lit(direction).alias("direction"),
        )

    direct_view = oriented("referenceAllele", "alternateAllele", "direct")
    flipped_view = oriented("alternateAllele", "referenceAllele", "flip")
    (
        direct_view.union(flipped_view)
        .coalesce(1)
        .repartition("vaChromosome")
        .write
        .partitionBy("vaChromosome")
        .mode("overwrite")
        .parquet(tmp_variant_annotation_path)
    )

gentropy.common.processing.process_summary_stats_per_chromosome(session: Session, ingestion_class: type[UkbPppEurSummaryStats] | type[FinngenUkbMetaSummaryStats], raw_summary_stats_path: str, tmp_variant_annotation_path: str, summary_stats_output_path: str, study_index_path: str) -> None

Processes summary statistics for each chromosome, partitioning and writing results.

Parameters:

Name Type Description Default
session Session

The Gentropy session to use for distributed data processing.

required
ingestion_class type[UkbPppEurSummaryStats] | type[FinngenUkbMetaSummaryStats]

The class used to handle ingestion of source data. Must have a from_source method returning a DataFrame.

required
raw_summary_stats_path str

The path to the raw summary statistics files.

required
tmp_variant_annotation_path str

The path to temporary variant annotation data, used for chromosome joins.

required
summary_stats_output_path str

The output path to write processed summary statistics as parquet files.

required
study_index_path str

The path to study index, which is necessary in some cases to populate the sample size column.

required
Source code in src/gentropy/common/processing.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def process_summary_stats_per_chromosome(
    session: Session,
    ingestion_class: type[UkbPppEurSummaryStats] | type[FinngenUkbMetaSummaryStats],
    raw_summary_stats_path: str,
    tmp_variant_annotation_path: str,
    summary_stats_output_path: str,
    study_index_path: str,
) -> None:
    """Processes summary statistics for each chromosome, partitioning and writing results.

    Args:
        session (Session): The Gentropy session to use for distributed data processing.
        ingestion_class (type[UkbPppEurSummaryStats] | type[FinngenUkbMetaSummaryStats]): The class used to handle ingestion of source data. Must have a `from_source` method returning a DataFrame.
        raw_summary_stats_path (str): The path to the raw summary statistics files.
        tmp_variant_annotation_path (str): The path to temporary variant annotation data, used for chromosome joins.
        summary_stats_output_path (str): The output path to write processed summary statistics as parquet files.
        study_index_path (str): The path to study index, which is necessary in some cases to populate the sample size column.
    """
    # Set mode to overwrite for processing the first chromosome.
    write_mode = "overwrite"
    # Chromosome 23 is X, this is handled downstream.
    # NOTE: range is iterated directly — no need to materialize it into a list.
    for chromosome in range(1, 24):
        logging_message = f"  Processing chromosome {chromosome}"
        session.logger.info(logging_message)
        (
            ingestion_class.from_source(
                spark=session.spark,
                raw_summary_stats_path=raw_summary_stats_path,
                tmp_variant_annotation_path=tmp_variant_annotation_path,
                chromosome=str(chromosome),
                study_index_path=study_index_path,
            )
            .df
            .coalesce(1)
            .repartition("studyId", "chromosome")
            .write
            .partitionBy("studyId", "chromosome")
            .mode(write_mode)
            .parquet(summary_stats_output_path)
        )
        # Now that we have written the first chromosome, change mode to append for subsequent operations.
        write_mode = "append"