Skip to content

Study index gwas catalog

Bases: StudyIndex

Study index dataset from GWAS Catalog.

The following information is harmonised from the GWAS Catalog:

  • All publication related information retained.
  • Mapped measured and background traits parsed.
  • Flagged if harmonized summary statistics datasets available.
  • If available, the ftp path to these files presented.
  • Ancestries from the discovery and replication stages are structured with sample counts.
  • Case/control counts extracted.
  • The number of samples with European ancestry extracted.
Source code in src/otg/dataset/study_index.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
@dataclass
class StudyIndexGWASCatalog(StudyIndex):
    """Study index dataset from GWAS Catalog.

    The following information is harmonised from the GWAS Catalog:

    - All publication related information retained.
    - Mapped measured and background traits parsed.
    - Flagged if harmonized summary statistics datasets available.
    - If available, the ftp path to these files presented.
    - Ancestries from the discovery and replication stages are structured with sample counts.
    - Case/control counts extracted.
    - The number of samples with European ancestry extracted.

    """

    @staticmethod
    def _gwas_ancestry_to_gnomad(gwas_catalog_ancestry: Column) -> Column:
        """Normalised ancestry column from GWAS Catalog into Gnomad ancestry.

        Args:
            gwas_catalog_ancestry (Column): array column of GWAS Catalog ancestry labels

        Returns:
            Column: array column with each label mapped to a Gnomad ancestry using the LUT.
                Labels missing from the LUT are presumably mapped to null (map lookup miss).
        """
        # GWAS Catalog ancestry to gnomAD superpopulation mapping, shipped as a packaged JSON file:
        json_dict = json.loads(
            pkg_resources.read_text(
                data, "gwascat_2_gnomad_superpopulation_map.json", encoding="utf-8"
            )
        )
        # `create_map` expects a flat key1, value1, key2, value2, ... list of literals:
        map_expr = f.create_map(*[f.lit(x) for x in chain(*json_dict.items())])

        # Look every element of the input array up in the map:
        return f.transform(gwas_catalog_ancestry, lambda x: map_expr[x])

    @classmethod
    def get_schema(cls: type[StudyIndexGWASCatalog]) -> StructType:
        """Provides the schema for the StudyIndexGWASCatalog dataset.

        This method duplicates the parent class implementation because, by definition, abstract methods require every child class to implement them.

        Returns:
            StructType: Spark schema for the StudyIndexGWASCatalog dataset.
        """
        return parse_spark_schema("studies.json")

    @classmethod
    def _parse_study_table(
        cls: type[StudyIndexGWASCatalog], catalog_studies: DataFrame
    ) -> StudyIndexGWASCatalog:
        """Harmonise GWASCatalog study table with `StudyIndex` schema.

        Args:
            catalog_studies (DataFrame): GWAS Catalog study table

        Returns:
            StudyIndexGWASCatalog: study index holding the renamed and parsed columns of the study table.
        """
        return cls(
            _df=catalog_studies.select(
                # Rows without a study accession get a generated identifier instead:
                f.coalesce(
                    f.col("STUDY ACCESSION"), f.monotonically_increasing_id()
                ).alias("studyId"),
                f.lit("GCST").alias("projectId"),
                f.lit("gwas").alias("studyType"),
                f.col("PUBMED ID").alias("pubmedId"),
                f.col("FIRST AUTHOR").alias("publicationFirstAuthor"),
                f.col("DATE").alias("publicationDate"),
                f.col("JOURNAL").alias("publicationJournal"),
                f.col("STUDY").alias("publicationTitle"),
                # Missing trait labels are replaced by a placeholder:
                f.coalesce(f.col("DISEASE/TRAIT"), f.lit("Unreported")).alias(
                    "traitFromSource"
                ),
                f.col("INITIAL SAMPLE SIZE").alias("initialSampleSize"),
                parse_efos(f.col("MAPPED_TRAIT_URI")).alias("traitFromSourceMappedIds"),
                parse_efos(f.col("MAPPED BACKGROUND TRAIT URI")).alias(
                    "backgroundTraitFromSourceMappedIds"
                ),
            ),
            _schema=cls.get_schema(),
        )

    @classmethod
    def from_source(
        cls: type[StudyIndexGWASCatalog],
        catalog_studies: DataFrame,
        ancestry_file: DataFrame,
        sumstats_lut: DataFrame,
    ) -> StudyIndexGWASCatalog:
        """This function ingests study level metadata from the GWAS Catalog.

        Args:
            catalog_studies (DataFrame): GWAS Catalog raw study table
            ancestry_file (DataFrame): GWAS Catalog ancestry table.
            sumstats_lut (DataFrame): GWAS Catalog summary statistics list.

        Returns:
            StudyIndexGWASCatalog: Parsed and annotated GWAS Catalog study table.
        """
        # Parse the raw study table, then add each annotation in turn:
        return (
            cls._parse_study_table(catalog_studies)
            ._annotate_ancestries(ancestry_file)
            ._annotate_sumstats_info(sumstats_lut)
            ._annotate_discovery_sample_sizes()
        )

    def get_gnomad_population_structure(self: StudyIndexGWASCatalog) -> DataFrame:
        """Get the population structure (ancestry normalised to gnomAD and population sizes) for every study.

        Returns:
            DataFrame: containing `studyId` and `populationsStructure`, where each element of the array represents a population
        """
        # Study ancestries
        w_study = Window.partitionBy("studyId")
        return (
            self.df
            # Excluding studies where no sample description is provided:
            .filter(f.col("discoverySamples").isNotNull())
            # Exploding sample description and study identifier:
            .withColumn("discoverySample", f.explode(f.col("discoverySamples")))
            # Splitting sample descriptions further (the lookahead skips commas inside parentheses):
            .withColumn(
                "ancestries",
                f.split(f.col("discoverySample.ancestry"), r",\s(?![^()]*\))"),
            )
            # Dividing sample sizes assuming even distribution
            .withColumn(
                "adjustedSampleSize",
                f.col("discoverySample.sampleSize") / f.size(f.col("ancestries")),
            )
            # mapped to gnomAD superpopulation and exploded
            .withColumn(
                "population",
                f.explode(
                    StudyIndexGWASCatalog._gwas_ancestry_to_gnomad(f.col("ancestries"))
                ),
            )
            # Group by studies and aggregate for major population:
            .groupBy("studyId", "population")
            .agg(f.sum(f.col("adjustedSampleSize")).alias("sampleSize"))
            # Calculate proportions for each study
            .withColumn(
                "relativeSampleSize",
                f.col("sampleSize") / f.sum("sampleSize").over(w_study),
            )
            .withColumn(
                "populationStructure",
                f.struct("population", "relativeSampleSize"),
            )
            .groupBy("studyId")
            .agg(f.collect_set("populationStructure").alias("populationsStructure"))
        )

    def update_study_id(
        self: StudyIndexGWASCatalog, study_annotation: DataFrame
    ) -> StudyIndexGWASCatalog:
        """Update `studyId`, `traitFromSource` and `traitFromSourceMappedIds` from an annotation dataframe.

        Values present in the annotation take precedence; studies without a matching
        annotation row keep their current values.

        Args:
            study_annotation (DataFrame): Dataframe containing `updatedStudyId`, `traitFromSource`, `traitFromSourceMappedIds` and key column `studyId`.

        Returns:
            StudyIndexGWASCatalog: Updated study table.
        """
        self.df = (
            self._df.join(
                # Prefix every annotation column except the join key and the new id
                # with `updated`, so they do not clash with the existing columns:
                study_annotation.select(
                    *[
                        f.col(c).alias(f"updated{c}")
                        if c not in ["studyId", "updatedStudyId"]
                        else f.col(c)
                        for c in study_annotation.columns
                    ]
                ),
                on="studyId",
                how="left",
            )
            .withColumn(
                "studyId",
                f.coalesce(f.col("updatedStudyId"), f.col("studyId")),
            )
            .withColumn(
                "traitFromSource",
                f.coalesce(f.col("updatedtraitFromSource"), f.col("traitFromSource")),
            )
            .withColumn(
                "traitFromSourceMappedIds",
                f.coalesce(
                    f.col("updatedtraitFromSourceMappedIds"),
                    f.col("traitFromSourceMappedIds"),
                ),
            )
            # Restore the original column set and order:
            .select(self._df.columns)
        )

        return self

    def _annotate_ancestries(
        self: StudyIndexGWASCatalog, ancestry_lut: DataFrame
    ) -> StudyIndexGWASCatalog:
        """Extracting sample sizes and ancestry information.

        This function parses the ancestry data. Also get counts for the europeans in the same
        discovery stage.

        Args:
            ancestry_lut (DataFrame): Ancestry table as downloaded from the GWAS Catalog

        Returns:
            StudyIndexGWASCatalog: the study index left-joined with the per-stage sample
                annotation (`discoverySamples` and `replicationSamples`, when the `initial`
                and `replication` stages are present in the ancestry table).
        """
        ancestry = (
            ancestry_lut
            # Convert column headers to camelcase:
            .transform(
                lambda df: df.select(
                    *[f.expr(column2camel_case(x)) for x in df.columns]
                )
            ).withColumnRenamed(
                "studyAccession", "studyId"
            )  # studyId has not been split yet
        )

        # Get a high resolution dataset on experimental stage:
        # NOTE(review): this dataframe is persisted and not unpersisted within this method.
        ancestry_stages = (
            ancestry.groupBy("studyId")
            .pivot("stage")
            .agg(
                f.collect_set(
                    f.struct(
                        f.col("numberOfIndividuals").alias("sampleSize"),
                        f.col("broadAncestralCategory").alias("ancestry"),
                    )
                )
            )
            .withColumnRenamed("initial", "discoverySamples")
            .withColumnRenamed("replication", "replicationSamples")
            .persist()
        )

        # Generate information on the ancestry composition of the discovery stage, and calculate
        # the proportion of the Europeans:
        europeans_deconvoluted = (
            ancestry
            # Focus on discovery stage:
            .filter(f.col("stage") == "initial")
            # Sorting ancestries if European:
            .withColumn(
                "ancestryFlag",
                # Excluding finnish:
                f.when(
                    f.col("initialSampleDescription").contains("Finnish"),
                    f.lit("other"),
                )
                # Excluding Icelandic population:
                .when(
                    f.col("initialSampleDescription").contains("Icelandic"),
                    f.lit("other"),
                )
                # Including European ancestry:
                .when(f.col("broadAncestralCategory") == "European", f.lit("european"))
                # Exclude all other population:
                .otherwise("other"),
            )
            # Grouping by study accession and initial sample description:
            .groupBy("studyId")
            .pivot("ancestryFlag")
            .agg(
                # Summarizing sample sizes for all ancestries:
                f.sum(f.col("numberOfIndividuals"))
            )
            # Do arithmetics to make sure we have the right proportion of european in the set:
            .withColumn(
                "initialSampleCountEuropean",
                f.when(f.col("european").isNull(), f.lit(0)).otherwise(
                    f.col("european")
                ),
            )
            .withColumn(
                "initialSampleCountOther",
                f.when(f.col("other").isNull(), f.lit(0)).otherwise(f.col("other")),
            )
            .withColumn(
                "initialSampleCount",
                # Sum the null-safe counts: the raw `other` pivot column is null for
                # studies without non-European samples, which would null the total.
                f.col("initialSampleCountEuropean") + f.col("initialSampleCountOther"),
            )
            # NOTE(review): every count computed above is dropped again, so only
            # `studyId` survives from this dataframe; confirm whether the counts
            # are meant to be part of the study index.
            .drop(
                "european",
                "other",
                "initialSampleCount",
                "initialSampleCountEuropean",
                "initialSampleCountOther",
            )
        )

        parsed_ancestry_lut = ancestry_stages.join(
            europeans_deconvoluted, on="studyId", how="outer"
        )

        self.df = self.df.join(parsed_ancestry_lut, on="studyId", how="left")
        return self

    def _annotate_sumstats_info(
        self: StudyIndexGWASCatalog, sumstats_lut: DataFrame
    ) -> StudyIndexGWASCatalog:
        """Annotate summary stat locations.

        Args:
            sumstats_lut (DataFrame): listing GWAS Catalog summary stats paths.
                The relative path is read from column `_c0`.

        Returns:
            StudyIndexGWASCatalog: including `summarystatsLocation` and `hasSumstats` columns
        """
        gwas_sumstats_base_uri = (
            "ftp://ftp.ebi.ac.uk/pub/databases/gwas/summary_statistics/"
        )

        parsed_sumstats_lut = sumstats_lut.withColumn(
            "summarystatsLocation",
            f.concat(
                f.lit(gwas_sumstats_base_uri),
                # Strip the leading "./" of the relative path:
                f.regexp_replace(f.col("_c0"), r"^\.\/", ""),
            ),
        ).select(
            # The study accession is the GCST-prefixed folder name in the path:
            f.regexp_extract(f.col("summarystatsLocation"), r"\/(GCST\d+)\/", 1).alias(
                "studyId"
            ),
            "summarystatsLocation",
            f.lit(True).alias("hasSumstats"),
        )

        self.df = (
            self.df.drop("hasSumstats")
            .join(parsed_sumstats_lut, on="studyId", how="left")
            # Studies without a match in the listing have no summary statistics:
            .withColumn("hasSumstats", f.coalesce(f.col("hasSumstats"), f.lit(False)))
        )
        return self

    def _annotate_discovery_sample_sizes(
        self: StudyIndexGWASCatalog,
    ) -> StudyIndexGWASCatalog:
        """Extract the sample size of the discovery stage of the study as annotated in the GWAS Catalog.

        For some studies that measure quantitative traits, nCases and nControls can't be extracted. Therefore, we assume these are 0.

        Returns:
            StudyIndexGWASCatalog: object with columns `nCases`, `nControls`, and `nSamples` per `studyId` correctly extracted.
        """
        sample_size_lut = (
            self.df.select(
                "studyId",
                # One row per comma + whitespace separated sample description:
                f.explode_outer(f.split(f.col("initialSampleSize"), r",\s+")).alias(
                    "samples"
                ),
            )
            # Extracting the sample size from the string (thousands separators removed first):
            .withColumn(
                "sampleSize",
                f.regexp_extract(
                    f.regexp_replace(f.col("samples"), ",", ""), r"[0-9,]+", 0
                ).cast(t.IntegerType()),
            )
            .select(
                "studyId",
                "sampleSize",
                # A description counts towards cases/controls only if it mentions them:
                f.when(f.col("samples").contains("cases"), f.col("sampleSize"))
                .otherwise(f.lit(0))
                .alias("nCases"),
                f.when(f.col("samples").contains("controls"), f.col("sampleSize"))
                .otherwise(f.lit(0))
                .alias("nControls"),
            )
            # Aggregating sample sizes for all ancestries:
            .groupBy("studyId")  # studyId has not been split yet
            .agg(
                f.sum("nCases").alias("nCases"),
                f.sum("nControls").alias("nControls"),
                f.sum("sampleSize").alias("nSamples"),
            )
        )
        self.df = self.df.join(sample_size_lut, on="studyId", how="left")
        return self

