Skip to content

Study Index

gentropy.datasource.finngen.study_index.FinnGenStudyIndex

Study index dataset from FinnGen.

The following information is aggregated/extracted:

  • Study ID in the special format (e.g. FINNGEN_R11_*)
  • Trait name (for example, Amoebiasis)
  • Number of cases and controls
  • Link to the summary statistics location
  • EFO mapping from curated EFO mapping file

Some fields are also populated as constants, such as study type and the initial sample size.

Source code in src/gentropy/datasource/finngen/study_index.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class FinnGenStudyIndex:
    """Study index dataset from FinnGen.

    The following information is aggregated/extracted:

    - Study ID in the special format (e.g. FINNGEN_R11_*)
    - Trait name (for example, Amoebiasis)
    - Number of cases and controls
    - Link to the summary statistics location
    - EFO mapping from curated EFO mapping file

    Some fields are also populated as constants, such as study type and the initial sample size.
    """

    @staticmethod
    def validate_release_prefix(release_prefix: str) -> FinngenPrefixMatch:
        """Validate release prefix passed to finngen StudyIndex.

        The whole string has to match `FINNGEN_R<digits>` with at most one trailing
        underscore; anything following the release number is rejected.

        Args:
            release_prefix (str): Finngen release prefix, should be a string like FINNGEN_R*.

        Returns:
            FinngenPrefixMatch: Object containing valid prefix and release strings.

        Raises:
            ValueError: when incorrect release prefix is provided.

        This method ensures that the trailing underscore is removed from prefix.
        """
        # fullmatch anchors both ends, so the optional trailing underscore is the
        # only thing allowed after the release number.
        pattern_match = re.fullmatch(
            r"FINNGEN_(?P<release>R\d+)_?", release_prefix
        )
        if pattern_match is None:
            raise ValueError(
                f"Invalid FinnGen release prefix: {release_prefix}, use the format FINNGEN_R*"
            )
        release = pattern_match.group("release")
        # Rebuilding the prefix from the captured release drops the optional underscore.
        return FinngenPrefixMatch(prefix=f"FINNGEN_{release}", release=release)

    @staticmethod
    def read_efo_curation(session: SparkSession, url: str) -> DataFrame:
        """Read efo curation from provided url.

        Args:
            session (SparkSession): Session to use when reading the mapping file.
            url (str): Url to the mapping file. The file provided should be a tsv file.

        Returns:
            DataFrame: DataFrame with EFO mappings.

        Example of the file can be found in https://raw.githubusercontent.com/opentargets/curation/refs/heads/master/mappings/disease/manual_string.tsv.
        """
        # The context manager closes the connection once all lines are read.
        with urlopen(url) as response:
            csv_rows = [row.decode("utf8") for row in response.readlines()]
        rdd = session.sparkContext.parallelize(csv_rows)
        # NOTE: type annotations for spark.read.csv miss the fact that the first param can be [RDD[str]]
        return session.read.csv(rdd, header=True, sep="\t")

    @staticmethod
    def join_efo_mapping(
        study_index: StudyIndex,
        efo_curation_mapping: DataFrame,
        finngen_release: str,
    ) -> StudyIndex:
        """Add EFO mapping to the Finngen study index table.

        The curated EFO mappings are left-joined to the study index table by trait name
        and then aggregated into a list of mapped IDs per study.

        NOTE: preserve all studyId entries even if they don't have EFO mappings.
        This is to avoid discrepancies between `study_index` and `credible_set` `studyId` column.
        The rows with missing EFO mappings will be dropped in the study_index validation step.

        Args:
            study_index (StudyIndex): Study index table.
            efo_curation_mapping (DataFrame): Dataframe with EFO mappings.
            finngen_release (str): FinnGen release.

        Returns:
            StudyIndex: Study index table with added EFO mappings.
        """
        # STUDY is upper-cased below, so compare against an upper-cased release too.
        release = finngen_release.upper()
        efo_mappings = (
            efo_curation_mapping.withColumn("STUDY", f.upper(f.col("STUDY")))
            .filter(f.col("STUDY").contains("FINNGEN"))
            .filter(f.col("STUDY").contains(release))
            .select(
                # Strip everything up to the last slash of the SEMANTIC_TAG value.
                f.regexp_replace(f.col("SEMANTIC_TAG"), r"^.*/", "").alias(
                    "traitFromSourceMappedId"
                ),
                f.col("PROPERTY_VALUE").alias("traitFromSource"),
            )
        )

        # Left outer join: studies without a curated mapping keep their row.
        si_df = study_index.df.join(
            efo_mappings, on="traitFromSource", how="left_outer"
        )
        # Group on every original column so that each study collapses back to one row.
        common_cols = [c for c in si_df.columns if c != "traitFromSourceMappedId"]
        si_df = si_df.groupby(common_cols).agg(
            f.collect_list("traitFromSourceMappedId").alias("traitFromSourceMappedIds")
        )
        return StudyIndex(_df=si_df, _schema=StudyIndex.get_schema())

    @classmethod
    def from_source(
        cls: type[FinnGenStudyIndex],
        spark: SparkSession,
        finngen_phenotype_table_url: str,
        finngen_release_prefix: str,
        finngen_summary_stats_url_prefix: str,
        finngen_summary_stats_url_suffix: str,
        sample_size: int,
    ) -> StudyIndex:
        """This function ingests study level metadata from FinnGen.

        Args:
            spark (SparkSession): Spark session object.
            finngen_phenotype_table_url (str): URL to the FinnGen phenotype table.
            finngen_release_prefix (str): FinnGen release prefix.
            finngen_summary_stats_url_prefix (str): FinnGen summary stats URL prefix.
            finngen_summary_stats_url_suffix (str): FinnGen summary stats URL suffix.
            sample_size (int): Number of individuals who participated in sample collection.

        Returns:
            StudyIndex: Parsed and annotated FinnGen study table.
        """
        # The context manager closes the connection once the payload is read.
        with urlopen(finngen_phenotype_table_url) as response:
            json_data = response.read().decode("utf-8")
        rdd = spark.sparkContext.parallelize([json_data])
        raw_df = spark.read.json(rdd)

        study_df = raw_df.select(
            # Study ID is "<release prefix>_<phenocode>".
            f.concat_ws(
                "_", f.lit(finngen_release_prefix), f.col("phenocode")
            ).alias("studyId"),
            f.col("phenostring").alias("traitFromSource"),
            f.col("num_cases").cast("integer").alias("nCases"),
            f.col("num_controls").cast("integer").alias("nControls"),
            (f.col("num_cases") + f.col("num_controls"))
            .cast("integer")
            .alias("nSamples"),
            f.lit(finngen_release_prefix).alias("projectId"),
            f.lit("gwas").alias("studyType"),
            f.lit(True).alias("hasSumstats"),
            # NOTE(review): this text is hard-coded and independent of `sample_size`;
            # confirm it matches the release being ingested.
            f.lit("453,733 (254,618 females and 199,115 males)").alias(
                "initialSampleSize"
            ),
            f.array(
                f.struct(
                    f.lit(sample_size).cast("integer").alias("sampleSize"),
                    f.lit("Finnish").alias("ancestry"),
                )
            ).alias("discoverySamples"),
            # Cohort label is consistent with GWAS Catalog curation.
            f.array(f.lit("FinnGen")).alias("cohorts"),
            f.concat(
                f.lit(finngen_summary_stats_url_prefix),
                f.col("phenocode"),
                f.lit(finngen_summary_stats_url_suffix),
            ).alias("summarystatsLocation"),
        ).withColumn(
            "ldPopulationStructure",
            StudyIndex.aggregate_and_map_ancestries(f.col("discoverySamples")),
        )
        return StudyIndex(_df=study_df, _schema=StudyIndex.get_schema())

