Skip to content

gwas_catalog_study_inclusion

gentropy.gwas_catalog_study_inclusion.GWASCatalogStudyInclusionGenerator

GWAS Catalog study eligibility for ingestion based on curation and the provided criteria.

Source code in src/gentropy/gwas_catalog_study_inclusion.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class GWASCatalogStudyInclusionGenerator:
    """GWAS Catalog study eligibility for ingestion based on curation and the provided criteria.

    Instantiating the class runs the whole step: the study index is built from
    the raw GWAS Catalog inputs, every study is flagged against the requested
    criteria, and the inclusion and exclusion lists are written as parquet.
    """

    @staticmethod
    def flag_eligible_studies(
        study_index: StudyIndexGWASCatalog, criteria: str
    ) -> DataFrame:
        """Flag GWAS Catalog studies as eligible or not based on the provided criteria.

        No rows are removed here: the selected filter set is evaluated as a boolean
        expression and attached as the `isEligible` column, so callers decide how to
        split the studies.

        Args:
            study_index (StudyIndexGWASCatalog): complete study index to be flagged based on the provided filter set
            criteria (str): name of the filter set to be applied (`curation` or `summary_stats`).

        Raises:
            ValueError: if the provided filter set is not in the accepted values.

        Returns:
            DataFrame: every row of the study index, restricted to the study annotation columns selected below plus the boolean `isEligible` column.
        """
        filters: dict[str, Column] = {
            # Filters applied on studies for ingesting curated associations:
            "curation": (study_index.is_gwas() & study_index.has_mapped_trait()),
            # Filters applied on studies for ingesting summary statistics:
            "summary_stats": (
                study_index.is_gwas()
                & study_index.has_mapped_trait()
                & (~study_index.is_quality_flagged())
                & study_index.has_summarystats()
            ),
        }

        if criteria not in filters:
            raise ValueError(
                f'Wrong value as filter set ({criteria}). Accepted: {",".join(filters)}'
            )

        # Attach the relevant filter expression as a flag instead of filtering:
        return study_index.df.select(
            "studyId",
            "studyType",
            "traitFromSource",
            "traitFromSourceMappedIds",
            "qualityControls",
            "hasSumstats",
            filters[criteria].alias("isEligible"),
        )

    @staticmethod
    def process_harmonised_list(studies: list[str], session: Session) -> DataFrame:
        """Generate spark dataframe from the provided list.

        Args:
            studies (list[str]): list of paths pointing to harmonised summary statistics.
            session (Session): session

        Returns:
            DataFrame: one row per path in a single `_c0` column; the column name is consistent with the original implementation
        """
        return session.spark.createDataFrame([{"_c0": path} for path in studies])

    @staticmethod
    def get_gwas_catalog_study_index(
        session: Session,
        variant_annotation_path: str,
        catalog_study_files: list[str],
        catalog_ancestry_files: list[str],
        harmonised_study_file: str,
        catalog_associations_file: str,
        gwas_catalog_study_curation_file: str,
    ) -> StudyIndexGWASCatalog:
        """Return GWAS Catalog study index.

        Args:
            session (Session): Session object.
            variant_annotation_path (str): Input variant annotation path.
            catalog_study_files (list[str]): List of raw GWAS catalog studies file.
            catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog.
            harmonised_study_file (str): GWAS Catalog summary statistics lookup table.
            catalog_associations_file (str): Raw GWAS catalog associations file.
            gwas_catalog_study_curation_file (str): file of the curation table, handed to `read_curation_table`. Described as optional upstream - TODO confirm whether `None` is accepted.

        Returns:
            StudyIndexGWASCatalog: Completely processed and fully annotated study index.
        """
        # Extract: all GWAS Catalog inputs are tab separated; only the
        # harmonised summary statistics lookup table is read without a header.
        va = VariantAnnotation.from_parquet(session, variant_annotation_path)
        catalog_studies = session.spark.read.csv(
            list(catalog_study_files), sep="\t", header=True
        )
        ancestry_lut = session.spark.read.csv(
            list(catalog_ancestry_files), sep="\t", header=True
        )
        sumstats_lut = session.spark.read.csv(
            harmonised_study_file, sep="\t", header=False
        )
        # NOTE(review): this cache is never released in this method because the
        # returned study index is derived from it.
        catalog_associations = session.spark.read.csv(
            catalog_associations_file, sep="\t", header=True
        ).persist()
        gwas_catalog_study_curation = read_curation_table(
            gwas_catalog_study_curation_file, session
        )

        # Transform: only the study index half of the split is needed here, the
        # second element returned by the splitter is discarded.
        study_index, _ = GWASCatalogStudySplitter.split(
            StudyIndexGWASCatalogParser.from_source(
                catalog_studies,
                ancestry_lut,
                sumstats_lut,
            ).annotate_from_study_curation(gwas_catalog_study_curation),
            GWASCatalogCuratedAssociationsParser.from_source(catalog_associations, va),
        )

        return study_index

    def __init__(
        self,
        session: Session,
        catalog_study_files: list[str],
        catalog_ancestry_files: list[str],
        catalog_associations_file: str,
        gwas_catalog_study_curation_file: str,
        variant_annotation_path: str,
        harmonised_study_file: str,
        criteria: str,
        inclusion_list_path: str,
        exclusion_list_path: str,
    ) -> None:
        """Run step.

        Builds the study index, flags every study against `criteria` and writes two
        parquet datasets using `session.write_mode`: the inclusion list (only the
        `studyId` of eligible studies) and the exclusion list (all flagged columns
        of the studies that are not eligible).

        Args:
            session (Session): Session object.
            catalog_study_files (list[str]): List of raw GWAS catalog studies file.
            catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog.
            catalog_associations_file (str): Raw GWAS catalog associations file.
            gwas_catalog_study_curation_file (str): file of the curation table. Described as optional upstream - TODO confirm whether `None` is accepted.
            variant_annotation_path (str): Input variant annotation path.
            harmonised_study_file (str): GWAS Catalog summary statistics lookup table.
            criteria (str): name of the filter set to be applied.
            inclusion_list_path (str): Output path for the inclusion list.
            exclusion_list_path (str): Output path for the exclusion list.
        """
        # Create study index:
        study_index = self.get_gwas_catalog_study_index(
            session,
            variant_annotation_path,
            catalog_study_files,
            catalog_ancestry_files,
            harmonised_study_file,
            catalog_associations_file,
            gwas_catalog_study_curation_file,
        )

        # Flag the studies once and cache the result: it feeds both outputs, so
        # caching it here avoids evaluating the study index lineage twice.
        flagged_studies = self.flag_eligible_studies(study_index, criteria).persist()
        try:
            # Output inclusion list:
            (
                flagged_studies.filter(f.col("isEligible"))
                .select("studyId")
                .write.mode(session.write_mode)
                .parquet(inclusion_list_path)
            )

            # Output exclusion list:
            (
                flagged_studies.filter(~f.col("isEligible"))
                .write.mode(session.write_mode)
                .parquet(exclusion_list_path)
            )
        finally:
            # Release the cache even if one of the writes fails.
            flagged_studies.unpersist()

__init__(session: Session, catalog_study_files: list[str], catalog_ancestry_files: list[str], catalog_associations_file: str, gwas_catalog_study_curation_file: str, variant_annotation_path: str, harmonised_study_file: str, criteria: str, inclusion_list_path: str, exclusion_list_path: str) -> None

Run step.

Parameters:

Name Type Description Default
session Session

Session object.

required
catalog_study_files list[str]

List of raw GWAS catalog studies file.

required
catalog_ancestry_files list[str]

List of raw ancestry annotations files from GWAS Catalog.

required
catalog_associations_file str

Raw GWAS catalog associations file.

required
gwas_catalog_study_curation_file str

file of the curation table. Optional.

required
variant_annotation_path str

Input variant annotation path.

required
harmonised_study_file str

GWAS Catalog summary statistics lookup table.

required
criteria str

name of the filter set to be applied.

required
inclusion_list_path str

Output path for the inclusion list.

required
exclusion_list_path str

Output path for the exclusion list.

required
Source code in src/gentropy/gwas_catalog_study_inclusion.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def __init__(
    self,
    session: Session,
    catalog_study_files: list[str],
    catalog_ancestry_files: list[str],
    catalog_associations_file: str,
    gwas_catalog_study_curation_file: str,
    variant_annotation_path: str,
    harmonised_study_file: str,
    criteria: str,
    inclusion_list_path: str,
    exclusion_list_path: str,
) -> None:
    """Run step.

    Builds the study index, flags every study against `criteria` and writes two
    parquet datasets using `session.write_mode`: the inclusion list (only the
    `studyId` of eligible studies) and the exclusion list (all flagged columns
    of the studies that are not eligible).

    Args:
        session (Session): Session object.
        catalog_study_files (list[str]): List of raw GWAS catalog studies file.
        catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog.
        catalog_associations_file (str): Raw GWAS catalog associations file.
        gwas_catalog_study_curation_file (str): file of the curation table. Described as optional upstream - TODO confirm whether `None` is accepted.
        variant_annotation_path (str): Input variant annotation path.
        harmonised_study_file (str): GWAS Catalog summary statistics lookup table.
        criteria (str): name of the filter set to be applied.
        inclusion_list_path (str): Output path for the inclusion list.
        exclusion_list_path (str): Output path for the exclusion list.
    """
    # Create study index:
    study_index = self.get_gwas_catalog_study_index(
        session,
        variant_annotation_path,
        catalog_study_files,
        catalog_ancestry_files,
        harmonised_study_file,
        catalog_associations_file,
        gwas_catalog_study_curation_file,
    )

    # Flag the studies once and cache the result: it feeds both outputs, so
    # caching it here avoids evaluating the study index lineage twice.
    flagged_studies = self.flag_eligible_studies(study_index, criteria).persist()
    try:
        # Output inclusion list:
        (
            flagged_studies.filter(f.col("isEligible"))
            .select("studyId")
            .write.mode(session.write_mode)
            .parquet(inclusion_list_path)
        )

        # Output exclusion list:
        (
            flagged_studies.filter(~f.col("isEligible"))
            .write.mode(session.write_mode)
            .parquet(exclusion_list_path)
        )
    finally:
        # Release the cache even if one of the writes fails.
        flagged_studies.unpersist()

flag_eligible_studies(study_index: StudyIndexGWASCatalog, criteria: str) -> DataFrame staticmethod

Apply filter on GWAS Catalog studies based on the provided criteria.

Parameters:

Name Type Description Default
study_index StudyIndexGWASCatalog

complete study index to be filtered based on the provided filter set

required
criteria str

name of the filter set to be applied.

required

Raises:

Type Description
ValueError

if the provided filter set is not in the accepted values.

Returns:

Name Type Description
DataFrame DataFrame

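Dataframe with every study of the index, the selected study annotation columns and a boolean isEligible flag; ineligible studies are flagged, not removed.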

Source code in src/gentropy/gwas_catalog_study_inclusion.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@staticmethod
def flag_eligible_studies(
    study_index: StudyIndexGWASCatalog, criteria: str
) -> DataFrame:
    """Apply filter on GWAS Catalog studies based on the provided criteria.

    Args:
        study_index (StudyIndexGWASCatalog): complete study index to be filtered based on the provided filter set
        criteria (str): name of the filter set to be applied.

    Raises:
        ValueError: if the provided filter set is not in the accepted values.

    Returns:
        DataFrame: filtered dataframe containing only eligible studies.
    """
    filters: dict[str, Column] = {
        # Filters applied on studies for ingesting curated associations:
        "curation": (study_index.is_gwas() & study_index.has_mapped_trait()),
        # Filters applied on studies for ingesting summary statistics:
        "summary_stats": (
            study_index.is_gwas()
            & study_index.has_mapped_trait()
            & (~study_index.is_quality_flagged())
            & study_index.has_summarystats()
        ),
    }

    if criteria not in filters:
        raise ValueError(
            f'Wrong value as filter set ({criteria}). Accepted: {",".join(filters.keys())}'
        )

    # Applying the relevant filter to the study:
    return study_index.df.select(
        "studyId",
        "studyType",
        "traitFromSource",
        "traitFromSourceMappedIds",
        "qualityControls",
        "hasSumstats",
        filters[criteria].alias("isEligible"),
    )

get_gwas_catalog_study_index(session: Session, variant_annotation_path: str, catalog_study_files: list[str], catalog_ancestry_files: list[str], harmonised_study_file: str, catalog_associations_file: str, gwas_catalog_study_curation_file: str) -> StudyIndexGWASCatalog staticmethod

Return GWAS Catalog study index.

Parameters:

Name Type Description Default
session Session

Session object.

required
variant_annotation_path str

Input variant annotation path.

required
catalog_study_files list[str]

List of raw GWAS catalog studies file.

required
catalog_ancestry_files list[str]

List of raw ancestry annotations files from GWAS Catalog.

required
harmonised_study_file str

GWAS Catalog summary statistics lookup table.

required
catalog_associations_file str

Raw GWAS catalog associations file.

required
gwas_catalog_study_curation_file str

file of the curation table. Optional.

required

Returns:

Name Type Description
StudyIndexGWASCatalog StudyIndexGWASCatalog

Completely processed and fully annotated study index.

Source code in src/gentropy/gwas_catalog_study_inclusion.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@staticmethod
def get_gwas_catalog_study_index(
    session: Session,
    variant_annotation_path: str,
    catalog_study_files: list[str],
    catalog_ancestry_files: list[str],
    harmonised_study_file: str,
    catalog_associations_file: str,
    gwas_catalog_study_curation_file: str,
) -> StudyIndexGWASCatalog:
    """Build the GWAS Catalog study index from the raw catalog inputs.

    Args:
        session (Session): Session object.
        variant_annotation_path (str): Input variant annotation path.
        catalog_study_files (list[str]): List of raw GWAS catalog studies file.
        catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog.
        harmonised_study_file (str): GWAS Catalog summary statistics lookup table.
        catalog_associations_file (str): Raw GWAS catalog associations file.
        gwas_catalog_study_curation_file (str): file of the curation table, handed to `read_curation_table`.

    Returns:
        StudyIndexGWASCatalog: study index returned by the study splitter, after curation annotation.
    """
    # Shared reader options: every catalog table is a tab separated file with a header row.
    tsv_with_header = {"sep": "\t", "header": True}

    # Extract
    variant_annotation = VariantAnnotation.from_parquet(
        session, variant_annotation_path
    )
    raw_studies = session.spark.read.csv(list(catalog_study_files), **tsv_with_header)
    raw_ancestries = session.spark.read.csv(
        list(catalog_ancestry_files), **tsv_with_header
    )
    # The harmonised summary statistics lookup table is the only input read without a header.
    harmonised_lut = session.spark.read.csv(
        harmonised_study_file, sep="\t", header=False
    )
    raw_associations = session.spark.read.csv(
        catalog_associations_file, **tsv_with_header
    ).persist()
    curation_table = read_curation_table(gwas_catalog_study_curation_file, session)

    # Transform
    parsed_studies = StudyIndexGWASCatalogParser.from_source(
        raw_studies, raw_ancestries, harmonised_lut
    )
    curated_studies = parsed_studies.annotate_from_study_curation(curation_table)
    parsed_associations = GWASCatalogCuratedAssociationsParser.from_source(
        raw_associations, variant_annotation
    )

    # Only the study index half of the split is returned; the other element is discarded.
    split_study_index, _discarded = GWASCatalogStudySplitter.split(
        curated_studies, parsed_associations
    )
    return split_study_index

process_harmonised_list(studies: list[str], session: Session) -> DataFrame staticmethod

Generate spark dataframe from the provided list.

Parameters:

Name Type Description Default
studies list[str]

list of paths pointing to harmonised summary statistics.

required
session Session

session

required

Returns:

Name Type Description
DataFrame DataFrame

column name is consistent with the original implementation

Source code in src/gentropy/gwas_catalog_study_inclusion.py
71
72
73
74
75
76
77
78
79
80
81
82
@staticmethod
def process_harmonised_list(studies: list[str], session: Session) -> DataFrame:
    """Generate spark dataframe from the provided list.

    Args:
        studies (list[str]): list of path pointing to harmonised summary statistics.
        session (Session): session

    Returns:
        DataFrame: column name is consistent with original implementatin
    """
    return session.spark.createDataFrame([{"_c0": path} for path in studies])