Skip to content

Study Index

gentropy.datasource.ukb_ppp_eur.study_index.UkbPppEurStudyIndex dataclass

Bases: StudyIndex

Study index dataset from UKB PPP (EUR).

Source code in src/gentropy/datasource/ukb_ppp_eur/study_index.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class UkbPppEurStudyIndex(StudyIndex):
    """Study index dataset from UKB PPP (EUR)."""

    @classmethod
    def from_source(
        cls: type[UkbPppEurStudyIndex],
        spark: SparkSession,
        raw_study_index_path_from_tsv: str,
        raw_summary_stats_path: str,
    ) -> StudyIndex:
        """Ingest study-level metadata from UKB PPP (EUR).

        The raw study index TSV is combined with a per-study sample size taken from
        the summary statistics. Because the two are combined with an inner join,
        studies that have no chromosome 22 rows in the summary statistics are not
        present in the output.

        Args:
            spark (SparkSession): Spark session object.
            raw_study_index_path_from_tsv (str): Raw study index path.
            raw_summary_stats_path (str): Raw summary stats path.

        Returns:
            StudyIndex: Parsed and annotated UKB PPP (EUR) study table.
        """
        # In order to populate the nSamples column, we need to peek inside the summary stats dataframe.
        # Only chromosome 22 rows are read (presumably to keep the peek cheap — TODO confirm).
        # NOTE(review): `first` picks an arbitrary row per study, so this assumes N is
        # constant within a study.
        num_of_samples = (
            spark.read.parquet(raw_summary_stats_path)
            .filter(f.col("chromosome") == "22")
            .groupBy("studyId")
            .agg(f.first("N").cast("integer").alias("nSamples"))
        )
        # Now we can read the raw study index and complete the processing.
        study_index_df = (
            spark.read.csv(raw_study_index_path_from_tsv, sep="\t", header=True)
            .select(
                f.lit("pqtl").alias("studyType"),
                f.lit("UKB_PPP_EUR").alias("projectId"),
                f.col("_gentropy_study_id").alias("studyId"),
                f.col("UKBPPP_ProteinID").alias("traitFromSource"),
                f.lit("UBERON_0001969").alias("biosampleFromSourceId"),
                f.col("ensembl_id").alias("geneId"),
                f.lit(True).alias("hasSumstats"),
                f.col("_gentropy_summary_stats_link").alias("summarystatsLocation"),
            )
            .join(num_of_samples, "studyId", "inner")
        )
        # Add population structure. Every study gets a single "European" discovery sample
        # whose size is nSamples (already an integer, so no further cast is needed).
        study_index_df = study_index_df.withColumn(
            "discoverySamples",
            f.array(
                f.struct(
                    f.col("nSamples").alias("sampleSize"),
                    f.lit("European").alias("ancestry"),
                )
            ),
        ).withColumn(
            "ldPopulationStructure",
            cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
        )

        return StudyIndex(
            _df=study_index_df,
            _schema=StudyIndex.get_schema(),
        )

from_source(spark: SparkSession, raw_study_index_path_from_tsv: str, raw_summary_stats_path: str) -> StudyIndex classmethod

Ingests study-level metadata from UKB PPP (EUR).

Parameters:

Name Type Description Default
spark SparkSession

Spark session object.

required
raw_study_index_path_from_tsv str

Raw study index path.

required
raw_summary_stats_path str

Raw summary stats path.

required

Returns:

Name Type Description
StudyIndex StudyIndex

Parsed and annotated UKB PPP (EUR) study table.

Source code in src/gentropy/datasource/ukb_ppp_eur/study_index.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@classmethod
def from_source(
    cls: type[UkbPppEurStudyIndex],
    spark: SparkSession,
    raw_study_index_path_from_tsv: str,
    raw_summary_stats_path: str,
) -> StudyIndex:
    """Ingest study-level metadata from UKB PPP (EUR).

    The raw study index TSV is combined with a per-study sample size taken from
    the summary statistics. Because the two are combined with an inner join,
    studies that have no chromosome 22 rows in the summary statistics are not
    present in the output.

    Args:
        spark (SparkSession): Spark session object.
        raw_study_index_path_from_tsv (str): Raw study index path.
        raw_summary_stats_path (str): Raw summary stats path.

    Returns:
        StudyIndex: Parsed and annotated UKB PPP (EUR) study table.
    """
    # In order to populate the nSamples column, we need to peek inside the summary stats dataframe.
    # Only chromosome 22 rows are read (presumably to keep the peek cheap — TODO confirm).
    # NOTE(review): `first` picks an arbitrary row per study, so this assumes N is
    # constant within a study.
    num_of_samples = (
        spark.read.parquet(raw_summary_stats_path)
        .filter(f.col("chromosome") == "22")
        .groupBy("studyId")
        .agg(f.first("N").cast("integer").alias("nSamples"))
    )
    # Now we can read the raw study index and complete the processing.
    study_index_df = (
        spark.read.csv(raw_study_index_path_from_tsv, sep="\t", header=True)
        .select(
            f.lit("pqtl").alias("studyType"),
            f.lit("UKB_PPP_EUR").alias("projectId"),
            f.col("_gentropy_study_id").alias("studyId"),
            f.col("UKBPPP_ProteinID").alias("traitFromSource"),
            f.lit("UBERON_0001969").alias("biosampleFromSourceId"),
            f.col("ensembl_id").alias("geneId"),
            f.lit(True).alias("hasSumstats"),
            f.col("_gentropy_summary_stats_link").alias("summarystatsLocation"),
        )
        .join(num_of_samples, "studyId", "inner")
    )
    # Add population structure. Every study gets a single "European" discovery sample
    # whose size is nSamples (already an integer, so no further cast is needed).
    study_index_df = study_index_df.withColumn(
        "discoverySamples",
        f.array(
            f.struct(
                f.col("nSamples").alias("sampleSize"),
                f.lit("European").alias("ancestry"),
            )
        ),
    ).withColumn(
        "ldPopulationStructure",
        cls.aggregate_and_map_ancestries(f.col("discoverySamples")),
    )

    return StudyIndex(
        _df=study_index_df,
        _schema=StudyIndex.get_schema(),
    )