Skip to content

Study Validation

gentropy.study_validation.StudyValidationStep

Study index validation step.

This step reads and outputs a study index dataset with flagged studies when target of disease validation fails.

Source code in src/gentropy/study_validation.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class StudyValidationStep:
    """Study index validation step.

    This step reads and outputs a study index dataset with flagged studies
    when target of disease validation fails.
    """

    def __init__(
        self,
        session: Session,
        study_index_path: list[str],
        target_index_path: str,
        disease_index_path: str,
        biosample_index_path: str,
        valid_study_index_path: str,
        invalid_study_index_path: str,
        invalid_qc_reasons: list[str] = [],
    ) -> None:
        """Initialize step.

        Args:
            session (Session): Session object.
            study_index_path (list[str]): Path to study index file.
            target_index_path (str): Path to target index file.
            disease_index_path (str): Path to disease index file.
            biosample_index_path (str): Path to biosample index file.
            valid_study_index_path (str): Path to write the valid records.
            invalid_study_index_path (str): Path to write the output file.
            invalid_qc_reasons (list[str]): List of invalid quality check reason names from `StudyQualityCheck` (e.g. ['DUPLICATED_STUDY']).
        """
        # Reading datasets:
        target_index = TargetIndex.from_parquet(session, target_index_path)
        biosample_index = BiosampleIndex.from_parquet(session, biosample_index_path)
        # Reading disease index and pre-process.
        # This logic does not belong anywhere, but gentorpy has no disease dataset yet.
        disease_index = (
            session.spark.read.parquet(disease_index_path)
            .select(
                f.col("id").alias("diseaseId"),
                f.explode_outer(
                    f.when(
                        f.col("obsoleteTerms").isNotNull(),
                        f.array_union(f.array("id"), f.col("obsoleteTerms")),
                    )
                ).alias("efo"),
            )
            .withColumn("efo", f.coalesce(f.col("efo"), f.col("diseaseId")))
        )
        study_index = StudyIndex.from_parquet(session, list(study_index_path))

        # Running validation:
        study_index_with_qc = (
            study_index.deconvolute_studies()  # Deconvolute studies where the same study is ingested from multiple sources
            .validate_study_type()  # Flagging non-supported study types
            .validate_target(target_index)  # Flagging QTL studies with invalid targets
            .validate_disease(disease_index)  # Flagging invalid EFOs
            .validate_biosample(
                biosample_index
            )  # Flagging QTL studies with invalid biosamples
        ).persist()  # we will need this for 2 types of outputs

        study_index_with_qc.valid_rows(invalid_qc_reasons, invalid=True).df.coalesce(
            session.output_partitions
        ).write.mode(session.write_mode).parquet(invalid_study_index_path)

        study_index_with_qc.valid_rows(invalid_qc_reasons).df.coalesce(
            session.output_partitions
        ).write.mode(session.write_mode).parquet(valid_study_index_path)

__init__(session: Session, study_index_path: list[str], target_index_path: str, disease_index_path: str, biosample_index_path: str, valid_study_index_path: str, invalid_study_index_path: str, invalid_qc_reasons: list[str] = []) -> None

Initialize step.

Parameters:

Name Type Description Default
session Session

Session object.

required
study_index_path list[str]

Path to study index file.

required
target_index_path str

Path to target index file.

required
disease_index_path str

Path to disease index file.

required
biosample_index_path str

Path to biosample index file.

required
valid_study_index_path str

Path to write the valid records.

required
invalid_study_index_path str

Path to write the output file.

required
invalid_qc_reasons list[str]

List of invalid quality check reason names from StudyQualityCheck (e.g. ['DUPLICATED_STUDY']).

[]
Source code in src/gentropy/study_validation.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def __init__(
    self,
    session: Session,
    study_index_path: list[str],
    target_index_path: str,
    disease_index_path: str,
    biosample_index_path: str,
    valid_study_index_path: str,
    invalid_study_index_path: str,
    invalid_qc_reasons: list[str] = [],
) -> None:
    """Initialize step.

    Args:
        session (Session): Session object.
        study_index_path (list[str]): Path to study index file.
        target_index_path (str): Path to target index file.
        disease_index_path (str): Path to disease index file.
        biosample_index_path (str): Path to biosample index file.
        valid_study_index_path (str): Path to write the valid records.
        invalid_study_index_path (str): Path to write the output file.
        invalid_qc_reasons (list[str]): List of invalid quality check reason names from `StudyQualityCheck` (e.g. ['DUPLICATED_STUDY']).
    """
    # Reading datasets:
    target_index = TargetIndex.from_parquet(session, target_index_path)
    biosample_index = BiosampleIndex.from_parquet(session, biosample_index_path)
    # Reading disease index and pre-process.
    # This logic does not belong anywhere, but gentorpy has no disease dataset yet.
    disease_index = (
        session.spark.read.parquet(disease_index_path)
        .select(
            f.col("id").alias("diseaseId"),
            f.explode_outer(
                f.when(
                    f.col("obsoleteTerms").isNotNull(),
                    f.array_union(f.array("id"), f.col("obsoleteTerms")),
                )
            ).alias("efo"),
        )
        .withColumn("efo", f.coalesce(f.col("efo"), f.col("diseaseId")))
    )
    study_index = StudyIndex.from_parquet(session, list(study_index_path))

    # Running validation:
    study_index_with_qc = (
        study_index.deconvolute_studies()  # Deconvolute studies where the same study is ingested from multiple sources
        .validate_study_type()  # Flagging non-supported study types
        .validate_target(target_index)  # Flagging QTL studies with invalid targets
        .validate_disease(disease_index)  # Flagging invalid EFOs
        .validate_biosample(
            biosample_index
        )  # Flagging QTL studies with invalid biosamples
    ).persist()  # we will need this for 2 types of outputs

    study_index_with_qc.valid_rows(invalid_qc_reasons, invalid=True).df.coalesce(
        session.output_partitions
    ).write.mode(session.write_mode).parquet(invalid_study_index_path)

    study_index_with_qc.valid_rows(invalid_qc_reasons).df.coalesce(
        session.output_partitions
    ).write.mode(session.write_mode).parquet(valid_study_index_path)