from_source(catalog_studies, ancestry_file, sumstats_lut) classmethod

This function ingests study level metadata from the GWAS Catalog.

Parameters:

Name Type Description Default
catalog_studies DataFrame

GWAS Catalog raw study table

required
ancestry_file DataFrame

GWAS Catalog ancestry table.

required
sumstats_lut DataFrame

GWAS Catalog summary statistics list.

required

Returns:

Name Type Description
StudyIndexGWASCatalog StudyIndexGWASCatalog

Parsed and annotated GWAS Catalog study table.

Source code in src/otg/dataset/study_index.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
@classmethod
def from_source(
    cls: type[StudyIndexGWASCatalog],
    catalog_studies: DataFrame,
    ancestry_file: DataFrame,
    sumstats_lut: DataFrame,
) -> StudyIndexGWASCatalog:
    """Build the GWAS Catalog study index from the raw catalog tables.

    Args:
        catalog_studies (DataFrame): GWAS Catalog raw study table
        ancestry_file (DataFrame): GWAS Catalog ancestry table.
        sumstats_lut (DataFrame): GWAS Catalog summary statistics list.

    Returns:
        StudyIndexGWASCatalog: Parsed and annotated GWAS Catalog study table.
    """
    # Harmonise the raw study table first, then layer each annotation on top.
    study_index = cls._parse_study_table(catalog_studies)
    study_index = study_index._annotate_ancestries(ancestry_file)
    study_index = study_index._annotate_sumstats_info(sumstats_lut)
    return study_index._annotate_discovery_sample_sizes()

