Summary Statistics

gentropy.datasource.ukb_ppp_eur.summary_stats.UkbPppEurSummaryStats (dataclass)

Summary statistics dataset for UKB PPP (EUR).
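
The rendered source below begins part-way through the module, so the module-level imports are not shown. For reading the snippet in isolation, a plausible header looks like the following; the gentropy import paths are assumptions, and only the aliases f and t and the referenced names are taken from the snippet itself.

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

import pyspark.sql.functions as f
import pyspark.sql.types as t

# Assumed import locations within gentropy; adjust to the actual module layout.
from gentropy.common.utils import neglog_pvalue_to_mantissa_and_exponent
from gentropy.dataset.summary_statistics import SummaryStatistics

if TYPE_CHECKING:
    from pyspark.sql import SparkSession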

Source code in src/gentropy/datasource/ukb_ppp_eur/summary_stats.py
@dataclass
class UkbPppEurSummaryStats:
    """Summary statistics dataset for UKB PPP (EUR)."""

    @classmethod
    def from_source(
        cls: type[UkbPppEurSummaryStats],
        spark: SparkSession,
        raw_summary_stats_path: str,
        tmp_variant_annotation_path: str,
        chromosome: str,
    ) -> SummaryStatistics:
        """Ingest and harmonise all summary stats for UKB PPP (EUR) data.

        1. Rename chromosome 23 to X.
        2. Filter out low INFO rows.
        3. Filter out low frequency rows.
        4. Assign variant types.
        5. Create variant ID for joining the variant annotation dataset.
        6. Join with the Variant Annotation dataset.
        7. Drop bad quality variants.

        Args:
            spark (SparkSession): Spark session object.
            raw_summary_stats_path (str): Input raw summary stats path.
            tmp_variant_annotation_path (str): Input variant annotation dataset path.
            chromosome (str): Which chromosome to process.

        Returns:
            SummaryStatistics: Processed summary statistics dataset for a given chromosome.
        """
        # Read the precomputed variant annotation dataset.
        va_df = (
            spark
            .read
            .parquet(tmp_variant_annotation_path)
            .filter(f.col("vaChromosome") == ("X" if chromosome == "23" else chromosome))
            .persist()
        )

        # Read and process the summary stats dataset.
        df = (
            spark
            .read
            .parquet(raw_summary_stats_path)
            .filter(f.col("chromosome") == chromosome)
            # Harmonise, 1: Rename chromosome 23 to X.
            .withColumn(
                "chromosome",
                f.when(
                    f.col("chromosome") == "23", "X"
                ).otherwise(f.col("chromosome"))
            )
            # Harmonise, 2: Filter out low INFO rows.
            .filter(f.col("INFO") >= 0.8)
            # Harmonise, 3: Filter out low frequency rows.
            .withColumn(
                "MAF",
                f.when(f.col("A1FREQ") < 0.5, f.col("A1FREQ"))
                .otherwise(1 - f.col("A1FREQ"))
            )
            .filter(f.col("MAF") >= 0.0001)
            .drop("MAF")
            # Harmonise, 4: Assign variant types.
            .withColumn(
                "variant_type",
                f.when(
                    (f.length("ALLELE0") == 1) & (f.length("ALLELE1") == 1),
                    f.when(
                        ((f.col("ALLELE0") == "A") & (f.col("ALLELE1") == "T")) |
                        ((f.col("ALLELE0") == "T") & (f.col("ALLELE1") == "A")) |
                        ((f.col("ALLELE0") == "G") & (f.col("ALLELE1") == "C")) |
                        ((f.col("ALLELE0") == "C") & (f.col("ALLELE1") == "G")),
                        "snp_c"
                    )
                    .otherwise(
                        "snp_n"
                    )
                )
                .otherwise(
                    "indel"
                )
            )
            # Harmonise, 5: Create variant ID for joining the variant annotation dataset.
            .withColumn(
                "GENPOS",
                f.col("GENPOS").cast("integer")
            )
            .withColumn(
                "ukb_ppp_id",
                f.concat_ws(
                    "_",
                    f.col("chromosome"),
                    f.col("GENPOS"),
                    f.col("ALLELE0"),
                    f.col("ALLELE1")
                )
            )
        )
        # Harmonise, 6: Join with the Variant Annotation dataset.
        df = (
            df
            .join(va_df, (df["chromosome"] == va_df["vaChromosome"]) & (df["ukb_ppp_id"] == va_df["ukb_ppp_id"]), "inner")
            .drop("vaChromosome", "ukb_ppp_id")
            .withColumn(
                "effectAlleleFrequencyFromSource",
                f.when(
                    f.col("direction") == "direct",
                    f.col("A1FREQ").cast("float")
                ).otherwise(1 - f.col("A1FREQ").cast("float"))
            )
            .withColumn(
                "beta",
                f.when(
                    f.col("direction") == "direct",
                    f.col("BETA").cast("double")
                ).otherwise(-f.col("BETA").cast("double"))
            )
        )
        df = (
            # Harmonise, 7: Drop bad quality variants.
            df
            .filter(
                ~ ((f.col("variant_type") == "snp_c") & (f.col("direction") == "flip"))
            )
        )

        # Prepare the fields according to schema.
        df = (
            df
            .select(
                f.col("studyId"),
                f.col("chromosome"),
                f.col("variantId"),
                f.col("beta"),
                f.col("GENPOS").cast(t.IntegerType()).alias("position"),
                # Parse p-value into mantissa and exponent.
                *neglog_pvalue_to_mantissa_and_exponent(f.col("LOG10P").cast(t.DoubleType())),
                # Add standard error and sample size information.
                f.col("SE").cast("double").alias("standardError"),
                f.col("N").cast("integer").alias("sampleSize"),
            )
            # Drop rows which don't have proper position or beta value.
            .filter(
                f.col("position").cast(t.IntegerType()).isNotNull()
                & (f.col("beta") != 0)
            )
        )

        # Create the summary statistics object.
        return SummaryStatistics(
            _df=df,
            _schema=SummaryStatistics.get_schema(),
        )
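
The one helper above that is not a plain PySpark call is neglog_pvalue_to_mantissa_and_exponent, which turns the LOG10P column (reported as -log10(p)) into the mantissa and exponent fields expected by the SummaryStatistics schema. The library version works on Spark Columns; the plain-Python sketch below only illustrates the arithmetic, assuming the conventional decomposition p = mantissa * 10^exponent with mantissa in [1, 10).

import math

def neglog_p_to_mantissa_exponent(neglog_p: float) -> tuple[float, int]:
    """Illustrative only: split p = 10 ** (-neglog_p) into (mantissa, exponent)."""
    exponent = -math.ceil(neglog_p)                     # equals floor(log10(p))
    mantissa = 10 ** (math.ceil(neglog_p) - neglog_p)   # always falls in [1, 10)
    return mantissa, exponent

print(neglog_p_to_mantissa_exponent(7.52))  # ~(3.02, -8), i.e. p ~ 3.02e-8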

from_source(spark: SparkSession, raw_summary_stats_path: str, tmp_variant_annotation_path: str, chromosome: str) -> SummaryStatistics (classmethod)

Ingest and harmonise all summary stats for UKB PPP (EUR) data.

  1. Rename chromosome 23 to X.
  2. Filter out low INFO rows.
  3. Filter out low frequency rows.
  4. Assign variant types.
  5. Create variant ID for joining the variant annotation dataset.
  6. Join with the Variant Annotation dataset.
  7. Drop bad quality variants (see the sketch after this list).
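
Step 7 concerns the complementary ("palindromic") SNPs classified in step 4: for A/T and G/C pairs, an allele flip against the variant annotation cannot be distinguished from a strand swap, so the effect direction is ambiguous and the row is dropped. The plain-Python sketch below restates the classification used in step 4; it mirrors the Spark expressions in the source above and is illustrative only.

def classify_variant(allele0: str, allele1: str) -> str:
    """Mirror of the variant_type logic: complementary SNP, other SNP, or indel."""
    complementary_pairs = {("A", "T"), ("T", "A"), ("G", "C"), ("C", "G")}
    if len(allele0) == 1 and len(allele1) == 1:
        return "snp_c" if (allele0, allele1) in complementary_pairs else "snp_n"
    return "indel"

assert classify_variant("A", "T") == "snp_c"   # palindromic: dropped when direction == "flip"
assert classify_variant("A", "G") == "snp_n"   # kept regardless of direction
assert classify_variant("AT", "A") == "indel"  # kept regardless of direction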

Parameters:

    spark (SparkSession): Spark session object. Required.
    raw_summary_stats_path (str): Input raw summary stats path. Required.
    tmp_variant_annotation_path (str): Input variant annotation dataset path. Required.
    chromosome (str): Which chromosome to process. Required.

Returns:

    SummaryStatistics: Processed summary statistics dataset for a given chromosome.
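
A minimal invocation sketch; the paths and the chromosome value are placeholders rather than values taken from the source.

from pyspark.sql import SparkSession

from gentropy.datasource.ukb_ppp_eur.summary_stats import UkbPppEurSummaryStats

spark = SparkSession.builder.getOrCreate()

summary_stats = UkbPppEurSummaryStats.from_source(
    spark=spark,
    raw_summary_stats_path="/data/ukb_ppp_eur/raw_summary_stats",        # placeholder path
    tmp_variant_annotation_path="/data/ukb_ppp_eur/variant_annotation",  # placeholder path
    chromosome="22",
)

# The returned SummaryStatistics wraps a Spark DataFrame (assumed to be exposed via .df).
summary_stats.df.show(5)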