from_source(spark: SparkSession, finngen_phenotype_table_url: str, finngen_release_prefix: str, finngen_summary_stats_url_prefix: str, finngen_summary_stats_url_suffix: str, sample_size: int) -> StudyIndex classmethod

This function ingests study level metadata from FinnGen.

Parameters:

Name Type Description Default
spark SparkSession

Spark session object.

required
finngen_phenotype_table_url str

URL to the FinnGen phenotype table.

required
finngen_release_prefix str

FinnGen release prefix.

required
finngen_summary_stats_url_prefix str

FinnGen summary stats URL prefix.

required
finngen_summary_stats_url_suffix str

FinnGen summary stats URL suffix.

required
sample_size int

Number of individuals participated in sample collection.

required

Returns:

Name Type Description
StudyIndex StudyIndex

Parsed and annotated FinnGen study table.

Source code in src/gentropy/datasource/finngen/study_index.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@classmethod
def from_source(
    cls: type[FinnGenStudyIndex],
    spark: SparkSession,
    finngen_phenotype_table_url: str,
    finngen_release_prefix: str,
    finngen_summary_stats_url_prefix: str,
    finngen_summary_stats_url_suffix: str,
    sample_size: int,
) -> StudyIndex:
    """This function ingests study level metadata from FinnGen.

    Args:
        spark (SparkSession): Spark session object.
        finngen_phenotype_table_url (str): URL to the FinnGen phenotype table.
        finngen_release_prefix (str): FinnGen release prefix.
        finngen_summary_stats_url_prefix (str): FinnGen summary stats URL prefix.
        finngen_summary_stats_url_suffix (str): FinnGen summary stats URL suffix.
        sample_size (int): Number of individuals who participated in sample collection.

    Returns:
        StudyIndex: Parsed and annotated FinnGen study table.
    """
    # The context manager closes the connection once the payload is read.
    with urlopen(finngen_phenotype_table_url) as response:
        json_data = response.read().decode("utf-8")
    rdd = spark.sparkContext.parallelize([json_data])
    raw_df = spark.read.json(rdd)

    study_df = raw_df.select(
        # Study ID is "<release prefix>_<phenocode>".
        f.concat_ws(
            "_", f.lit(finngen_release_prefix), f.col("phenocode")
        ).alias("studyId"),
        f.col("phenostring").alias("traitFromSource"),
        f.col("num_cases").cast("integer").alias("nCases"),
        f.col("num_controls").cast("integer").alias("nControls"),
        (f.col("num_cases") + f.col("num_controls"))
        .cast("integer")
        .alias("nSamples"),
        f.lit(finngen_release_prefix).alias("projectId"),
        f.lit("gwas").alias("studyType"),
        f.lit(True).alias("hasSumstats"),
        # NOTE(review): this text is hard-coded and independent of `sample_size`;
        # confirm it matches the release being ingested.
        f.lit("453,733 (254,618 females and 199,115 males)").alias(
            "initialSampleSize"
        ),
        f.array(
            f.struct(
                f.lit(sample_size).cast("integer").alias("sampleSize"),
                f.lit("Finnish").alias("ancestry"),
            )
        ).alias("discoverySamples"),
        # Cohort label is consistent with GWAS Catalog curation.
        f.array(f.lit("FinnGen")).alias("cohorts"),
        f.concat(
            f.lit(finngen_summary_stats_url_prefix),
            f.col("phenocode"),
            f.lit(finngen_summary_stats_url_suffix),
        ).alias("summarystatsLocation"),
    ).withColumn(
        "ldPopulationStructure",
        StudyIndex.aggregate_and_map_ancestries(f.col("discoverySamples")),
    )
    return StudyIndex(_df=study_df, _schema=StudyIndex.get_schema())

