Clumping

Clumping is a commonly used post-processing step that identifies independent association signals from GWAS summary statistics and curated associations. It is critical because the complex linkage disequilibrium (LD) structure of human populations can produce multiple statistically significant associations within the same genomic region. Clumping reduces redundancy in GWAS results and ensures that each reported association represents an independent signal.

We have implemented three clumping methods:

  1. Distance-based clumping: Uses a genomic window to clump the significant SNPs into one hit.
  2. LD-based clumping: Uses a genomic window together with LD information to clump the significant SNPs into one hit.
  3. Locus-breaker clumping: Applies a distance cutoff between baseline-significant SNPs, and also returns the start and end position of each locus.

The algorithmic logic is similar to classic clumping approaches from PLINK (Reference: PLINK Clump Documentation). See details below:
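
As a minimal usage sketch (assuming summary_statistics is an already-loaded SummaryStatistics dataset and study_locus a StudyLocus dataset annotated with LD information; variable names and parameter values are illustrative, not recommended defaults):

from gentropy.method.window_based_clumping import WindowBasedClumping
from gentropy.method.clump import LDclumping
from gentropy.method.locus_breaker_clumping import LocusBreakerClumping

# 1. Distance-based clumping: one lead variant per window of significant SNPs.
window_clumped = WindowBasedClumping.clump(
    summary_statistics,
    distance=500_000,
    gwas_significance=5e-8,
)

# 2. LD-based clumping: flags leads that are linked to a more significant lead.
ld_clumped = LDclumping.clump(study_locus)

# 3. Locus-breaker clumping: variable-width loci separated by a distance cutoff.
loci = LocusBreakerClumping.locus_breaker(
    summary_statistics,
    baseline_pvalue_cutoff=1e-5,
    distance_cutoff=250_000,
    pvalue_cutoff=5e-8,
    flanking_distance=100_000,
)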

Distance-based clumping

gentropy.method.window_based_clumping.WindowBasedClumping

Get semi-lead SNPs from summary statistics using a window-based function.

Source code in src/gentropy/method/window_based_clumping.py
class WindowBasedClumping:
    """Get semi-lead snps from summary statistics using a window based function."""

    @staticmethod
    def _cluster_peaks(
        study: Column, chromosome: Column, position: Column, window_length: int
    ) -> Column:
        """Cluster GWAS significant variants, were clusters are separated by a defined distance.

        !! Important to note that the length of the clusters can be arbitrarily big.

        Args:
            study (Column): study identifier
            chromosome (Column): chromosome identifier
            position (Column): position of the variant
            window_length (int): window length in basepair

        Returns:
            Column: containing cluster identifier

        Examples:
            >>> data = [
            ...     # Cluster 1:
            ...     ('s1', 'chr1', 2),
            ...     ('s1', 'chr1', 4),
            ...     ('s1', 'chr1', 12),
            ...     # Cluster 2 - Same chromosome:
            ...     ('s1', 'chr1', 31),
            ...     ('s1', 'chr1', 38),
            ...     ('s1', 'chr1', 42),
            ...     # Cluster 3 - New chromosome:
            ...     ('s1', 'chr2', 41),
            ...     ('s1', 'chr2', 44),
            ...     ('s1', 'chr2', 50),
            ...     # Cluster 4 - other study:
            ...     ('s2', 'chr2', 55),
            ...     ('s2', 'chr2', 62),
            ...     ('s2', 'chr2', 70),
            ... ]
            >>> window_length = 10
            >>> (
            ...     spark.createDataFrame(data, ['studyId', 'chromosome', 'position'])
            ...     .withColumn("cluster_id",
            ...         WindowBasedClumping._cluster_peaks(
            ...             f.col('studyId'),
            ...             f.col('chromosome'),
            ...             f.col('position'),
            ...             window_length
            ...         )
            ...     ).show()
            ... )
            +-------+----------+--------+----------+
            |studyId|chromosome|position|cluster_id|
            +-------+----------+--------+----------+
            |     s1|      chr1|       2| s1_chr1_2|
            |     s1|      chr1|       4| s1_chr1_2|
            |     s1|      chr1|      12| s1_chr1_2|
            |     s1|      chr1|      31|s1_chr1_31|
            |     s1|      chr1|      38|s1_chr1_31|
            |     s1|      chr1|      42|s1_chr1_31|
            |     s1|      chr2|      41|s1_chr2_41|
            |     s1|      chr2|      44|s1_chr2_41|
            |     s1|      chr2|      50|s1_chr2_41|
            |     s2|      chr2|      55|s2_chr2_55|
            |     s2|      chr2|      62|s2_chr2_55|
            |     s2|      chr2|      70|s2_chr2_55|
            +-------+----------+--------+----------+
            <BLANKLINE>

        """
        # By adding previous position, the cluster boundary can be identified:
        previous_position = f.lag(position).over(
            Window.partitionBy(study, chromosome).orderBy(position)
        )
        # We consider a cluster boundary if subsequent SNPs are further apart than the defined window:
        cluster_id = f.when(
            (previous_position.isNull())
            | (position - previous_position > window_length),
            f.concat_ws("_", study, chromosome, position),
        )
        # The cluster identifier is propagated across every variant of the cluster:
        return f.when(
            cluster_id.isNull(),
            f.last(cluster_id, ignorenulls=True).over(
                Window.partitionBy(study, chromosome)
                .orderBy(position)
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
            ),
        ).otherwise(cluster_id)

    @staticmethod
    def _prune_peak(position: NDArray[np.float64], window_size: int) -> DenseVector:
        """Establish lead snps based on their positions listed by p-value.

        The function `find_peak` assigns lead SNPs based on their positions listed by p-value within a specified window size.

        Args:
            position (NDArray[np.float64]): positions of the SNPs sorted by p-value.
            window_size (int): the distance in bp within which associations are clumped together around the lead snp.

        Returns:
            DenseVector: binary vector where 1 indicates a lead SNP and 0 indicates a non-lead SNP.

        Examples:
            >>> from pyspark.ml import functions as fml
            >>> from pyspark.ml.linalg import DenseVector
            >>> WindowBasedClumping._prune_peak(np.array((3, 9, 8, 4, 6)), 2)
            DenseVector([1.0, 1.0, 0.0, 0.0, 1.0])

        """
        # Initializing the lead list with zeroes:
        is_lead = np.zeros(len(position))

        # List containing indices of leads:
        lead_indices: list[int] = []

        # Looping through all positions:
        for index in range(len(position)):
            # Looping through leads to find out if they are within a window:
            for lead_index in lead_indices:
                # If any of the leads within the window:
                if abs(position[lead_index] - position[index]) < window_size:
                    # Skipping further checks:
                    break
            else:
                # None of the leads were within the window:
                lead_indices.append(index)
                is_lead[index] = 1

        return DenseVector(is_lead)

    @staticmethod
    def clump(
        summary_statistics: SummaryStatistics,
        distance: int = WindowBasedClumpingStepConfig().distance,
        gwas_significance: float = WindowBasedClumpingStepConfig().gwas_significance,
    ) -> StudyLocus:
        """Clump significant signals from summary statistics based on window.

        Args:
            summary_statistics (SummaryStatistics): Summary statistics to be used for clumping.
            distance (int): Distance in base pairs to be used for clumping. Defaults to 500_000.
            gwas_significance (float): GWAS significance threshold. Defaults to 5e-8.

        Returns:
            StudyLocus: clumped summary statistics (without locus collection)

            Check WindowBasedClumpingStepConfig object for default values
        """
        # Create window for locus clusters
        # - variants where the distance between subsequent variants is below the defined threshold.
        # - Variants are sorted by descending significance
        cluster_window = Window.partitionBy(
            "studyId", "chromosome", "cluster_id"
        ).orderBy(f.col("pValueExponent").asc(), f.col("pValueMantissa").asc())

        return StudyLocus(
            _df=(
                summary_statistics
                # Dropping SNPs that do not reach significance - all subsequent steps are done on significant variants:
                .pvalue_filter(gwas_significance)
                .df
                # Clustering summary variants for efficient windowing (complexity reduction):
                .withColumn(
                    "cluster_id",
                    WindowBasedClumping._cluster_peaks(
                        f.col("studyId"),
                        f.col("chromosome"),
                        f.col("position"),
                        distance,
                    ),
                )
                # Within each cluster variants are ranked by significance:
                .withColumn("pvRank", f.row_number().over(cluster_window))
                # Collect positions in cluster for the most significant variant (complexity reduction):
                .withColumn(
                    "collectedPositions",
                    f.when(
                        f.col("pvRank") == 1,
                        f.collect_list(f.col("position")).over(
                            cluster_window.rowsBetween(
                                Window.currentRow, Window.unboundedFollowing
                            )
                        ),
                    ).otherwise(f.array()),
                )
                # Get semi indices only ONCE per cluster:
                .withColumn(
                    "semiIndices",
                    f.when(
                        f.size(f.col("collectedPositions")) > 0,
                        fml.vector_to_array(
                            f.udf(WindowBasedClumping._prune_peak, VectorUDT())(
                                fml.array_to_vector(f.col("collectedPositions")),
                                f.lit(distance),
                            )
                        ),
                    ),
                )
                # Propagating the result of the above calculation for all rows:
                .withColumn(
                    "semiIndices",
                    f.when(
                        f.col("semiIndices").isNull(),
                        f.first(f.col("semiIndices"), ignorenulls=True).over(
                            cluster_window
                        ),
                    ).otherwise(f.col("semiIndices")),
                )
                # Keeping semi indices only:
                .filter(f.col("semiIndices")[f.col("pvRank") - 1] > 0)
                .drop("pvRank", "collectedPositions", "semiIndices", "cluster_id")
                # Adding study-locus id:
                .withColumn(
                    "studyLocusId",
                    StudyLocus.assign_study_locus_id(
                        f.col("studyId"), f.col("variantId")
                    ),
                )
                # Initialize QC column as array of strings:
                .withColumn(
                    "qualityControls", f.array().cast(t.ArrayType(t.StringType()))
                )
            ),
            _schema=StudyLocus.get_schema(),
        )

clump(summary_statistics: SummaryStatistics, distance: int = WindowBasedClumpingStepConfig().distance, gwas_significance: float = WindowBasedClumpingStepConfig().gwas_significance) -> StudyLocus staticmethod

Clump significant signals from summary statistics based on window.

Parameters:

    summary_statistics (SummaryStatistics): Summary statistics to be used for clumping. Required.
    distance (int): Distance in base pairs to be used for clumping. Default: WindowBasedClumpingStepConfig().distance (500_000).
    gwas_significance (float): GWAS significance threshold. Default: WindowBasedClumpingStepConfig().gwas_significance (5e-8).

Returns:

    StudyLocus: Clumped summary statistics (without locus collection). Check the WindowBasedClumpingStepConfig object for default values.
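
A minimal invocation (assuming summary_statistics is a SummaryStatistics dataset loaded elsewhere) relies on the config defaults:

study_locus = WindowBasedClumping.clump(summary_statistics)

# Equivalent, with the documented defaults spelled out:
study_locus = WindowBasedClumping.clump(
    summary_statistics,
    distance=500_000,        # clumping window in base pairs
    gwas_significance=5e-8,  # genome-wide significance threshold
)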


LD-based clumping

gentropy.method.clump.LDclumping

LD clumping reports the most significant genetic associations in a region in terms of a smaller number of “clumps” of genetically linked SNPs.

Source code in src/gentropy/method/clump.py
class LDclumping:
    """LD clumping reports the most significant genetic associations in a region in terms of a smaller number of “clumps” of genetically linked SNPs."""

    @staticmethod
    def _is_lead_linked(
        study_id: Column,
        variant_id: Column,
        p_value_exponent: Column,
        p_value_mantissa: Column,
        ld_set: Column,
    ) -> Column:
        """Evaluates whether a lead variant is linked to a tag (with lowest p-value) in the same studyLocus dataset.

        Args:
            study_id (Column): studyId
            variant_id (Column): Lead variant id
            p_value_exponent (Column): p-value exponent
            p_value_mantissa (Column): p-value mantissa
            ld_set (Column): Array of variants in LD with the lead variant

        Returns:
            Column: Boolean in which True indicates that the lead is linked to another tag in the same dataset.
        """
        leads_in_study = f.collect_set(variant_id).over(Window.partitionBy(study_id))
        tags_in_studylocus = f.array_union(
            # Get all tag variants from the credible set per studyLocusId
            f.transform(ld_set, lambda x: x.tagVariantId),
            # And append the lead variant so that the intersection is the same for all studyLocusIds in a study
            f.array(variant_id),
        )
        intersect_lead_tags = f.array_sort(
            f.array_intersect(leads_in_study, tags_in_studylocus)
        )
        return (
            # If the lead is in the credible set, we rank the peaks by p-value
            f.when(
                f.size(intersect_lead_tags) > 0,
                f.row_number().over(
                    Window.partitionBy(study_id, intersect_lead_tags).orderBy(
                        p_value_exponent, p_value_mantissa
                    )
                )
                > 1,
            )
            # If the intersection is empty (lead is not in the credible set or cred set is empty), the association is not linked
            .otherwise(f.lit(False))
        )

    @classmethod
    def clump(cls: type[LDclumping], associations: StudyLocus) -> StudyLocus:
        """Perform clumping on studyLocus dataset.

        Args:
            associations (StudyLocus): StudyLocus dataset

        Returns:
            StudyLocus: including flag and removing locus information for LD clumped loci.
        """
        return associations.clump()

clump(associations: StudyLocus) -> StudyLocus classmethod

Perform clumping on studyLocus dataset.

Parameters:

Name Type Description Default
associations StudyLocus

StudyLocus dataset

required

Returns:

Name Type Description
StudyLocus StudyLocus

including flag and removing locus information for LD clumped loci.
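
A minimal usage sketch (assuming study_locus is a StudyLocus dataset whose LD set has already been annotated upstream; the variable name is illustrative):

# Flags leads linked to a more significant lead in the same study and
# removes their locus information; independent leads pass through unchanged.
flagged = LDclumping.clump(study_locus)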


Locus-breaker clumping

gentropy.method.locus_breaker_clumping.LocusBreakerClumping

Locus-breaker clumping method.

Source code in src/gentropy/method/locus_breaker_clumping.py
class LocusBreakerClumping:
    """Locus-breaker clumping method."""

    @staticmethod
    def locus_breaker(
        summary_statistics: SummaryStatistics,
        baseline_pvalue_cutoff: float,
        distance_cutoff: int,
        pvalue_cutoff: float,
        flanking_distance: int,
    ) -> StudyLocus:
        """Identify GWAS associated loci based on the provided p-value and distance cutoff.

        - The GWAS associated loci identified by this method have a varying width, and are separated by a distance greater than the provided distance cutoff.
        - The distance is only calculated between single-point associations that reach the baseline p-value cutoff.
        - As the width of the selected genomic region dynamically depends on the loci, the resulting StudyLocus object will contain the locus start and end position.
        - To ensure completeness, the locus is extended by a flanking distance at both ends.

        Args:
            summary_statistics (SummaryStatistics): Input summary statistics dataset.
            baseline_pvalue_cutoff (float): baseline significance we consider for the locus.
            distance_cutoff (int): minimum distance that separates two loci.
            pvalue_cutoff (float): the minimum significance the locus should have.
            flanking_distance (int): the distance to extend the locus in both directions.

        Returns:
            StudyLocus: clumped study loci with locus start and end positions + lead variant from the locus.
        """
        # Extract columns from the summary statistics:
        columns_sumstats_columns = summary_statistics.df.columns
        # Convert pvalue_cutoff to neglog scale:
        neglog_pv_cutoff = -np.log10(pvalue_cutoff)

        # First window to calculate the distance between consecutive positions:
        w1 = Window.partitionBy("studyId", "chromosome").orderBy("position")

        # Second window to calculate the locus start and end:
        w2 = (
            Window.partitionBy("studyId", "chromosome", "locusStart")
            .orderBy("position")
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )

        # Third window to rank the variants within the locus based on neglog p-value to find top loci:
        w3 = Window.partitionBy(
            "studyId", "chromosome", "locusStart", "locusEnd"
        ).orderBy(f.col("negLogPValue").desc())

        return StudyLocus(
            _df=(
                # Applying the baseline p-value cutoff:
                summary_statistics.pvalue_filter(baseline_pvalue_cutoff)
                # Calculating the neglog p-value for easier sorting:
                .df.withColumn(
                    "negLogPValue",
                    calculate_neglog_pvalue(
                        f.col("pValueMantissa"), f.col("pValueExponent")
                    ),
                )
                # Calculating the distance between consecutive positions, then identifying the locus start and end:
                .withColumn("next_position", f.lag(f.col("position")).over(w1))
                .withColumn("distance", f.col("position") - f.col("next_position"))
                .withColumn(
                    "locusStart",
                    f.when(
                        (f.col("distance") > distance_cutoff)
                        | f.col("distance").isNull(),
                        f.col("position"),
                    ),
                )
                .withColumn(
                    "locusStart",
                    f.when(
                        f.last(f.col("locusStart") - flanking_distance, True).over(
                            w1.rowsBetween(-sys.maxsize, 0)
                        )
                        > 0,
                        f.last(f.col("locusStart") - flanking_distance, True).over(
                            w1.rowsBetween(-sys.maxsize, 0)
                        ),
                    ).otherwise(f.lit(0)),
                )
                .withColumn(
                    "locusEnd", f.max(f.col("position") + flanking_distance).over(w2)
                )
                .withColumn("rank", f.rank().over(w3))
                .filter(
                    (f.col("rank") == 1) & (f.col("negLogPValue") > neglog_pv_cutoff)
                )
                .select(
                    *columns_sumstats_columns,
                    # To make sure that the type of locusStart and locusEnd follows schema of StudyLocus:
                    f.col("locusStart").cast(t.IntegerType()).alias("locusStart"),
                    f.col("locusEnd").cast(t.IntegerType()).alias("locusEnd"),
                    f.lit(None)
                    .cast(t.ArrayType(t.StringType()))
                    .alias("qualityControls"),
                    StudyLocus.assign_study_locus_id(
                        f.col("studyId"), f.col("variantId")
                    ).alias("studyLocusId"),
                )
            ),
            _schema=StudyLocus.get_schema(),
        )

    @staticmethod
    def process_locus_breaker_output(
        lbc: StudyLocus,
        wbc: StudyLocus,
        large_loci_size: int,
    ) -> StudyLocus:
        """Process the locus breaker method result, and run window-based clumping on large loci.

        Args:
            lbc (StudyLocus): StudyLocus object from locus-breaker clumping.
            wbc (StudyLocus): StudyLocus object from window-based clumping.
            large_loci_size (int): the size to define large loci which should be broken with wbc.

        Returns:
            StudyLocus: clumped study loci with large loci broken by window-based clumping.
        """
        small_loci = lbc.filter(
            (f.col("locusEnd") - f.col("locusStart")) <= large_loci_size
        )
        large_loci = lbc.filter(
            (f.col("locusEnd") - f.col("locusStart")) > large_loci_size
        )
        large_loci_wbc = StudyLocus(
            wbc.df.alias("wbc")
            .join(
                large_loci.df.alias("ll"),
                (f.col("wbc.studyId") == f.col("ll.studyId"))
                & (f.col("wbc.chromosome") == f.col("ll.chromosome"))
                & (
                    f.col("wbc.position").between(
                        f.col("ll.locusStart"), f.col("ll.locusEnd")
                    )
                ),
                "semi",
            )
            .withColumns(
                {
                    "locusStart": f.col("position") - large_loci_size // 2,
                    "locusEnd": f.col("position") + large_loci_size // 2,
                }
            ),
            StudyLocus.get_schema(),
        )
        return StudyLocus(
            large_loci_wbc.df.unionByName(small_loci.df),
            StudyLocus.get_schema(),
        )

locus_breaker(summary_statistics: SummaryStatistics, baseline_pvalue_cutoff: float, distance_cutoff: int, pvalue_cutoff: float, flanking_distance: int) -> StudyLocus staticmethod

Identify GWAS associated loci based on the provided p-value and distance cutoff.

  • The GWAS associated loci identified by this method have a varying width, and are separated by a distance greater than the provided distance cutoff.
  • The distance is only calculated between single-point associations that reach the baseline p-value cutoff.
  • As the width of the selected genomic region dynamically depends on the loci, the resulting StudyLocus object will contain the locus start and end position.
  • To ensure completeness, the locus is extended by a flanking distance at both ends.

Parameters:

    summary_statistics (SummaryStatistics): Input summary statistics dataset. Required.
    baseline_pvalue_cutoff (float): Baseline significance we consider for the locus. Required.
    distance_cutoff (int): Minimum distance that separates two loci. Required.
    pvalue_cutoff (float): The minimum significance the locus should have. Required.
    flanking_distance (int): The distance to extend the locus in both directions. Required.

Returns:

    StudyLocus: Clumped study loci with locus start and end positions + lead variant from the locus.
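
A usage sketch (all threshold values below are illustrative, not recommended defaults):

loci = LocusBreakerClumping.locus_breaker(
    summary_statistics,
    baseline_pvalue_cutoff=1e-5,  # only variants passing this baseline shape locus boundaries
    distance_cutoff=250_000,      # start a new locus when consecutive baseline variants are >250 kb apart
    pvalue_cutoff=5e-8,           # keep a locus only if its top variant reaches this significance
    flanking_distance=100_000,    # extend each locus by 100 kb at both ends
)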


process_locus_breaker_output(lbc: StudyLocus, wbc: StudyLocus, large_loci_size: int) -> StudyLocus staticmethod

Process the locus breaker method result, and run window-based clumping on large loci.

Parameters:

Name Type Description Default
lbc StudyLocus

StudyLocus object from locus-breaker clumping.

required
wbc StudyLocus

StudyLocus object from window-based clumping.

required
large_loci_size int

the size to define large loci which should be broken with wbc.

required

Returns:

Name Type Description
StudyLocus StudyLocus

clumped study loci with large loci broken by window-based clumping.
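
A sketch of the combined pipeline (threshold values illustrative):

# Run both clumping strategies on the same summary statistics:
lbc = LocusBreakerClumping.locus_breaker(
    summary_statistics,
    baseline_pvalue_cutoff=1e-5,
    distance_cutoff=250_000,
    pvalue_cutoff=5e-8,
    flanking_distance=100_000,
)
wbc = WindowBasedClumping.clump(summary_statistics)

# Loci wider than large_loci_size are replaced by the window-based leads that
# fall inside them, each assigned a fixed-width locus around its position:
clumped = LocusBreakerClumping.process_locus_breaker_output(
    lbc, wbc, large_loci_size=1_500_000
)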
