
Clumping

Clumping is a commonly used post-processing method for identifying independent association signals from GWAS summary statistics and curated associations. It is critical because the complex linkage disequilibrium (LD) structure of human populations can produce multiple statistically significant associations within the same genomic region. Clumping reduces redundancy in GWAS results and ensures that each reported association represents an independent signal.

We have implemented three clumping methods:

  1. Distance-based clumping: uses a fixed genomic window to collapse significant SNPs into a single hit.
  2. LD-based clumping: uses a genomic window together with LD information to collapse significant SNPs into a single hit.
  3. Locus-breaker clumping: applies a distance cutoff between baseline-significant SNPs and also returns the start and end position of each locus.

The algorithmic logic is similar to the classic clumping approach from PLINK (reference: PLINK clump documentation). See the details of each method below.
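First, for orientation, a minimal usage sketch. It assumes `summary_statistics` is a gentropy SummaryStatistics object that has already been filtered for significant variants; all cutoff values are illustrative rather than recommended defaults:

from gentropy.method.clump import LDclumping
from gentropy.method.locus_breaker_clumping import LocusBreakerClumping
from gentropy.method.window_based_clumping import WindowBasedClumping

# 1. Distance-based clumping: a fixed window around each lead variant.
window_clumped = WindowBasedClumping.clump(summary_statistics, distance=500_000)

# 2. LD-based clumping: refines a StudyLocus whose loci carry LD information.
ld_clumped = LDclumping.clump(window_clumped)

# 3. Locus-breaker clumping: variable-width loci, split at large distance gaps.
loci = LocusBreakerClumping.locus_breaker(
    summary_statistics,
    baseline_pvalue_cutoff=1e-5,
    distance_cutoff=250_000,
    pvalue_cutoff=5e-8,
    flanking_distance=100_000,
)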

Distance-based clumping

gentropy.method.window_based_clumping.WindowBasedClumping

Get semi-lead SNPs from summary statistics using a window-based function.

Source code in src/gentropy/method/window_based_clumping.py
class WindowBasedClumping:
    """Get semi-lead snps from summary statistics using a window based function."""

    @staticmethod
    def _cluster_peaks(
        study: Column, chromosome: Column, position: Column, window_length: int
    ) -> Column:
        """Cluster GWAS significant variants, were clusters are separated by a defined distance.

        !! Important to note that the length of a cluster can be arbitrarily large.

        Args:
            study (Column): study identifier
            chromosome (Column): chromosome identifier
            position (Column): position of the variant
            window_length (int): window length in base pairs

        Returns:
            Column: containing cluster identifier

        Examples:
            >>> data = [
            ...     # Cluster 1:
            ...     ('s1', 'chr1', 2),
            ...     ('s1', 'chr1', 4),
            ...     ('s1', 'chr1', 12),
            ...     # Cluster 2 - Same chromosome:
            ...     ('s1', 'chr1', 31),
            ...     ('s1', 'chr1', 38),
            ...     ('s1', 'chr1', 42),
            ...     # Cluster 3 - New chromosome:
            ...     ('s1', 'chr2', 41),
            ...     ('s1', 'chr2', 44),
            ...     ('s1', 'chr2', 50),
            ...     # Cluster 4 - other study:
            ...     ('s2', 'chr2', 55),
            ...     ('s2', 'chr2', 62),
            ...     ('s2', 'chr2', 70),
            ... ]
            >>> window_length = 10
            >>> (
            ...     spark.createDataFrame(data, ['studyId', 'chromosome', 'position'])
            ...     .withColumn("cluster_id",
            ...         WindowBasedClumping._cluster_peaks(
            ...             f.col('studyId'),
            ...             f.col('chromosome'),
            ...             f.col('position'),
            ...             window_length
            ...         )
            ...     ).show()
            ... )
            +-------+----------+--------+----------+
            |studyId|chromosome|position|cluster_id|
            +-------+----------+--------+----------+
            |     s1|      chr1|       2| s1_chr1_2|
            |     s1|      chr1|       4| s1_chr1_2|
            |     s1|      chr1|      12| s1_chr1_2|
            |     s1|      chr1|      31|s1_chr1_31|
            |     s1|      chr1|      38|s1_chr1_31|
            |     s1|      chr1|      42|s1_chr1_31|
            |     s1|      chr2|      41|s1_chr2_41|
            |     s1|      chr2|      44|s1_chr2_41|
            |     s1|      chr2|      50|s1_chr2_41|
            |     s2|      chr2|      55|s2_chr2_55|
            |     s2|      chr2|      62|s2_chr2_55|
            |     s2|      chr2|      70|s2_chr2_55|
            +-------+----------+--------+----------+
            <BLANKLINE>

        """
        # By adding previous position, the cluster boundary can be identified:
        previous_position = f.lag(position).over(
            Window.partitionBy(study, chromosome).orderBy(position)
        )
        # We consider a cluster boundary when subsequent SNPs are further apart than the defined window:
        cluster_id = f.when(
            (previous_position.isNull())
            | (position - previous_position > window_length),
            f.concat_ws("_", study, chromosome, position),
        )
        # The cluster identifier is propagated across every variant of the cluster:
        return f.when(
            cluster_id.isNull(),
            f.last(cluster_id, ignorenulls=True).over(
                Window.partitionBy(study, chromosome)
                .orderBy(position)
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
            ),
        ).otherwise(cluster_id)

    @staticmethod
    def _prune_peak(position: NDArray[np.float64], window_size: int) -> DenseVector:
        """Establish lead snps based on their positions listed by p-value.

        The function `find_peak` assigns lead SNPs based on their positions listed by p-value within a specified window size.

        Args:
            position (NDArray[np.float64]): positions of the SNPs sorted by p-value.
            window_size (int): the distance in bp within which associations are clumped together around the lead snp.

        Returns:
            DenseVector: binary vector where 1 indicates a lead SNP and 0 indicates a non-lead SNP.

        Examples:
            >>> from pyspark.ml import functions as fml
            >>> from pyspark.ml.linalg import DenseVector
            >>> WindowBasedClumping._prune_peak(np.array((3, 9, 8, 4, 6)), 2)
            DenseVector([1.0, 1.0, 0.0, 0.0, 1.0])

        """
        # Initializing the lead list with zeroes:
        is_lead = np.zeros(len(position))

        # List containing indices of leads:
        lead_indices: list[int] = []

        # Looping through all positions:
        for index in range(len(position)):
            # Looping through leads to find out if they are within a window:
            for lead_index in lead_indices:
                # If any of the leads falls within the window:
                if abs(position[lead_index] - position[index]) < window_size:
                    # Skipping further checks:
                    break
            else:
                # None of the leads were within the window:
                lead_indices.append(index)
                is_lead[index] = 1

        return DenseVector(is_lead)

    @staticmethod
    def clump(
        unclumped_associations: SummaryStatistics | StudyLocus,
        distance: int = WindowBasedClumpingStepConfig().distance,
    ) -> StudyLocus:
        """Clump single point associations from summary statistics or study locus dataset based on window.

        Args:
            unclumped_associations (SummaryStatistics | StudyLocus): Input dataset to be used for clumping. Assumes that the input dataset is already filtered for significant variants.
            distance (int): Distance in base pairs to be used for clumping. Defaults to 500_000.

        Returns:
            StudyLocus: clumped associations, where the clumped variants are flagged.
        """
        # Quality check expression that flags variants that are not considered lead variant:
        qc_check = f.col("semiIndices")[f.col("pvRank") - 1] <= 0

        # The quality control expression will depend on the input dataset, as the column might be already present:
        qc_expression = (
            # When the column is already present and the condition is met, the value is appended to the array, otherwise keep as is:
            f.when(
                qc_check,
                f.array_union(
                    f.col("qualityControls"),
                    f.array(f.lit(StudyLocusQualityCheck.WINDOW_CLUMPED.value)),
                ),
            ).otherwise(f.col("qualityControls"))
            if "qualityControls" in unclumped_associations.df.columns
            # If column is not there yet, initialize it with the flag value, or an empty array:
            else f.when(
                qc_check, f.array(f.lit(StudyLocusQualityCheck.WINDOW_CLUMPED.value))
            ).otherwise(f.array().cast(t.ArrayType(t.StringType())))
        )

        # Create window for locus clusters
        # - variants where the distance between subsequent variants is below the defined threshold.
        # - Variants are sorted by descending significance
        cluster_window = Window.partitionBy(
            "studyId", "chromosome", "cluster_id"
        ).orderBy(f.col("pValueExponent").asc(), f.col("pValueMantissa").asc())

        return StudyLocus(
            _df=(
                unclumped_associations.df
                # Clustering variants for efficient windowing (complexity reduction):
                .withColumn(
                    "cluster_id",
                    WindowBasedClumping._cluster_peaks(
                        f.col("studyId"),
                        f.col("chromosome"),
                        f.col("position"),
                        distance,
                    ),
                )
                # Within each cluster variants are ranked by significance:
                .withColumn("pvRank", f.row_number().over(cluster_window))
                # Collect positions in cluster for the most significant variant (complexity reduction):
                .withColumn(
                    "collectedPositions",
                    f.when(
                        f.col("pvRank") == 1,
                        f.collect_list(f.col("position")).over(
                            cluster_window.rowsBetween(
                                Window.currentRow, Window.unboundedFollowing
                            )
                        ),
                    ).otherwise(f.array()),
                )
                # Collect top loci per cluster:
                .withColumn(
                    "semiIndices",
                    f.when(
                        f.size(f.col("collectedPositions")) > 0,
                        fml.vector_to_array(
                            f.udf(WindowBasedClumping._prune_peak, VectorUDT())(
                                fml.array_to_vector(f.col("collectedPositions")),
                                f.lit(distance),
                            )
                        ),
                    ),
                )
                # Propagating the result of the above calculation for all rows:
                .withColumn(
                    "semiIndices",
                    f.when(
                        f.col("semiIndices").isNull(),
                        f.first(f.col("semiIndices"), ignorenulls=True).over(
                            cluster_window
                        ),
                    ).otherwise(f.col("semiIndices")),
                )
                # Adding study-locus id:
                .withColumn(
                    "studyLocusId",
                    StudyLocus.assign_study_locus_id(
                        ["studyId", "variantId"]
                    ),
                )
                # Initialize QC column as array of strings:
                .withColumn("qualityControls", qc_expression)
                .drop("pvRank", "collectedPositions", "semiIndices", "cluster_id")
            ),
            _schema=StudyLocus.get_schema(),
        )

clump(unclumped_associations: SummaryStatistics | StudyLocus, distance: int = WindowBasedClumpingStepConfig().distance) -> StudyLocus (staticmethod)

Clump single-point associations from a summary statistics or study locus dataset based on a genomic window.

Parameters:

    unclumped_associations (SummaryStatistics | StudyLocus): Input dataset to be used for clumping. Assumes that the input dataset is already filtered for significant variants. Required.
    distance (int): Distance in base pairs to be used for clumping. Defaults to WindowBasedClumpingStepConfig().distance (500_000).

Returns:

    StudyLocus: Clumped associations, where the clumped variants are flagged.
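As a usage sketch: non-lead variants are flagged rather than dropped, so semi-lead SNPs can be selected downstream via the quality-control column. This assumes `summary_statistics` is already filtered for significant variants and that StudyLocusQualityCheck is importable from gentropy.dataset.study_locus (an assumed path, inferred from the flag used in the source above):

from pyspark.sql import functions as f

from gentropy.dataset.study_locus import StudyLocusQualityCheck  # assumed import path
from gentropy.method.window_based_clumping import WindowBasedClumping

clumped = WindowBasedClumping.clump(summary_statistics, distance=500_000)

# Keep only semi-lead SNPs, i.e. rows not flagged as window-clumped:
leads = clumped.df.filter(
    ~f.array_contains(
        f.col("qualityControls"),
        StudyLocusQualityCheck.WINDOW_CLUMPED.value,
    )
)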

LD-based clumping

gentropy.method.clump.LDclumping

LD clumping reports the most significant genetic associations in a region in terms of a smaller number of “clumps” of genetically linked SNPs.

Source code in src/gentropy/method/clump.py
class LDclumping:
    """LD clumping reports the most significant genetic associations in a region in terms of a smaller number of “clumps” of genetically linked SNPs."""

    @staticmethod
    def _is_lead_linked(
        study_id: Column,
        chromosome: Column,
        variant_id: Column,
        p_value_exponent: Column,
        p_value_mantissa: Column,
        ld_set: Column,
    ) -> Column:
        """Evaluates whether a lead variant is linked to a tag (with lowest p-value) in the same studyLocus dataset.

        Args:
            study_id (Column): studyId
            chromosome (Column): chromosome
            variant_id (Column): Lead variant id
            p_value_exponent (Column): p-value exponent
            p_value_mantissa (Column): p-value mantissa
            ld_set (Column): Array of variants in LD with the lead variant

        Returns:
            Column: Boolean in which True indicates that the lead is linked to another tag in the same dataset.
        """
        # Partitioning data by study and chromosome - this is the scope for looking for linked loci.
        # Within the partition, rows are ordered by increasing p-value, and the more significant lead variants are collected in the window.
        windowspec = (
            Window.partitionBy(study_id, chromosome)
            .orderBy(p_value_exponent.asc(), p_value_mantissa.asc())
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
        more_significant_leads = f.collect_set(variant_id).over(windowspec)

        # Collect all variants from the ld_set and add the lead variant to the list, to make sure the lead is always included.
        tags_in_studylocus = f.array_distinct(
            f.array_union(
                f.array(variant_id),
                f.transform(ld_set, lambda x: x.getField("tagVariantId")),
            )
        )

        # If more than one tag of the ld_set can be found among the more significant leads, the lead is linked.
        # Study loci without a variantId are considered not linked.
        # Leads that were not found in the LD index are also considered not linked.
        return f.when(
            variant_id.isNotNull(),
            f.size(f.array_intersect(more_significant_leads, tags_in_studylocus)) > 1,
        ).otherwise(f.lit(False))

    @classmethod
    def clump(cls: type[LDclumping], associations: StudyLocus) -> StudyLocus:
        """Perform clumping on studyLocus dataset.

        Args:
            associations (StudyLocus): StudyLocus dataset

        Returns:
            StudyLocus: including flag and removing locus information for LD clumped loci.
        """
        return associations.clump()

clump(associations: StudyLocus) -> StudyLocus (classmethod)

Perform clumping on a StudyLocus dataset.

Parameters:

    associations (StudyLocus): StudyLocus dataset. Required.

Returns:

    StudyLocus: Clumped dataset, including a flag and with locus information removed for LD-clumped loci.
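To illustrate the linkage test itself, the sketch below builds a toy DataFrame (hypothetical values; columns follow the `_is_lead_linked` signature, and the private helper is called directly purely for demonstration). The less significant of two LD-linked leads is the one flagged:

from pyspark.sql import SparkSession, functions as f

from gentropy.method.clump import LDclumping

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        # v1 is the most significant lead; its LD set tags v2.
        ("s1", "chr1", "v1", -10, 1.0, [("v2",)]),
        # v2 is less significant and shares LD with v1 -> expected to be linked.
        ("s1", "chr1", "v2", -8, 1.0, [("v1",)]),
        # v3 shares no LD with the others -> expected to stay independent.
        ("s1", "chr1", "v3", -7, 1.0, []),
    ],
    "studyId string, chromosome string, variantId string, "
    "pValueExponent int, pValueMantissa double, "
    "ldSet array<struct<tagVariantId:string>>",
)
df.withColumn(
    "isLeadLinked",
    LDclumping._is_lead_linked(
        f.col("studyId"),
        f.col("chromosome"),
        f.col("variantId"),
        f.col("pValueExponent"),
        f.col("pValueMantissa"),
        f.col("ldSet"),
    ),
).show()
# Expected: v1 False, v2 True, v3 False.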


Locus-breaker clumping

gentropy.method.locus_breaker_clumping.LocusBreakerClumping

Locus-breaker clumping method.

Source code in src/gentropy/method/locus_breaker_clumping.py
class LocusBreakerClumping:
    """Locus-breaker clumping method."""

    @staticmethod
    def locus_breaker(
        summary_statistics: SummaryStatistics,
        baseline_pvalue_cutoff: float,
        distance_cutoff: int,
        pvalue_cutoff: float,
        flanking_distance: int,
    ) -> StudyLocus:
        """Identify GWAS associated loci based on the provided p-value and distance cutoff.

        - The GWAS-associated loci identified by this method have varying widths, and are separated by a distance greater than the provided distance cutoff.
        - The distance is only calculated between single-point associations that reach the baseline p-value cutoff.
        - As the width of the selected genomic region depends dynamically on the locus, the resulting StudyLocus object will contain the locus start and end position.
        - To ensure completeness, the locus is extended by a flanking distance at both ends.

        Args:
            summary_statistics (SummaryStatistics): Input summary statistics dataset.
            baseline_pvalue_cutoff (float): baseline significance we consider for the locus.
            distance_cutoff (int): minimum distance that separates two loci.
            pvalue_cutoff (float): the minimum significance the locus should have.
            flanking_distance (int): the distance to extend the locus in both directions.

        Returns:
            StudyLocus: clumped study loci with locus start and end positions + lead variant from the locus.
        """
        # Extract columns from the summary statistics:
        columns_sumstats_columns = summary_statistics.df.columns
        # Convert pvalue_cutoff to neglog scale:
        neglog_pv_cutoff = -np.log10(pvalue_cutoff)

        # First window to calculate the distance between consecutive positions:
        w1 = Window.partitionBy("studyId", "chromosome").orderBy("position")

        # Second window to calculate the locus start and end:
        w2 = (
            Window.partitionBy("studyId", "chromosome", "locusStart")
            .orderBy("position")
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )

        # Third window to rank the variants within the locus based on neglog p-value to find top loci:
        w3 = Window.partitionBy(
            "studyId", "chromosome", "locusStart", "locusEnd"
        ).orderBy(f.col("negLogPValue").desc())

        return StudyLocus(
            _df=(
                # Applying the baseline p-value cutoff:
                summary_statistics.pvalue_filter(baseline_pvalue_cutoff)
                # Calculating the neglog p-value for easier sorting:
                .df.withColumn(
                    "negLogPValue",
                    calculate_neglog_pvalue(
                        f.col("pValueMantissa"), f.col("pValueExponent")
                    ),
                )
                # Calculating the distance between consecutive positions, then identifying the locus start and end:
                .withColumn("next_position", f.lag(f.col("position")).over(w1))
                .withColumn("distance", f.col("position") - f.col("next_position"))
                .withColumn(
                    "locusStart",
                    f.when(
                        (f.col("distance") > distance_cutoff)
                        | f.col("distance").isNull(),
                        f.col("position"),
                    ),
                )
                .withColumn(
                    "locusStart",
                    f.when(
                        f.last(f.col("locusStart") - flanking_distance, True).over(
                            w1.rowsBetween(-sys.maxsize, 0)
                        )
                        > 0,
                        f.last(f.col("locusStart") - flanking_distance, True).over(
                            w1.rowsBetween(-sys.maxsize, 0)
                        ),
                    ).otherwise(f.lit(0)),
                )
                .withColumn(
                    "locusEnd", f.max(f.col("position") + flanking_distance).over(w2)
                )
                .withColumn("rank", f.rank().over(w3))
                .filter(
                    (f.col("rank") == 1) & (f.col("negLogPValue") > neglog_pv_cutoff)
                )
                .select(
                    *columns_sumstats_columns,
                    # To make sure that the type of locusStart and locusEnd follows schema of StudyLocus:
                    f.col("locusStart").cast(t.IntegerType()).alias("locusStart"),
                    f.col("locusEnd").cast(t.IntegerType()).alias("locusEnd"),
                    f.lit(None)
                    .cast(t.ArrayType(t.StringType()))
                    .alias("qualityControls"),
                    StudyLocus.assign_study_locus_id(["studyId", "variantId"]),
                )
            ),
            _schema=StudyLocus.get_schema(),
        )

    @staticmethod
    def process_locus_breaker_output(
        lbc: StudyLocus,
        wbc: StudyLocus,
        large_loci_size: int,
    ) -> StudyLocus:
        """Process the locus breaker method result, and run window-based clumping on large loci.

        Args:
            lbc (StudyLocus): StudyLocus object from locus-breaker clumping.
            wbc (StudyLocus): StudyLocus object from window-based clumping.
            large_loci_size (int): the locus size above which a locus is considered large and re-clumped with window-based clumping.

        Returns:
            StudyLocus: clumped study loci with large loci broken by window-based clumping.
        """
        large_loci_size = int(large_loci_size)
        small_loci = lbc.filter(
            (f.col("locusEnd") - f.col("locusStart")) <= large_loci_size
        )
        large_loci = lbc.filter(
            (f.col("locusEnd") - f.col("locusStart")) > large_loci_size
        )
        large_loci_wbc = StudyLocus(
            wbc.df.alias("wbc")
            .join(
                large_loci.df.alias("ll"),
                (f.col("wbc.studyId") == f.col("ll.studyId"))
                & (f.col("wbc.chromosome") == f.col("ll.chromosome"))
                & (
                    f.col("wbc.position").between(
                        f.col("ll.locusStart"), f.col("ll.locusEnd")
                    )
                ),
                "semi",
            )
            .withColumns(
                {
                    "locusStart": f.col("position") - large_loci_size // 2,
                    "locusEnd": f.col("position") + large_loci_size // 2,
                }
            ),
            StudyLocus.get_schema(),
        )
        return StudyLocus(
            large_loci_wbc.df.unionByName(small_loci.df),
            StudyLocus.get_schema(),
        )

locus_breaker(summary_statistics: SummaryStatistics, baseline_pvalue_cutoff: float, distance_cutoff: int, pvalue_cutoff: float, flanking_distance: int) -> StudyLocus (staticmethod)

Identify GWAS-associated loci based on the provided p-value and distance cutoffs.

  • The GWAS-associated loci identified by this method have varying widths and are separated by a distance greater than the provided distance cutoff.
  • The distance is only calculated between single-point associations that reach the baseline p-value cutoff.
  • As the width of the selected genomic region depends dynamically on the locus, the resulting StudyLocus object contains the locus start and end positions.
  • To ensure completeness, the locus is extended by a flanking distance at both ends.

Parameters:

    summary_statistics (SummaryStatistics): Input summary statistics dataset. Required.
    baseline_pvalue_cutoff (float): Baseline significance we consider for the locus. Required.
    distance_cutoff (int): Minimum distance that separates two loci. Required.
    pvalue_cutoff (float): Minimum significance the locus should have. Required.
    flanking_distance (int): Distance by which the locus is extended in both directions. Required.

Returns:

    StudyLocus: Clumped study loci with locus start and end positions, plus the lead variant of each locus.
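A minimal usage sketch, assuming `summary_statistics` is a gentropy SummaryStatistics object; the cutoff values are illustrative:

from gentropy.method.locus_breaker_clumping import LocusBreakerClumping

study_loci = LocusBreakerClumping.locus_breaker(
    summary_statistics,
    baseline_pvalue_cutoff=1e-5,  # variants at this significance can extend a locus
    distance_cutoff=250_000,      # a gap larger than this starts a new locus
    pvalue_cutoff=5e-8,           # a locus must reach this significance to be kept
    flanking_distance=100_000,    # each locus is padded by this much on both sides
)

# Unlike fixed-window clumping, each resulting row carries explicit boundaries:
study_loci.df.select("studyId", "variantId", "locusStart", "locusEnd").show()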

process_locus_breaker_output(lbc: StudyLocus, wbc: StudyLocus, large_loci_size: int) -> StudyLocus (staticmethod)

Process the locus-breaker output and re-clump large loci with window-based clumping.

Parameters:

    lbc (StudyLocus): StudyLocus object from locus-breaker clumping. Required.
    wbc (StudyLocus): StudyLocus object from window-based clumping. Required.
    large_loci_size (int): Locus size above which a locus is considered large and re-clumped with window-based clumping. Required.

Returns:

    StudyLocus: Clumped study loci, with large loci broken up by window-based clumping.
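A combined sketch, assuming `summary_statistics` is pre-filtered for the window-based step and using an illustrative 1.5 Mb threshold for large loci:

from gentropy.method.locus_breaker_clumping import LocusBreakerClumping
from gentropy.method.window_based_clumping import WindowBasedClumping

lbc = LocusBreakerClumping.locus_breaker(
    summary_statistics,
    baseline_pvalue_cutoff=1e-5,
    distance_cutoff=250_000,
    pvalue_cutoff=5e-8,
    flanking_distance=100_000,
)
wbc = WindowBasedClumping.clump(summary_statistics, distance=500_000)

# Loci wider than large_loci_size are replaced by the window-based leads that
# fall inside them (each assigned a fixed-width locus); smaller loci pass through.
final = LocusBreakerClumping.process_locus_breaker_output(
    lbc, wbc, large_loci_size=1_500_000
)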