join_efo_mapping(study_index: StudyIndex, efo_curation_mapping: DataFrame, finngen_release: str) -> StudyIndex staticmethod

Add EFO mapping to the Finngen study index table.

This function performs inner join on table of EFO mappings to the study index table by trait name. All studies without EFO traits are dropped. The EFO mappings are then aggregated into lists per studyId.

NOTE: preserve all studyId entries even if they don't have EFO mappings. This is to avoid discrepancies between study_index and credible_set studyId column. The rows with missing EFO mappings will be dropped in the study_index validation step.

Parameters:

Name Type Description Default
study_index StudyIndex

Study index table.

required
efo_curation_mapping DataFrame

Dataframe with EFO mappings.

required
finngen_release str

FinnGen release.

required

Returns:

Name Type Description
StudyIndex StudyIndex

Study index table with added EFO mappings.

Source code in src/gentropy/datasource/finngen/study_index.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@staticmethod
def join_efo_mapping(
    study_index: StudyIndex,
    efo_curation_mapping: DataFrame,
    finngen_release: str,
) -> StudyIndex:
    """Add EFO mapping to the Finngen study index table.

    The curated EFO mappings are left-joined to the study index table by trait name
    and then aggregated into a list of mapped IDs per study.

    NOTE: preserve all studyId entries even if they don't have EFO mappings.
    This is to avoid discrepancies between `study_index` and `credible_set` `studyId` column.
    The rows with missing EFO mappings will be dropped in the study_index validation step.

    Args:
        study_index (StudyIndex): Study index table.
        efo_curation_mapping (DataFrame): Dataframe with EFO mappings.
        finngen_release (str): FinnGen release.

    Returns:
        StudyIndex: Study index table with added EFO mappings.
    """
    # STUDY is upper-cased below, so compare against an upper-cased release too.
    release = finngen_release.upper()
    efo_mappings = (
        efo_curation_mapping.withColumn("STUDY", f.upper(f.col("STUDY")))
        .filter(f.col("STUDY").contains("FINNGEN"))
        .filter(f.col("STUDY").contains(release))
        .select(
            # Strip everything up to the last slash of the SEMANTIC_TAG value.
            f.regexp_replace(f.col("SEMANTIC_TAG"), r"^.*/", "").alias(
                "traitFromSourceMappedId"
            ),
            f.col("PROPERTY_VALUE").alias("traitFromSource"),
        )
    )

    # Left outer join: studies without a curated mapping keep their row.
    si_df = study_index.df.join(
        efo_mappings, on="traitFromSource", how="left_outer"
    )
    # Group on every original column so that each study collapses back to one row.
    common_cols = [c for c in si_df.columns if c != "traitFromSourceMappedId"]
    si_df = si_df.groupby(common_cols).agg(
        f.collect_list("traitFromSourceMappedId").alias("traitFromSourceMappedIds")
    )
    return StudyIndex(_df=si_df, _schema=StudyIndex.get_schema())