get_gnomad_population_structure()

Get the population structure (ancestry normalised to gnomAD and population sizes) for every study.

Returns:

Name Type Description
DataFrame DataFrame

containing studyId and populationsStructure, where each element of the array represents a population

Source code in src/otg/dataset/study_index.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def get_gnomad_population_structure(self: StudyIndexGWASCatalog) -> DataFrame:
    """Summarise the discovery samples of every study as gnomAD population proportions.

    Returns:
        DataFrame: containing `studyId` and `populationsStructure`, where each element of the array represents a population
    """
    # One row per discovery sample; studies without sample descriptions are dropped.
    discovery_samples = self.df.filter(
        f.col("discoverySamples").isNotNull()
    ).withColumn("discoverySample", f.explode(f.col("discoverySamples")))

    # A sample may list several ancestries separated by ", " (commas inside
    # parentheses are not split on); its size is shared evenly among them.
    split_samples = discovery_samples.withColumn(
        "ancestries",
        f.split(f.col("discoverySample.ancestry"), r",\s(?![^()]*\))"),
    ).withColumn(
        "adjustedSampleSize",
        f.col("discoverySample.sampleSize") / f.size(f.col("ancestries")),
    )

    # Map each ancestry to its gnomAD population and total the sizes per study and population.
    gnomad_populations = StudyIndexGWASCatalog._gwas_ancestry_to_gnomad(
        f.col("ancestries")
    )
    population_sizes = (
        split_samples.withColumn("population", f.explode(gnomad_populations))
        .groupBy("studyId", "population")
        .agg(f.sum(f.col("adjustedSampleSize")).alias("sampleSize"))
    )

    # Express every population as a fraction of the study total and collect per study.
    per_study = Window.partitionBy("studyId")
    study_fraction = f.col("sampleSize") / f.sum("sampleSize").over(per_study)
    return (
        population_sizes.withColumn("relativeSampleSize", study_fraction)
        .withColumn(
            "populationStructure",
            f.struct("population", "relativeSampleSize"),
        )
        .groupBy("studyId")
        .agg(f.collect_set("populationStructure").alias("populationsStructure"))
    )

