Skip to content

GWAS Catalog Top Hits Ingestion Step

gentropy.gwas_catalog_top_hits.GWASCatalogTopHitIngestionStep

GWAS Catalog ingestion step to extract GWAS Catalog top hits.

Source code in src/gentropy/gwas_catalog_top_hits.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class GWASCatalogTopHitIngestionStep:
    """Ingestion step producing the GWAS Catalog top hits and their study index.

    The whole step runs inside the constructor: creating an instance reads the raw
    inputs, transforms them and writes both output datasets.
    """

    def __init__(
        self,
        session: Session,
        catalog_study_files: list[str],
        catalog_ancestry_files: list[str],
        catalog_associations_file: str,
        variant_annotation_path: str,
        catalog_studies_out: str,
        catalog_associations_out: str,
        distance: int = WindowBasedClumpingStepConfig().distance,
    ) -> None:
        """Read the raw GWAS Catalog inputs, build the datasets and write them as parquet.

        Args:
            session (Session): Session providing the Spark handle and the write mode.
            catalog_study_files (list[str]): Raw GWAS Catalog study files (tab-separated, with header).
            catalog_ancestry_files (list[str]): Raw GWAS Catalog ancestry annotation files (tab-separated, with header).
            catalog_associations_file (str): Raw GWAS Catalog associations file (tab-separated, with header).
            variant_annotation_path (str): Path to the GnomAD variants, loaded as a parquet variant index.
            catalog_studies_out (str): Destination path of the study index parquet.
            catalog_associations_out (str): Destination path of the clumped associations parquet.
            distance (int): Distance handed over to the window-based clumping of the associations.
        """

        def read_tsv(source):  # noqa: ANN001, ANN202
            # All raw GWAS Catalog inputs are read as tab-separated files with a header row.
            return session.spark.read.csv(source, sep="\t", header=True)

        # Extract
        variant_index = VariantIndex.from_parquet(session, variant_annotation_path)
        raw_studies = read_tsv(list(catalog_study_files))
        raw_ancestries = read_tsv(list(catalog_ancestry_files))
        # The raw associations are marked for caching by Spark right after reading.
        raw_associations = read_tsv(catalog_associations_file).persist()

        # Transform
        parsed_studies = StudyIndexGWASCatalogParser.from_source(
            raw_studies, raw_ancestries
        )
        parsed_associations = GWASCatalogCuratedAssociationsParser.from_source(
            raw_associations, variant_index
        )
        study_index, study_locus = GWASCatalogStudySplitter.split(
            parsed_studies, parsed_associations
        )

        # Load
        # Studies get the "no summary statistics" flag before being saved.
        flagged_studies = study_index.add_no_sumstats_flag()
        flagged_studies.df.write.mode(session.write_mode).parquet(catalog_studies_out)

        # Associations are clumped within the requested window before being saved.
        clumped_loci = study_locus.window_based_clumping(distance)
        clumped_loci.df.write.mode(session.write_mode).parquet(
            catalog_associations_out
        )

__init__(session: Session, catalog_study_files: list[str], catalog_ancestry_files: list[str], catalog_associations_file: str, variant_annotation_path: str, catalog_studies_out: str, catalog_associations_out: str, distance: int = WindowBasedClumpingStepConfig().distance) -> None

Run step.

Parameters:

Name Type Description Default
session Session

Session object.

required
catalog_study_files list[str]

List of raw GWAS Catalog study files.

required
catalog_ancestry_files list[str]

List of raw ancestry annotations files from GWAS Catalog.

required
catalog_associations_file str

Raw GWAS catalog associations file.

required
variant_annotation_path str

Path to GnomAD variants.

required
catalog_studies_out str

Output GWAS catalog studies path.

required
catalog_associations_out str

Output GWAS catalog associations path.

required
distance int

Distance, within which tagging variants are collected around the semi-index.

WindowBasedClumpingStepConfig().distance
Source code in src/gentropy/gwas_catalog_top_hits.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def __init__(
    self,
    session: Session,
    catalog_study_files: list[str],
    catalog_ancestry_files: list[str],
    catalog_associations_file: str,
    variant_annotation_path: str,
    catalog_studies_out: str,
    catalog_associations_out: str,
    distance: int = WindowBasedClumpingStepConfig().distance,
) -> None:
    """Read the raw GWAS Catalog inputs, build the datasets and write them as parquet.

    Args:
        session (Session): Session providing the Spark handle and the write mode.
        catalog_study_files (list[str]): Raw GWAS Catalog study files (tab-separated, with header).
        catalog_ancestry_files (list[str]): Raw GWAS Catalog ancestry annotation files (tab-separated, with header).
        catalog_associations_file (str): Raw GWAS Catalog associations file (tab-separated, with header).
        variant_annotation_path (str): Path to the GnomAD variants, loaded as a parquet variant index.
        catalog_studies_out (str): Destination path of the study index parquet.
        catalog_associations_out (str): Destination path of the clumped associations parquet.
        distance (int): Distance handed over to the window-based clumping of the associations.
    """

    def read_tsv(source):  # noqa: ANN001, ANN202
        # All raw GWAS Catalog inputs are read as tab-separated files with a header row.
        return session.spark.read.csv(source, sep="\t", header=True)

    # Extract
    variant_index = VariantIndex.from_parquet(session, variant_annotation_path)
    raw_studies = read_tsv(list(catalog_study_files))
    raw_ancestries = read_tsv(list(catalog_ancestry_files))
    # The raw associations are marked for caching by Spark right after reading.
    raw_associations = read_tsv(catalog_associations_file).persist()

    # Transform
    parsed_studies = StudyIndexGWASCatalogParser.from_source(
        raw_studies, raw_ancestries
    )
    parsed_associations = GWASCatalogCuratedAssociationsParser.from_source(
        raw_associations, variant_index
    )
    study_index, study_locus = GWASCatalogStudySplitter.split(
        parsed_studies, parsed_associations
    )

    # Load
    # Studies get the "no summary statistics" flag before being saved.
    flagged_studies = study_index.add_no_sumstats_flag()
    flagged_studies.df.write.mode(session.write_mode).parquet(catalog_studies_out)

    # Associations are clumped within the requested window before being saved.
    clumped_loci = study_locus.window_based_clumping(distance)
    clumped_loci.df.write.mode(session.write_mode).parquet(
        catalog_associations_out
    )