Skip to content

Summary Statistics

gentropy.datasource.ukb_ppp_eur.summary_stats.UkbPppEurSummaryStats dataclass

Summary statistics dataset for UKB PPP (EUR).

Source code in src/gentropy/datasource/ukb_ppp_eur/summary_stats.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@dataclass
class UkbPppEurSummaryStats:
    """Summary statistics dataset for UKB PPP (EUR)."""

    @classmethod
    def from_source(
        cls: type[UkbPppEurSummaryStats],
        spark: SparkSession,
        raw_summary_stats_path: str,
        tmp_variant_annotation_path: str,
        chromosome: str,
        study_index_path: str,
    ) -> SummaryStatistics:
        """Ingest and harmonise all summary stats for UKB PPP (EUR) data.

        The raw column names specific to this data source (``GENPOS``, ``ALLELE0``,
        ``ALLELE1``, ``A1FREQ``, ``INFO``, ``BETA``, ``SE``, ``LOG10P``, ``N``) are
        handed to ``harmonise_summary_stats``; the resulting dataframe is wrapped
        in a ``SummaryStatistics`` dataset.

        Args:
            spark (SparkSession): Spark session object.
            raw_summary_stats_path (str): Input raw summary stats path.
            tmp_variant_annotation_path (str): Input variant annotation dataset path.
            chromosome (str): Which chromosome to process.
            study_index_path (str): The path to study index, which is necessary in some cases to populate the sample size column. Note that this method's body does not currently read this argument.

        Returns:
            SummaryStatistics: Processed summary statistics dataset for a given chromosome.
        """
        df = harmonise_summary_stats(
            spark,
            raw_summary_stats_path,
            tmp_variant_annotation_path,
            chromosome,
            colname_position="GENPOS",
            colname_allele0="ALLELE0",
            colname_allele1="ALLELE1",
            colname_a1freq="A1FREQ",
            colname_info="INFO",
            colname_beta="BETA",
            colname_se="SE",
            colname_mlog10p="LOG10P",
            colname_n="N",
        )

        # Create the summary statistics object.
        return SummaryStatistics(
            _df=df,
            _schema=SummaryStatistics.get_schema(),
        )

    @classmethod
    def process_summary_stats_per_chromosome(
        cls,
        session: Session,
        raw_summary_stats_path: str,
        tmp_variant_annotation_path: str,
        summary_stats_output_path: str,
        study_index_path: str,
    ) -> None:
        """Processes summary statistics for each chromosome, partitioning and writing results.

        Chromosomes 1 to 23 are processed one after another. The first write uses
        the ``overwrite`` mode and every following write uses ``append``, so all
        chromosomes end up under the same output path.

        Args:
            session (Session): The Gentropy session to use for distributed data processing.
            raw_summary_stats_path (str): The path to the raw summary statistics files.
            tmp_variant_annotation_path (str): The path to temporary variant annotation data, used for chromosome joins.
            summary_stats_output_path (str): The output path to write processed summary statistics as parquet files.
            study_index_path (str): The path to study index, which is necessary in some cases to populate the sample size column.
        """
        # Set mode to overwrite for processing the first chromosome.
        write_mode = "overwrite"
        # Chromosome 23 is X, this is handled downstream.
        for chromosome in range(1, 24):
            logging_message = f"  Processing chromosome {chromosome}"
            session.logger.info(logging_message)
            (
                cls.from_source(
                    spark=session.spark,
                    raw_summary_stats_path=raw_summary_stats_path,
                    tmp_variant_annotation_path=tmp_variant_annotation_path,
                    chromosome=str(chromosome),
                    study_index_path=study_index_path,
                )
                # NOTE(review): coalesce(1) immediately followed by repartition(...)
                # looks redundant; confirm whether the single-partition step is
                # intentional before removing it.
                .df.coalesce(1)
                .repartition("studyId", "chromosome")
                .write.partitionBy("studyId", "chromosome")
                .mode(write_mode)
                .parquet(summary_stats_output_path)
            )
            # Now that we have written the first chromosome, change mode to append for subsequent operations.
            write_mode = "append"

from_source(spark: SparkSession, raw_summary_stats_path: str, tmp_variant_annotation_path: str, chromosome: str, study_index_path: str) -> SummaryStatistics classmethod

Ingest and harmonise all summary stats for UKB PPP (EUR) data.

Parameters:

Name Type Description Default
spark SparkSession

Spark session object.

required
raw_summary_stats_path str

Input raw summary stats path.

required
tmp_variant_annotation_path str

Input variant annotation dataset path.

required
chromosome str