get_schema() classmethod

Provides the schema for the StudyIndexGWASCatalog dataset.

This method duplicates the parent class implementation because, by definition, abstract methods require every child class to implement them.

Returns:

Name Type Description
StructType StructType

Spark schema for the StudyIndexGWASCatalog dataset.

Source code in src/otg/dataset/study_index.py
82
83
84
85
86
87
88
89
90
91
@classmethod
def get_schema(cls: type[StudyIndexGWASCatalog]) -> StructType:
    """Return the Spark schema of the StudyIndexGWASCatalog dataset.

    The parent class defines the same method; it is repeated here because abstract methods have to be implemented by every child class.

    Returns:
        StructType: Spark schema for the StudyIndexGWASCatalog dataset.
    """
    schema_file = "studies.json"
    return parse_spark_schema(schema_file)

update_study_id(study_annotation)

Update studyId, traitFromSource and traitFromSourceMappedIds using a dataframe of study annotations.

Parameters:

Name Type Description Default
study_annotation DataFrame

Dataframe containing updatedStudyId, traitFromSource, traitFromSourceMappedIds and key column studyId.

required

Returns:

Name Type Description
StudyIndexGWASCatalog StudyIndexGWASCatalog

Updated study table.

Source code in src/otg/dataset/study_index.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def update_study_id(
    self: StudyIndexGWASCatalog, study_annotation: DataFrame
) -> StudyIndexGWASCatalog:
    """Overwrite study identifiers and trait annotations from an annotation table.

    Args:
        study_annotation (DataFrame): Dataframe containing `updatedStudyId`, `traitFromSource`, `traitFromSourceMappedIds` and key column `studyId`.

    Returns:
        StudyIndexGWASCatalog: Updated study table.
    """
    # The join key and the new identifier keep their names; every other
    # annotation column gets an `updated` prefix to avoid name clashes.
    untouched_names = ["studyId", "updatedStudyId"]
    annotation_columns = []
    for name in study_annotation.columns:
        column = f.col(name)
        if name not in untouched_names:
            column = column.alias(f"updated{name}")
        annotation_columns.append(column)

    original_columns = self._df.columns
    annotated = self._df.join(
        study_annotation.select(*annotation_columns),
        on="studyId",
        how="left",
    )

    # Annotation values win where present; otherwise the current value is kept.
    new_study_id = f.coalesce(f.col("updatedStudyId"), f.col("studyId"))
    new_trait = f.coalesce(f.col("updatedtraitFromSource"), f.col("traitFromSource"))
    new_mapped_ids = f.coalesce(
        f.col("updatedtraitFromSourceMappedIds"),
        f.col("traitFromSourceMappedIds"),
    )

    self.df = (
        annotated.withColumn("studyId", new_study_id)
        .withColumn("traitFromSource", new_trait)
        .withColumn("traitFromSourceMappedIds", new_mapped_ids)
        # Back to the original column set and order.
        .select(original_columns)
    )

    return self

Schema

root
 |-- studyId: string (nullable = false)
 |-- projectId: string (nullable = false)
 |-- studyType: string (nullable = false)
 |-- traitFromSource: string (nullable = false)
 |-- traitFromSourceMappedIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pubmedId: string (nullable = true)
 |-- publicationTitle: string (nullable = true)
 |-- publicationFirstAuthor: string (nullable = true)
 |-- publicationDate: string (nullable = true)
 |-- publicationJournal: string (nullable = true)
 |-- backgroundTraitFromSourceMappedIds: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- initialSampleSize: string (nullable = true)
 |-- nCases: long (nullable = true)
 |-- nControls: long (nullable = true)
 |-- nSamples: long (nullable = true)
 |-- discoverySamples: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- sampleSize: string (nullable = true)
 |    |    |-- ancestry: string (nullable = true)
 |-- replicationSamples: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- sampleSize: string (nullable = true)
 |    |    |-- ancestry: string (nullable = true)
 |-- summarystatsLocation: string (nullable = true)
 |-- hasSumstats: boolean (nullable = true)