read_efo_curation(session: SparkSession, url: str) -> DataFrame staticmethod

Read efo curation from provided url.

Parameters:

Name Type Description Default
session SparkSession

Session to use when reading the mapping file.

required
url str

Url to the mapping file. The file provided should be a tsv file.

required

Returns:

Name Type Description
DataFrame DataFrame

DataFrame with EFO mappings.

Example of the file can be found in https://raw.githubusercontent.com/opentargets/curation/refs/heads/master/mappings/disease/manual_string.tsv.

Source code in src/gentropy/datasource/finngen/study_index.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@staticmethod
def read_efo_curation(session: SparkSession, url: str) -> DataFrame:
    """Read efo curation from provided url.

    Args:
        session (SparkSession): Session to use when reading the mapping file.
        url (str): Url to the mapping file. The file provided should be a tsv file.

    Returns:
        DataFrame: DataFrame with EFO mappings.

    Example of the file can be found in https://raw.githubusercontent.com/opentargets/curation/refs/heads/master/mappings/disease/manual_string.tsv.
    """
    # The context manager closes the connection once all lines are read.
    with urlopen(url) as response:
        csv_rows = [row.decode("utf8") for row in response.readlines()]
    rdd = session.sparkContext.parallelize(csv_rows)
    # NOTE: type annotations for spark.read.csv miss the fact that the first param can be [RDD[str]]
    return session.read.csv(rdd, header=True, sep="\t")

validate_release_prefix(release_prefix: str) -> FinngenPrefixMatch staticmethod

Validate release prefix passed to finngen StudyIndex.

Parameters:

Name Type Description Default
release_prefix str

Finngen release prefix, should be a string like FINNGEN_R*.

required

Returns:

Name Type Description
FinngenPrefixMatch FinngenPrefixMatch

Object containing valid prefix and release strings.

Raises:

Type Description
ValueError

when incorrect release prefix is provided.

This method ensures that the trailing underscore is removed from prefix.

Source code in src/gentropy/datasource/finngen/study_index.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@staticmethod
def validate_release_prefix(release_prefix: str) -> FinngenPrefixMatch:
    """Validate release prefix passed to finngen StudyIndex.

    The whole string has to match `FINNGEN_R<digits>` with at most one trailing
    underscore; anything following the release number is rejected.

    Args:
        release_prefix (str): Finngen release prefix, should be a string like FINNGEN_R*.

    Returns:
        FinngenPrefixMatch: Object containing valid prefix and release strings.

    Raises:
        ValueError: when incorrect release prefix is provided.

    This method ensures that the trailing underscore is removed from prefix.
    """
    # fullmatch anchors both ends, so the optional trailing underscore is the
    # only thing allowed after the release number.
    pattern_match = re.fullmatch(r"FINNGEN_(?P<release>R\d+)_?", release_prefix)
    if pattern_match is None:
        raise ValueError(
            f"Invalid FinnGen release prefix: {release_prefix}, use the format FINNGEN_R*"
        )
    release = pattern_match.group("release")
    # Rebuilding the prefix from the captured release drops the optional underscore.
    return FinngenPrefixMatch(prefix=f"FINNGEN_{release}", release=release)