
Summary statistics

Bases: Dataset

Summary Statistics dataset.

A summary statistics dataset contains all single point statistics resulting from a GWAS.

Source code in src/otg/dataset/summary_statistics.py
@dataclass
class SummaryStatistics(Dataset):
    """Summary Statistics dataset.

    A summary statistics dataset contains all single point statistics resulting from a GWAS.
    """

    @classmethod
    def get_schema(cls: type[SummaryStatistics]) -> StructType:
        """Provides the schema for the SummaryStatistics dataset."""
        return parse_spark_schema("summary_statistics.json")

    @classmethod
    def from_gwas_harmonized_summary_stats(
        cls: type[SummaryStatistics],
        sumstats_df: DataFrame,
        study_id: str,
    ) -> SummaryStatistics:
        """Create summary statistics object from summary statistics flatfile, harmonized by the GWAS Catalog.

        Args:
            sumstats_df (DataFrame): Harmonized dataset read as a spark dataframe from GWAS Catalog.
            study_id (str): GWAS Catalog study accession.

        Returns:
            SummaryStatistics
        """
        # The effect allele frequency is an optional column, we have to test if it is there:
        allele_frequency_expression = (
            f.col("hm_effect_allele_frequency").cast(t.FloatType())
            if "hm_effect_allele_frequency" in sumstats_df.columns
            else f.lit(None)
        )

        # Processing columns of interest:
        processed_sumstats_df = (
            sumstats_df
            # Dropping rows that don't have a proper position:
            .filter(f.col("hm_pos").cast(t.IntegerType()).isNotNull())
            .select(
                # Adding study identifier:
                f.lit(study_id).cast(t.StringType()).alias("studyId"),
                # Adding variant identifier:
                f.col("hm_variant_id").alias("variantId"),
                f.col("hm_chrom").alias("chromosome"),
                f.col("hm_pos").cast(t.IntegerType()).alias("position"),
                # Parsing p-value mantissa and exponent:
                *parse_pvalue(f.col("p_value")),
                # Converting/calculating effect and confidence interval:
                *convert_odds_ratio_to_beta(
                    f.col("hm_beta").cast(t.DoubleType()),
                    f.col("hm_odds_ratio").cast(t.DoubleType()),
                    f.col("standard_error").cast(t.DoubleType()),
                ),
                allele_frequency_expression.alias("effectAlleleFrequencyFromSource"),
            )
            # The previous select expression generated the necessary fields for calculating the confidence intervals:
            .select(
                "*",
                *calculate_confidence_interval(
                    f.col("pValueMantissa"),
                    f.col("pValueExponent"),
                    f.col("beta"),
                    f.col("standardError"),
                ),
            )
            .repartition(200, "chromosome")
            .sortWithinPartitions("position")
        )

        # Initializing summary statistics object:
        return cls(
            _df=processed_sumstats_df,
            _schema=cls.get_schema(),
        )

    def pvalue_filter(self: SummaryStatistics, pvalue: float) -> SummaryStatistics:
        """Filter summary statistics based on the provided p-value threshold.

        Args:
            pvalue (float): Upper limit of the p-value; rows with less significant p-values are dropped.

        Returns:
            SummaryStatistics: summary statistics object containing single point associations with p-values at least as significant as the provided threshold.
        """
        # Converting p-value to mantissa and exponent:
        (mantissa, exponent) = split_pvalue(pvalue)

        # Applying filter:
        df = self._df.filter(
            (f.col("pValueExponent") < exponent)
            | (
                (f.col("pValueExponent") == exponent)
                & (f.col("pValueMantissa") <= mantissa)
            )
        )
        return SummaryStatistics(_df=df, _schema=self._schema)

    def window_based_clumping(
        self: SummaryStatistics,
        distance: int,
        gwas_significance: float = 5e-8,
        with_locus: bool = False,
        baseline_significance: float = 0.05,
    ) -> StudyLocus:
        """Generate study-locus from summary statistics by distance based clumping + collect locus.

        Args:
            distance (int): Distance in base pairs to be used for clumping.
            gwas_significance (float, optional): GWAS significance threshold. Defaults to 5e-8.
            baseline_significance (float, optional): Baseline significance threshold for inclusion in the locus. Defaults to 0.05.

        Returns:
            StudyLocus: Clumped study-locus containing variants based on window.
        """
        # Depending on whether the locus is to be collected, a different clumping function is called:
        if with_locus:
            clumped_df = WindowBasedClumping.clump_with_locus(
                self,
                window_length=distance,
                p_value_significance=gwas_significance,
                p_value_baseline=baseline_significance,
            )
        else:
            clumped_df = WindowBasedClumping.clump(
                self, window_length=distance, p_value_significance=gwas_significance
            )

        return clumped_df

    def exclude_region(self: SummaryStatistics, region: str) -> SummaryStatistics:
        """Exclude a region from the summary stats dataset.

        Args:
            region (str): region given in "chr##:#####-####" format

        Returns:
            SummaryStatistics: filtered summary statistics.
        """
        (chromosome, start_position, end_position) = parse_region(region)

        return SummaryStatistics(
            _df=(
                self.df.filter(
                    ~(
                        (f.col("chromosome") == chromosome)
                        & (
                            (f.col("position") >= start_position)
                            & (f.col("position") <= end_position)
                        )
                    )
                )
            ),
            _schema=SummaryStatistics.get_schema(),
        )
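
A minimal end-to-end sketch of how these methods are typically combined. The Spark session setup, file path, study accession and region coordinates below are illustrative placeholders, not part of this module:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Read a GWAS Catalog-harmonized flatfile (tab-separated, with header):
    raw_df = spark.read.csv("harmonized/GCST000001.h.tsv.gz", sep="\t", header=True)

    # Wrap it in a SummaryStatistics dataset:
    sumstats = SummaryStatistics.from_gwas_harmonized_summary_stats(raw_df, "GCST000001")

    # Drop a problematic region (hypothetical coordinates, e.g. the extended MHC):
    sumstats = sumstats.exclude_region("chr6:28510120-33480577")

    # Distance-based clumping at genome-wide significance:
    study_locus = sumstats.window_based_clumping(distance=250_000)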

exclude_region(region)

Exclude a region from the summary stats dataset.

Parameters:

    region (str, required): Region given in "chr##:#####-####" format.

Returns:

    SummaryStatistics: Filtered summary statistics.

Source code in src/otg/dataset/summary_statistics.py
def exclude_region(self: SummaryStatistics, region: str) -> SummaryStatistics:
    """Exclude a region from the summary stats dataset.

    Args:
        region (str): region given in "chr##:#####-####" format

    Returns:
        SummaryStatistics: filtered summary statistics.
    """
    (chromosome, start_position, end_position) = parse_region(region)

    return SummaryStatistics(
        _df=(
            self.df.filter(
                ~(
                    (f.col("chromosome") == chromosome)
                    & (
                        (f.col("position") >= start_position)
                        & (f.col("position") <= end_position)
                    )
                )
            )
        ),
        _schema=SummaryStatistics.get_schema(),
    )
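
A short usage sketch; summary_stats is assumed to be an existing SummaryStatistics instance, and the coordinates are illustrative only:

    # Remove all rows on chromosome 6 between the given bounds (inclusive):
    filtered = summary_stats.exclude_region("chr6:28510120-33480577")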

from_gwas_harmonized_summary_stats(sumstats_df, study_id) classmethod

Create a summary statistics object from a summary statistics flatfile harmonized by the GWAS Catalog.

Parameters:

    sumstats_df (DataFrame, required): Harmonized dataset read as a Spark DataFrame from the GWAS Catalog.
    study_id (str, required): GWAS Catalog study accession.

Returns:

    SummaryStatistics: Summary statistics dataset.

Source code in src/otg/dataset/summary_statistics.py
@classmethod
def from_gwas_harmonized_summary_stats(
    cls: type[SummaryStatistics],
    sumstats_df: DataFrame,
    study_id: str,
) -> SummaryStatistics:
    """Create summary statistics object from summary statistics flatfile, harmonized by the GWAS Catalog.

    Args:
        sumstats_df (DataFrame): Harmonized dataset read as a spark dataframe from GWAS Catalog.
        study_id (str): GWAS Catalog study accession.

    Returns:
        SummaryStatistics
    """
    # The effect allele frequency is an optional column, we have to test if it is there:
    allele_frequency_expression = (
        f.col("hm_effect_allele_frequency").cast(t.FloatType())
        if "hm_effect_allele_frequency" in sumstats_df.columns
        else f.lit(None)
    )

    # Processing columns of interest:
    processed_sumstats_df = (
        sumstats_df
        # Dropping rows that don't have a proper position:
        .filter(f.col("hm_pos").cast(t.IntegerType()).isNotNull())
        .select(
            # Adding study identifier:
            f.lit(study_id).cast(t.StringType()).alias("studyId"),
            # Adding variant identifier:
            f.col("hm_variant_id").alias("variantId"),
            f.col("hm_chrom").alias("chromosome"),
            f.col("hm_pos").cast(t.IntegerType()).alias("position"),
            # Parsing p-value mantissa and exponent:
            *parse_pvalue(f.col("p_value")),
            # Converting/calculating effect and confidence interval:
            *convert_odds_ratio_to_beta(
                f.col("hm_beta").cast(t.DoubleType()),
                f.col("hm_odds_ratio").cast(t.DoubleType()),
                f.col("standard_error").cast(t.DoubleType()),
            ),
            allele_frequency_expression.alias("effectAlleleFrequencyFromSource"),
        )
        # The previous select expression generated the necessary fields for calculating the confidence intervals:
        .select(
            "*",
            *calculate_confidence_interval(
                f.col("pValueMantissa"),
                f.col("pValueExponent"),
                f.col("beta"),
                f.col("standardError"),
            ),
        )
        .repartition(200, "chromosome")
        .sortWithinPartitions("position")
    )

    # Initializing summary statistics object:
    return cls(
        _df=processed_sumstats_df,
        _schema=cls.get_schema(),
    )
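
A minimal usage sketch, assuming an active SparkSession and a locally downloaded GWAS Catalog-harmonized flatfile; the path and accession are placeholders. Note that hm_effect_allele_frequency is optional in the input: when the column is absent, effectAlleleFrequencyFromSource is filled with nulls:

    raw_df = spark.read.csv("harmonized/GCST000001.h.tsv.gz", sep="\t", header=True)
    summary_stats = SummaryStatistics.from_gwas_harmonized_summary_stats(
        sumstats_df=raw_df,
        study_id="GCST000001",
    )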

get_schema() classmethod

Provides the schema for the SummaryStatistics dataset.

Source code in src/otg/dataset/summary_statistics.py
@classmethod
def get_schema(cls: type[SummaryStatistics]) -> StructType:
    """Provides the schema for the SummaryStatistics dataset."""
    return parse_spark_schema("summary_statistics.json")
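
A quick way to inspect the expected fields, e.g. when validating an externally produced DataFrame before wrapping it in the dataset:

    schema = SummaryStatistics.get_schema()
    for field in schema.fields:
        print(field.name, field.dataType.simpleString(), field.nullable)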

pvalue_filter(pvalue)

Filter summary statistics based on the provided p-value threshold.

Parameters:

    pvalue (float, required): Upper limit of the p-value; rows with less significant p-values are dropped.

Returns:

    SummaryStatistics: Summary statistics object containing single point associations with p-values at least as significant as the provided threshold.

Source code in src/otg/dataset/summary_statistics.py
def pvalue_filter(self: SummaryStatistics, pvalue: float) -> SummaryStatistics:
    """Filter summary statistics based on the provided p-value threshold.

    Args:
        pvalue (float): Upper limit of the p-value; rows with less significant p-values are dropped.

    Returns:
        SummaryStatistics: summary statistics object containing single point associations with p-values at least as significant as the provided threshold.
    """
    # Converting p-value to mantissa and exponent:
    (mantissa, exponent) = split_pvalue(pvalue)

    # Applying filter:
    df = self._df.filter(
        (f.col("pValueExponent") < exponent)
        | (
            (f.col("pValueExponent") == exponent)
            & (f.col("pValueMantissa") <= mantissa)
        )
    )
    return SummaryStatistics(_df=df, _schema=self._schema)
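
A short usage sketch; summary_stats is assumed to be an existing SummaryStatistics instance:

    # Keep genome-wide significant associations only. Internally, 5e-8 is split
    # into mantissa 5.0 and exponent -8, so a row passes when its pValueExponent
    # is below -8, or equals -8 with pValueMantissa <= 5.0.
    significant = summary_stats.pvalue_filter(pvalue=5e-8)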

window_based_clumping(distance, gwas_significance=5e-08, with_locus=False, baseline_significance=0.05)

Generate study-locus from summary statistics by distance-based clumping, optionally collecting the locus.

Parameters:

    distance (int, required): Distance in base pairs to be used for clumping.
    gwas_significance (float, optional): GWAS significance threshold. Defaults to 5e-08.
    with_locus (bool, optional): Whether to collect the locus around the clumped variants. Defaults to False.
    baseline_significance (float, optional): Baseline significance threshold for inclusion in the locus. Defaults to 0.05.

Returns:

    StudyLocus: Clumped study-locus of variants selected within the given window.

Source code in src/otg/dataset/summary_statistics.py
def window_based_clumping(
    self: SummaryStatistics,
    distance: int,
    gwas_significance: float = 5e-8,
    with_locus: bool = False,
    baseline_significance: float = 0.05,
) -> StudyLocus:
    """Generate study-locus from summary statistics by distance based clumping + collect locus.

    Args:
        distance (int): Distance in base pairs to be used for clumping.
        gwas_significance (float, optional): GWAS significance threshold. Defaults to 5e-8.
        baseline_significance (float, optional): Baseline significance threshold for inclusion in the locus. Defaults to 0.05.

    Returns:
        StudyLocus: Clumped study-locus containing variants based on window.
    """
    # Depending on whether the locus is to be collected, a different clumping function is called:
    if with_locus:
        clumped_df = WindowBasedClumping.clump_with_locus(
            self,
            window_length=distance,
            p_value_significance=gwas_significance,
            p_value_baseline=baseline_significance,
        )
    else:
        clumped_df = WindowBasedClumping.clump(
            self, window_length=distance, p_value_significance=gwas_significance
        )

    return clumped_df
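
A usage sketch; summary_stats is an existing SummaryStatistics instance and the window size is illustrative:

    # Lead variants only, using the default genome-wide significance threshold:
    study_locus = summary_stats.window_based_clumping(distance=500_000)

    # Also collect the surrounding locus, keeping variants below the baseline threshold:
    study_locus_with_locus = summary_stats.window_based_clumping(
        distance=500_000,
        with_locus=True,
        baseline_significance=0.05,
    )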

Schema

root
 |-- studyId: string (nullable = false)
 |-- variantId: string (nullable = false)
 |-- chromosome: string (nullable = false)
 |-- position: integer (nullable = false)
 |-- beta: double (nullable = false)
 |-- betaConfidenceIntervalLower: double (nullable = true)
 |-- betaConfidenceIntervalUpper: double (nullable = true)
 |-- pValueMantissa: float (nullable = false)
 |-- pValueExponent: integer (nullable = false)
 |-- effectAlleleFrequencyFromSource: float (nullable = true)
 |-- standardError: double (nullable = true)
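
A minimal sketch building a DataFrame that conforms to this schema, e.g. for unit tests; an active SparkSession is assumed and all values (including the variantId format) are made up for illustration:

    from pyspark.sql import Row

    row = Row(
        studyId="GCST000001",
        variantId="1_12345_A_G",
        chromosome="1",
        position=12345,
        beta=0.05,
        betaConfidenceIntervalLower=0.01,
        betaConfidenceIntervalUpper=0.09,
        pValueMantissa=2.5,
        pValueExponent=-9,
        effectAlleleFrequencyFromSource=0.31,
        standardError=0.02,
    )
    df = spark.createDataFrame([row], schema=SummaryStatistics.get_schema())
    sumstats = SummaryStatistics(_df=df, _schema=SummaryStatistics.get_schema())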