Study index validation step.
This step reads a study index dataset and outputs it with studies flagged
when target or disease validation fails.
Source code in src/gentropy/study_validation.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class StudyValidationStep:
    """Study index validation step.

    This step reads a study index dataset, runs the quality checks chained in
    ``__init__`` (study type, project id, target, disease, biosample and
    analysis flags) and writes the result to two separate outputs: one for the
    valid studies and one for the invalid ones.
    """

    def __init__(
        self,
        session: Session,
        study_index_path: list[str],
        target_index_path: str,
        disease_index_path: str,
        biosample_index_path: str,
        valid_study_index_path: str,
        invalid_study_index_path: str,
        # NOTE: do not use container as default argument!
        # https://github.com/satwikkansal/wtfpython?tab=readme-ov-file#-beware-of-default-mutable-arguments
        invalid_qc_reasons: list[str] | None = None,
        deprecated_project_ids: list[str] | None = None,
    ) -> None:
        """Run the study index validation and write both outputs.

        Args:
            session (Session): Session object.
            study_index_path (list[str]): Paths to the study index files.
            target_index_path (str): Path to target index file.
            disease_index_path (str): Path to disease index file.
            biosample_index_path (str): Path to biosample index file.
            valid_study_index_path (str): Path to write the valid records.
            invalid_study_index_path (str): Path to write the invalid records.
            invalid_qc_reasons (list[str] | None): List of invalid quality check reason names from `StudyQualityCheck` (e.g. ['DUPLICATED_STUDY']).
            deprecated_project_ids (list[str] | None): List of deprecated projectIds, (e.g. ['GTEx']).
        """
        # Missing (or empty) optional lists fall back to a fresh empty list.
        if not invalid_qc_reasons:
            invalid_qc_reasons = []
        if not deprecated_project_ids:
            deprecated_project_ids = []

        # Reference datasets the study index is validated against.
        target_index = TargetIndex.from_parquet(session, target_index_path)
        biosample_index = BiosampleIndex.from_parquet(session, biosample_index_path)

        # The disease index is pre-processed inline because gentropy has no
        # disease dataset yet. Each disease id is paired with every EFO term
        # that resolves to it: the id itself plus its obsolete terms.
        raw_disease_index = session.spark.read.parquet(disease_index_path)
        current_and_obsolete_ids = f.when(
            f.col("obsoleteTerms").isNotNull(),
            f.array_union(f.array("id"), f.col("obsoleteTerms")),
        )
        exploded_disease_index = raw_disease_index.select(
            f.col("id").alias("diseaseId"),
            # `when` without `otherwise` is null for diseases lacking obsolete
            # terms; explode_outer keeps those rows with a null `efo`.
            f.explode_outer(current_and_obsolete_ids).alias("efo"),
        )
        # Rows left with a null `efo` map the disease id onto itself.
        disease_index = exploded_disease_index.withColumn(
            "efo", f.coalesce(f.col("efo"), f.col("diseaseId"))
        )

        study_index = StudyIndex.from_parquet(session, list(study_index_path))

        # Validation pipeline, one check per statement.
        # Deconvolute studies where the same study is ingested from multiple sources:
        checked = study_index.deconvolute_studies()
        # Flag non-supported study types:
        checked = checked.validate_study_type()
        # Flag obsolete projectIds:
        checked = checked.validate_project_id(deprecated_project_ids)
        # Flag QTL studies with invalid targets:
        checked = checked.validate_target(target_index)
        # Flag invalid EFOs:
        checked = checked.validate_disease(disease_index)
        # Flag QTL studies with invalid biosamples:
        checked = checked.validate_biosample(biosample_index)
        # Flag studies with case-case design:
        checked = checked.validate_analysis_flags()
        # Persisted because the same dataset feeds both outputs below.
        study_index_with_qc = checked.persist()

        # Invalid studies are written first, then the valid ones.
        invalid_studies = study_index_with_qc.valid_rows(
            invalid_qc_reasons, invalid=True
        )
        invalid_writer = invalid_studies.df.coalesce(session.output_partitions).write
        invalid_writer.mode(session.write_mode).parquet(invalid_study_index_path)

        valid_studies = study_index_with_qc.valid_rows(invalid_qc_reasons)
        valid_writer = valid_studies.df.coalesce(session.output_partitions).write
        valid_writer.mode(session.write_mode).parquet(valid_study_index_path)
