Skip to content

gwas_catalog_ingestion

gentropy.gwas_catalog_ingestion.GWASCatalogIngestionStep

GWAS Catalog ingestion step to extract GWASCatalog Study and StudyLocus tables.

!!! note This step currently only processes the GWAS Catalog curated list of top hits.

Source code in src/gentropy/gwas_catalog_ingestion.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
class GWASCatalogIngestionStep:
    """GWAS Catalog ingestion step to extract GWASCatalog Study and StudyLocus tables.

    !!! note This step currently only processes the GWAS Catalog curated list of top hits.
    """

    def __init__(
        self,
        session: Session,
        catalog_study_files: list[str],
        catalog_ancestry_files: list[str],
        catalog_sumstats_lut: str,
        catalog_associations_file: str,
        variant_annotation_path: str,
        catalog_studies_out: str,
        catalog_associations_out: str,
        gwas_catalog_study_curation_file: str | None = None,
        inclusion_list_path: str | None = None,
    ) -> None:
        """Run step.

        Args:
            session (Session): Session object.
            catalog_study_files (list[str]): List of raw GWAS catalog studies file.
            catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog.
            catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table.
            catalog_associations_file (str): Raw GWAS catalog associations file.
            variant_annotation_path (str): Input variant annotation path.
            catalog_studies_out (str): Output GWAS catalog studies path.
            catalog_associations_out (str): Output GWAS catalog associations path.
            gwas_catalog_study_curation_file (str | None): file of the curation table. Optional.
            inclusion_list_path (str | None): optional inclusion list (parquet)
        """
        # Extract
        va = VariantAnnotation.from_parquet(session, variant_annotation_path)
        catalog_studies = session.spark.read.csv(
            list(catalog_study_files), sep="\t", header=True
        )
        ancestry_lut = session.spark.read.csv(
            list(catalog_ancestry_files), sep="\t", header=True
        )
        sumstats_lut = session.spark.read.csv(
            catalog_sumstats_lut, sep="\t", header=False
        )
        catalog_associations = session.spark.read.csv(
            catalog_associations_file, sep="\t", header=True
        ).persist()
        gwas_catalog_study_curation = read_curation_table(
            gwas_catalog_study_curation_file, session
        )

        # Transform
        study_index, study_locus = GWASCatalogStudySplitter.split(
            StudyIndexGWASCatalogParser.from_source(
                catalog_studies, ancestry_lut, sumstats_lut
            ).annotate_from_study_curation(gwas_catalog_study_curation),
            GWASCatalogCuratedAssociationsParser.from_source(catalog_associations, va),
        )

        # if inclusion list is provided apply filter:
        if inclusion_list_path:
            inclusion_list = session.spark.read.parquet(
                inclusion_list_path, sep="\t", header=True
            )

            study_index = study_index.apply_inclusion_list(inclusion_list)
            study_locus = study_locus.apply_inclusion_list(inclusion_list)

        # Load
        study_index.df.write.mode(session.write_mode).parquet(catalog_studies_out)
        study_locus.df.write.mode(session.write_mode).parquet(catalog_associations_out)

__init__(session: Session, catalog_study_files: list[str], catalog_ancestry_files: list[str], catalog_sumstats_lut: str, catalog_associations_file: str, variant_annotation_path: str, catalog_studies_out: str, catalog_associations_out: str, gwas_catalog_study_curation_file: str | None = None, inclusion_list_path: str | None = None) -> None

Run step.

Parameters:

Name Type Description Default
session Session

Session object.

required
catalog_study_files list[str]

List of raw GWAS catalog studies file.

required
catalog_ancestry_files list[str]

List of raw ancestry annotations files from GWAS Catalog.

required
catalog_sumstats_lut str

GWAS Catalog summary statistics lookup table.

required
catalog_associations_file str

Raw GWAS catalog associations file.

required
variant_annotation_path str

Input variant annotation path.

required
catalog_studies_out str

Output GWAS catalog studies path.

required
catalog_associations_out str

Output GWAS catalog associations path.

required
gwas_catalog_study_curation_file str | None

file of the curation table. Optional.

None
inclusion_list_path str | None

optional inclusion list (parquet)

None
Source code in src/gentropy/gwas_catalog_ingestion.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def __init__(
    self,
    session: Session,
    catalog_study_files: list[str],
    catalog_ancestry_files: list[str],
    catalog_sumstats_lut: str,
    catalog_associations_file: str,
    variant_annotation_path: str,
    catalog_studies_out: str,
    catalog_associations_out: str,
    gwas_catalog_study_curation_file: str | None = None,
    inclusion_list_path: str | None = None,
) -> None:
    """Run step.

    Args:
        session (Session): Session object.
        catalog_study_files (list[str]): List of raw GWAS catalog studies file.
        catalog_ancestry_files (list[str]): List of raw ancestry annotations files from GWAS Catalog.
        catalog_sumstats_lut (str): GWAS Catalog summary statistics lookup table.
        catalog_associations_file (str): Raw GWAS catalog associations file.
        variant_annotation_path (str): Input variant annotation path.
        catalog_studies_out (str): Output GWAS catalog studies path.
        catalog_associations_out (str): Output GWAS catalog associations path.
        gwas_catalog_study_curation_file (str | None): file of the curation table. Optional.
        inclusion_list_path (str | None): optional inclusion list (parquet)
    """
    # Extract
    va = VariantAnnotation.from_parquet(session, variant_annotation_path)
    catalog_studies = session.spark.read.csv(
        list(catalog_study_files), sep="\t", header=True
    )
    ancestry_lut = session.spark.read.csv(
        list(catalog_ancestry_files), sep="\t", header=True
    )
    sumstats_lut = session.spark.read.csv(
        catalog_sumstats_lut, sep="\t", header=False
    )
    catalog_associations = session.spark.read.csv(
        catalog_associations_file, sep="\t", header=True
    ).persist()
    gwas_catalog_study_curation = read_curation_table(
        gwas_catalog_study_curation_file, session
    )

    # Transform
    study_index, study_locus = GWASCatalogStudySplitter.split(
        StudyIndexGWASCatalogParser.from_source(
            catalog_studies, ancestry_lut, sumstats_lut
        ).annotate_from_study_curation(gwas_catalog_study_curation),
        GWASCatalogCuratedAssociationsParser.from_source(catalog_associations, va),
    )

    # if inclusion list is provided apply filter:
    if inclusion_list_path:
        inclusion_list = session.spark.read.parquet(
            inclusion_list_path, sep="\t", header=True
        )

        study_index = study_index.apply_inclusion_list(inclusion_list)
        study_locus = study_locus.apply_inclusion_list(inclusion_list)

    # Load
    study_index.df.write.mode(session.write_mode).parquet(catalog_studies_out)
    study_locus.df.write.mode(session.write_mode).parquet(catalog_associations_out)