
Clumping

Clumping is a commonly used post-processing step that identifies independent association signals in GWAS summary statistics and curated associations. It is needed because the complex linkage disequilibrium (LD) structure of human populations can produce multiple statistically significant associations within the same genomic region. Clumping reduces this redundancy in GWAS results and ensures that each reported association represents an independent signal.

We have implemented two clumping methods:

  1. Distance-based clumping: Uses a genomic window to clump significant SNPs into a single hit.
  2. LD-based clumping: Uses both a genomic window and LD information to clump significant SNPs into a single hit.

The algorithmic logic is similar to the classic clumping approach from PLINK (Reference: PLINK Clump Documentation). See the details below.
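Both methods are exposed on their respective classes. A minimal usage sketch follows; the variable names and the preloaded summary_statistics dataset are illustrative, not part of the documented API:

from gentropy.method.clump import LDclumping
from gentropy.method.window_based_clumping import WindowBasedClumping

# Distance-based clumping of a SummaryStatistics dataset (defaults shown explicitly):
semi_leads = WindowBasedClumping.clump(
    summary_statistics,        # a SummaryStatistics object, loaded beforehand
    distance=500_000,          # clumping window in base pairs
    gwas_significance=5e-8,    # genome-wide significance threshold
)

# LD-based clumping of an existing StudyLocus that carries LD (ldSet) information:
clumped = LDclumping.clump(semi_leads)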

Distance-based clumping

gentropy.method.window_based_clumping.WindowBasedClumping

Get semi-lead SNPs from summary statistics using a window-based function.

Source code in src/gentropy/method/window_based_clumping.py
# Imports inferred from usage in this excerpt (the rendered source starts
# mid-module); the gentropy dataset module paths are assumptions based on
# the package layout:
from __future__ import annotations

import numpy as np
from numpy.typing import NDArray
from pyspark.ml import functions as fml
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql import Column, Window
from pyspark.sql import functions as f
from pyspark.sql import types as t

from gentropy.dataset.study_locus import StudyLocus
from gentropy.dataset.summary_statistics import SummaryStatistics


class WindowBasedClumping:
    """Get semi-lead SNPs from summary statistics using a window-based function."""

    @staticmethod
    def _cluster_peaks(
        study: Column, chromosome: Column, position: Column, window_length: int
    ) -> Column:
        """Cluster GWAS significant variants, were clusters are separated by a defined distance.

        !! Important to note that the length of the clusters can be arbitrarily big.

        Args:
            study (Column): study identifier
            chromosome (Column): chromosome identifier
            position (Column): position of the variant
            window_length (int): window length in base pairs

        Returns:
            Column: containing cluster identifier

        Examples:
            >>> data = [
            ...     # Cluster 1:
            ...     ('s1', 'chr1', 2),
            ...     ('s1', 'chr1', 4),
            ...     ('s1', 'chr1', 12),
            ...     # Cluster 2 - Same chromosome:
            ...     ('s1', 'chr1', 31),
            ...     ('s1', 'chr1', 38),
            ...     ('s1', 'chr1', 42),
            ...     # Cluster 3 - New chromosome:
            ...     ('s1', 'chr2', 41),
            ...     ('s1', 'chr2', 44),
            ...     ('s1', 'chr2', 50),
            ...     # Cluster 4 - other study:
            ...     ('s2', 'chr2', 55),
            ...     ('s2', 'chr2', 62),
            ...     ('s2', 'chr2', 70),
            ... ]
            >>> window_length = 10
            >>> (
            ...     spark.createDataFrame(data, ['studyId', 'chromosome', 'position'])
            ...     .withColumn("cluster_id",
            ...         WindowBasedClumping._cluster_peaks(
            ...             f.col('studyId'),
            ...             f.col('chromosome'),
            ...             f.col('position'),
            ...             window_length
            ...         )
            ...     ).show()
            ... )
            +-------+----------+--------+----------+
            |studyId|chromosome|position|cluster_id|
            +-------+----------+--------+----------+
            |     s1|      chr1|       2| s1_chr1_2|
            |     s1|      chr1|       4| s1_chr1_2|
            |     s1|      chr1|      12| s1_chr1_2|
            |     s1|      chr1|      31|s1_chr1_31|
            |     s1|      chr1|      38|s1_chr1_31|
            |     s1|      chr1|      42|s1_chr1_31|
            |     s1|      chr2|      41|s1_chr2_41|
            |     s1|      chr2|      44|s1_chr2_41|
            |     s1|      chr2|      50|s1_chr2_41|
            |     s2|      chr2|      55|s2_chr2_55|
            |     s2|      chr2|      62|s2_chr2_55|
            |     s2|      chr2|      70|s2_chr2_55|
            +-------+----------+--------+----------+
            <BLANKLINE>

        """
        # By adding previous position, the cluster boundary can be identified:
        previous_position = f.lag(position).over(
            Window.partitionBy(study, chromosome).orderBy(position)
        )
        # We consider it a cluster boundary if subsequent SNPs are further apart than the defined window:
        cluster_id = f.when(
            (previous_position.isNull())
            | (position - previous_position > window_length),
            f.concat_ws("_", study, chromosome, position),
        )
        # The cluster identifier is propagated across every variant of the cluster:
        return f.when(
            cluster_id.isNull(),
            f.last(cluster_id, ignorenulls=True).over(
                Window.partitionBy(study, chromosome)
                .orderBy(position)
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
            ),
        ).otherwise(cluster_id)

    @staticmethod
    def _prune_peak(position: NDArray[np.float64], window_size: int) -> DenseVector:
        """Establish lead snps based on their positions listed by p-value.

        The function `find_peak` assigns lead SNPs based on their positions listed by p-value within a specified window size.

        Args:
            position (NDArray[np.float64]): positions of the SNPs sorted by p-value.
            window_size (int): the distance in bp within which associations are clumped together around the lead snp.

        Returns:
            DenseVector: binary vector where 1 indicates a lead SNP and 0 indicates a non-lead SNP.

        Examples:
            >>> from pyspark.ml import functions as fml
            >>> from pyspark.ml.linalg import DenseVector
            >>> WindowBasedClumping._prune_peak(np.array((3, 9, 8, 4, 6)), 2)
            DenseVector([1.0, 1.0, 0.0, 0.0, 1.0])

        """
        # Initializing the lead list with zeroes:
        is_lead = np.zeros(len(position))

        # List containing indices of leads:
        lead_indices: list[int] = []

        # Looping through all positions:
        for index in range(len(position)):
            # Looping through leads to find out if they are within a window:
            for lead_index in lead_indices:
                # If any of the leads within the window:
                if abs(position[lead_index] - position[index]) < window_size:
                    # Skipping further checks:
                    break
            else:
                # None of the leads were within the window:
                lead_indices.append(index)
                is_lead[index] = 1

        return DenseVector(is_lead)

    @staticmethod
    def clump(
        summary_statistics: SummaryStatistics,
        distance: int = 500_000,
        gwas_significance: float = 5e-8,
    ) -> StudyLocus:
        """Clump significant signals from summary statistics based on window.

        Args:
            summary_statistics (SummaryStatistics): Summary statistics to be used for clumping.
            distance (int): Distance in base pairs to be used for clumping. Defaults to 500_000.
            gwas_significance (float): GWAS significance threshold. Defaults to 5e-8.

        Returns:
            StudyLocus: clumped summary statistics (without locus collection)
        """
        # Create window for locus clusters
        # - variants where the distance between subsequent variants is below the defined threshold.
        # - Variants are sorted by descending significance
        cluster_window = Window.partitionBy(
            "studyId", "chromosome", "cluster_id"
        ).orderBy(f.col("pValueExponent").asc(), f.col("pValueMantissa").asc())

        return StudyLocus(
            _df=(
                summary_statistics
                # Dropping SNPs below the significance threshold - all subsequent steps are done on significant variants:
                .pvalue_filter(gwas_significance)
                .df
                # Clustering summary variants for efficient windowing (complexity reduction):
                .withColumn(
                    "cluster_id",
                    WindowBasedClumping._cluster_peaks(
                        f.col("studyId"),
                        f.col("chromosome"),
                        f.col("position"),
                        distance,
                    ),
                )
                # Within each cluster variants are ranked by significance:
                .withColumn("pvRank", f.row_number().over(cluster_window))
                # Collect positions in cluster for the most significant variant (complexity reduction):
                .withColumn(
                    "collectedPositions",
                    f.when(
                        f.col("pvRank") == 1,
                        f.collect_list(f.col("position")).over(
                            cluster_window.rowsBetween(
                                Window.currentRow, Window.unboundedFollowing
                            )
                        ),
                    ).otherwise(f.array()),
                )
                # Get semi indices only ONCE per cluster:
                .withColumn(
                    "semiIndices",
                    f.when(
                        f.size(f.col("collectedPositions")) > 0,
                        fml.vector_to_array(
                            f.udf(WindowBasedClumping._prune_peak, VectorUDT())(
                                fml.array_to_vector(f.col("collectedPositions")),
                                f.lit(distance),
                            )
                        ),
                    ),
                )
                # Propagating the result of the above calculation for all rows:
                .withColumn(
                    "semiIndices",
                    f.when(
                        f.col("semiIndices").isNull(),
                        f.first(f.col("semiIndices"), ignorenulls=True).over(
                            cluster_window
                        ),
                    ).otherwise(f.col("semiIndices")),
                )
                # Keeping semi indices only:
                .filter(f.col("semiIndices")[f.col("pvRank") - 1] > 0)
                .drop("pvRank", "collectedPositions", "semiIndices", "cluster_id")
                # Adding study-locus id:
                .withColumn(
                    "studyLocusId",
                    StudyLocus.assign_study_locus_id(
                        f.col("studyId"), f.col("variantId")
                    ),
                )
                # Initialize QC column as array of strings:
                .withColumn(
                    "qualityControls", f.array().cast(t.ArrayType(t.StringType()))
                )
            ),
            _schema=StudyLocus.get_schema(),
        )
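
To make the pruning step concrete, here is a small self-contained sketch of `_prune_peak` on toy positions and p-values (illustrative numbers only):

import numpy as np

# Five significant variants on one chromosome:
positions = np.array([100.0, 150.0, 400.0, 420.0, 900.0])
pvalues = np.array([1e-12, 1e-9, 1e-20, 1e-10, 1e-15])

# The function expects positions ordered by ascending p-value:
order = np.argsort(pvalues)
is_lead = WindowBasedClumping._prune_peak(positions[order], window_size=300)

# Variants flagged 1 are semi-leads; the others sit within 300 bp of a stronger lead:
print(np.sort(positions[order][is_lead.toArray() == 1]))  # [100. 400. 900.]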

clump(summary_statistics: SummaryStatistics, distance: int = 500000, gwas_significance: float = 5e-08) -> StudyLocus staticmethod

Clump significant signals from summary statistics based on window.

Parameters:

- summary_statistics (SummaryStatistics): Summary statistics to be used for clumping. Required.
- distance (int): Distance in base pairs to be used for clumping. Default: 500000.
- gwas_significance (float): GWAS significance threshold. Default: 5e-08.

Returns:

- StudyLocus: clumped summary statistics (without locus collection).
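
A hedged usage sketch; the loading step below is an assumption (check the SummaryStatistics constructors available in your gentropy version), and the path is illustrative:

# Illustrative loading step - session is a gentropy Session object:
summary_statistics = SummaryStatistics.from_parquet(session, "path/to/summary_stats")

# Default 500 kb window at genome-wide significance:
semi_leads = WindowBasedClumping.clump(summary_statistics)

# Tighter window, more lenient threshold:
semi_leads_relaxed = WindowBasedClumping.clump(
    summary_statistics, distance=250_000, gwas_significance=1e-6
)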


LD-based clumping

gentropy.method.clump.LDclumping

LD clumping reports the most significant genetic associations in a region in terms of a smaller number of “clumps” of genetically linked SNPs.

Source code in src/gentropy/method/clump.py
# Imports inferred from usage in this excerpt; the gentropy dataset module
# path is an assumption based on the package layout:
from __future__ import annotations

from pyspark.sql import Column, Window
from pyspark.sql import functions as f

from gentropy.dataset.study_locus import StudyLocus


class LDclumping:
    """LD clumping reports the most significant genetic associations in a region in terms of a smaller number of “clumps” of genetically linked SNPs."""

    @staticmethod
    def _is_lead_linked(
        study_id: Column,
        variant_id: Column,
        p_value_exponent: Column,
        p_value_mantissa: Column,
        ld_set: Column,
    ) -> Column:
        """Evaluates whether a lead variant is linked to a tag (with lowest p-value) in the same studyLocus dataset.

        Args:
            study_id (Column): studyId
            variant_id (Column): Lead variant id
            p_value_exponent (Column): p-value exponent
            p_value_mantissa (Column): p-value mantissa
            ld_set (Column): Array of variants in LD with the lead variant

        Returns:
            Column: Boolean in which True indicates that the lead is linked to another tag in the same dataset.
        """
        leads_in_study = f.collect_set(variant_id).over(Window.partitionBy(study_id))
        tags_in_studylocus = f.array_union(
            # Get all tag variants from the credible set per studyLocusId
            f.transform(ld_set, lambda x: x.tagVariantId),
            # And append the lead variant so that the intersection is the same for all studyLocusIds in a study
            f.array(variant_id),
        )
        intersect_lead_tags = f.array_sort(
            f.array_intersect(leads_in_study, tags_in_studylocus)
        )
        return (
            # If the lead is in the credible set, we rank the peaks by p-value
            f.when(
                f.size(intersect_lead_tags) > 0,
                f.row_number().over(
                    Window.partitionBy(study_id, intersect_lead_tags).orderBy(
                        p_value_exponent, p_value_mantissa
                    )
                )
                > 1,
            )
            # If the intersection is empty (lead is not in the credible set or cred set is empty), the association is not linked
            .otherwise(f.lit(False))
        )

    @classmethod
    def clump(cls: type[LDclumping], associations: StudyLocus) -> StudyLocus:
        """Perform clumping on studyLocus dataset.

        Args:
            associations (StudyLocus): StudyLocus dataset

        Returns:
            StudyLocus: the input dataset with LD-clumped loci flagged and their locus information removed.
        """
        return associations.clump()
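
The linkage test can be exercised directly on toy data. A minimal sketch, assuming an active Spark session named spark (column values are illustrative):

from pyspark.sql import functions as f

# v2 lists v1 in its LD set and v1 has the lower p-value, so v2 should be
# flagged as linked; v3 has an empty LD set and stays unlinked.
df = spark.createDataFrame(
    [
        ("s1", "v1", -10, 1.0, [{"tagVariantId": "v2"}]),
        ("s1", "v2", -8, 2.0, [{"tagVariantId": "v1"}]),
        ("s1", "v3", -9, 1.5, []),
    ],
    "studyId string, variantId string, pValueExponent int, "
    "pValueMantissa double, ldSet array<struct<tagVariantId:string>>",
)
df.withColumn(
    "isLeadLinked",
    LDclumping._is_lead_linked(
        f.col("studyId"),
        f.col("variantId"),
        f.col("pValueExponent"),
        f.col("pValueMantissa"),
        f.col("ldSet"),
    ),
).show()  # only v2 is expected to be flagged True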

clump(associations: StudyLocus) -> StudyLocus classmethod

Perform clumping on studyLocus dataset.

Parameters:

- associations (StudyLocus): StudyLocus dataset. Required.

Returns:

- StudyLocus: the input dataset with LD-clumped loci flagged and their locus information removed.

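In practice the classmethod delegates to the dataset's own method, so the two calls below are equivalent (study_locus is an illustrative StudyLocus variable carrying LD information):

flagged = LDclumping.clump(study_locus)
flagged = study_locus.clump()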