|
__init__(session: Session, study_index_path: list[str], target_index_path: str, disease_index_path: str, biosample_index_path: str, valid_study_index_path: str, invalid_study_index_path: str, invalid_qc_reasons: list[str] | None = None, deprecated_project_ids: list[str] | None = None) -> None
Initialize step.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `session` | `Session` | Session object. | required |
| `study_index_path` | `list[str]` | Path to study index file. | required |
| `target_index_path` | `str` | Path to target index file. | required |
| `disease_index_path` | `str` | Path to disease index file. | required |
| `biosample_index_path` | `str` | Path to biosample index file. | required |
| `valid_study_index_path` | `str` | Path to write the valid records. | required |
| `invalid_study_index_path` | `str` | Path to write the invalid records. | required |
| `invalid_qc_reasons` | `list[str] \| None` | List of invalid quality check reason names from StudyQualityCheck (e.g. ['DUPLICATED_STUDY']). | `None` |
| `deprecated_project_ids` | `list[str] \| None` | List of deprecated projectIds, (e.g. ['GTEx']). | `None` |
Source code in src/gentropy/study_validation.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(
    self,
    session: Session,
    study_index_path: list[str],
    target_index_path: str,
    disease_index_path: str,
    biosample_index_path: str,
    valid_study_index_path: str,
    invalid_study_index_path: str,
    # NOTE: do not use container as default argument!
    # https://github.com/satwikkansal/wtfpython?tab=readme-ov-file#-beware-of-default-mutable-arguments
    invalid_qc_reasons: list[str] | None = None,
    deprecated_project_ids: list[str] | None = None,
) -> None:
    """Run the study index validation and write both outputs.

    Args:
        session (Session): Session object.
        study_index_path (list[str]): Paths to the study index files.
        target_index_path (str): Path to target index file.
        disease_index_path (str): Path to disease index file.
        biosample_index_path (str): Path to biosample index file.
        valid_study_index_path (str): Path to write the valid records.
        invalid_study_index_path (str): Path to write the invalid records.
        invalid_qc_reasons (list[str] | None): List of invalid quality check reason names from `StudyQualityCheck` (e.g. ['DUPLICATED_STUDY']).
        deprecated_project_ids (list[str] | None): List of deprecated projectIds, (e.g. ['GTEx']).
    """
    # Missing (or empty) optional lists fall back to a fresh empty list.
    if not invalid_qc_reasons:
        invalid_qc_reasons = []
    if not deprecated_project_ids:
        deprecated_project_ids = []

    # Reference datasets the study index is validated against.
    target_index = TargetIndex.from_parquet(session, target_index_path)
    biosample_index = BiosampleIndex.from_parquet(session, biosample_index_path)

    # The disease index is pre-processed inline because gentropy has no
    # disease dataset yet. Each disease id is paired with every EFO term
    # that resolves to it: the id itself plus its obsolete terms.
    raw_disease_index = session.spark.read.parquet(disease_index_path)
    current_and_obsolete_ids = f.when(
        f.col("obsoleteTerms").isNotNull(),
        f.array_union(f.array("id"), f.col("obsoleteTerms")),
    )
    exploded_disease_index = raw_disease_index.select(
        f.col("id").alias("diseaseId"),
        # `when` without `otherwise` is null for diseases lacking obsolete
        # terms; explode_outer keeps those rows with a null `efo`.
        f.explode_outer(current_and_obsolete_ids).alias("efo"),
    )
    # Rows left with a null `efo` map the disease id onto itself.
    disease_index = exploded_disease_index.withColumn(
        "efo", f.coalesce(f.col("efo"), f.col("diseaseId"))
    )

    study_index = StudyIndex.from_parquet(session, list(study_index_path))

    # Validation pipeline, one check per statement.
    # Deconvolute studies where the same study is ingested from multiple sources:
    checked = study_index.deconvolute_studies()
    # Flag non-supported study types:
    checked = checked.validate_study_type()
    # Flag obsolete projectIds:
    checked = checked.validate_project_id(deprecated_project_ids)
    # Flag QTL studies with invalid targets:
    checked = checked.validate_target(target_index)
    # Flag invalid EFOs:
    checked = checked.validate_disease(disease_index)
    # Flag QTL studies with invalid biosamples:
    checked = checked.validate_biosample(biosample_index)
    # Flag studies with case-case design:
    checked = checked.validate_analysis_flags()
    # Persisted because the same dataset feeds both outputs below.
    study_index_with_qc = checked.persist()

    # Invalid studies are written first, then the valid ones.
    invalid_studies = study_index_with_qc.valid_rows(
        invalid_qc_reasons, invalid=True
    )
    invalid_writer = invalid_studies.df.coalesce(session.output_partitions).write
    invalid_writer.mode(session.write_mode).parquet(invalid_study_index_path)

    valid_studies = study_index_with_qc.valid_rows(invalid_qc_reasons)
    valid_writer = valid_studies.df.coalesce(session.output_partitions).write
    valid_writer.mode(session.write_mode).parquet(valid_study_index_path)
|