Which chromosome to process.

required
study_index_path str

The path to study index, which is necessary in some cases to populate the sample size column.

required

Returns:

Name Type Description
SummaryStatistics SummaryStatistics

Processed summary statistics dataset for a given chromosome.

Source code in src/gentropy/datasource/ukb_ppp_eur/summary_stats.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@classmethod
def from_source(
    cls: type[UkbPppEurSummaryStats],
    spark: SparkSession,
    raw_summary_stats_path: str,
    tmp_variant_annotation_path: str,
    chromosome: str,
    study_index_path: str,
) -> SummaryStatistics:
    """Build the harmonised UKB PPP (EUR) summary statistics for one chromosome.

    Args:
        spark (SparkSession): Spark session object.
        raw_summary_stats_path (str): Input raw summary stats path.
        tmp_variant_annotation_path (str): Input variant annotation dataset path.
        chromosome (str): Which chromosome to process.
        study_index_path (str): The path to study index. This method's body does not currently read this argument.

    Returns:
        SummaryStatistics: Processed summary statistics dataset for a given chromosome.
    """
    # Names of the raw columns of this data source, keyed by the keyword
    # argument of the harmonisation helper they are passed to.
    source_columns = {
        "colname_position": "GENPOS",
        "colname_allele0": "ALLELE0",
        "colname_allele1": "ALLELE1",
        "colname_a1freq": "A1FREQ",
        "colname_info": "INFO",
        "colname_beta": "BETA",
        "colname_se": "SE",
        "colname_mlog10p": "LOG10P",
        "colname_n": "N",
    }
    harmonised = harmonise_summary_stats(
        spark,
        raw_summary_stats_path,
        tmp_variant_annotation_path,
        chromosome,
        **source_columns,
    )

    # Wrap the harmonised dataframe together with the expected schema.
    schema = SummaryStatistics.get_schema()
    return SummaryStatistics(_df=harmonised, _schema=schema)

process_summary_stats_per_chromosome(session: Session, raw_summary_stats_path: str, tmp_variant_annotation_path: str, summary_stats_output_path: str, study_index_path: str) -> None classmethod

Processes summary statistics for each chromosome, partitioning and writing results.

Parameters:

Name Type Description Default
session Session

The Gentropy session to use for distributed data processing.

required
raw_summary_stats_path str

The path to the raw summary statistics files.

required
tmp_variant_annotation_path str

The path to temporary variant annotation data, used for chromosome joins.

required
summary_stats_output_path str

The output path to write processed summary statistics as parquet files.

required
study_index_path str

The path to study index, which is necessary in some cases to populate the sample size column.

required
Source code in src/gentropy/datasource/ukb_ppp_eur/summary_stats.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
@classmethod
def process_summary_stats_per_chromosome(
    cls,
    session: Session,
    raw_summary_stats_path: str,
    tmp_variant_annotation_path: str,
    summary_stats_output_path: str,
    study_index_path: str,
) -> None:
    """Processes summary statistics for each chromosome, partitioning and writing results.

    Chromosomes 1 to 23 are processed one after another. The first write uses
    the ``overwrite`` mode and every following write uses ``append``, so all
    chromosomes end up under the same output path.

    Args:
        session (Session): The Gentropy session to use for distributed data processing.
        raw_summary_stats_path (str): The path to the raw summary statistics files.
        tmp_variant_annotation_path (str): The path to temporary variant annotation data, used for chromosome joins.
        summary_stats_output_path (str): The output path to write processed summary statistics as parquet files.
        study_index_path (str): The path to study index, which is necessary in some cases to populate the sample size column.
    """
    # Set mode to overwrite for processing the first chromosome.
    write_mode = "overwrite"
    # Chromosome 23 is X, this is handled downstream.
    for chromosome in range(1, 24):
        logging_message = f"  Processing chromosome {chromosome}"
        session.logger.info(logging_message)
        (
            cls.from_source(
                spark=session.spark,
                raw_summary_stats_path=raw_summary_stats_path,
                tmp_variant_annotation_path=tmp_variant_annotation_path,
                chromosome=str(chromosome),
                study_index_path=study_index_path,
            )
            # NOTE(review): coalesce(1) immediately followed by repartition(...)
            # looks redundant; confirm whether the single-partition step is
            # intentional before removing it.
            .df.coalesce(1)
            .repartition("studyId", "chromosome")
            .write.partitionBy("studyId", "chromosome")
            .mode(write_mode)
            .parquet(summary_stats_output_path)
        )
        # Now that we have written the first chromosome, change mode to append for subsequent operations.
        write_mode = "append"