Bases: VariantIndexStepConfig
Variant index step.
Using a VariantAnnotation dataset as a reference, this step creates and writes a VariantIndex dataset.
The resulting dataset includes only variants that have disease-association data, and it carries a reduced set of annotations.
Source code in src/otg/variant_index.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@dataclass
class VariantIndexStep(VariantIndexStepConfig):
    """Variant index step.

    Reads a `VariantAnnotation` dataset and a `StudyLocus` dataset, restricts
    the annotations to the variants returned by
    `StudyLocus.unique_lead_tag_variants()`, converts the restricted dataset
    into a `VariantIndex` and writes it as parquet partitioned by chromosome.
    """

    # NOTE(review): `Session()` is evaluated once, when the class body runs,
    # so every instance that does not receive its own session shares this
    # object - confirm that this is intended.
    session: Session = Session()

    def run(self: VariantIndexStep) -> None:
        """Run variant index step."""
        # Load both inputs from the paths configured on the step.
        annotation = VariantAnnotation.from_parquet(
            self.session, self.variant_annotation_path
        )
        loci = StudyLocus.from_parquet(self.session, self.study_locus_path)

        # Keep only the annotated variants that also appear in the study-locus
        # data, matching on variant id and chromosome.
        variants_in_loci = loci.unique_lead_tag_variants()
        annotation_subset = annotation.filter_by_variant_df(
            variants_in_loci, ["id", "chromosome"]
        )

        # Derive the variant index from the reduced annotation dataset.
        index = VariantIndex.from_variant_annotation(annotation_subset)

        # NOTE: only the variant index is written; no separate output of
        # invalid variants is produced by this step.
        self.session.logger.info(f"Writing variant index to: {self.variant_index_path}")
        writer = index.df.write.partitionBy("chromosome").mode(
            self.session.write_mode
        )
        writer.parquet(self.variant_index_path)
|
run()
Run variant index step.
Source code in src/otg/variant_index.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def run(self: VariantIndexStep) -> None:
    """Run variant index step.

    Loads the variant annotation and study-locus datasets, restricts the
    annotations to the variants returned by
    `StudyLocus.unique_lead_tag_variants()`, builds a `VariantIndex` from
    that subset and writes it as parquet partitioned by chromosome.
    """
    # Load both inputs from the paths configured on the step.
    annotation = VariantAnnotation.from_parquet(
        self.session, self.variant_annotation_path
    )
    loci = StudyLocus.from_parquet(self.session, self.study_locus_path)

    # Keep only the annotated variants that also appear in the study-locus
    # data, matching on variant id and chromosome.
    variants_in_loci = loci.unique_lead_tag_variants()
    annotation_subset = annotation.filter_by_variant_df(
        variants_in_loci, ["id", "chromosome"]
    )

    # Derive the variant index from the reduced annotation dataset.
    index = VariantIndex.from_variant_annotation(annotation_subset)

    # NOTE: only the variant index is written; no separate output of
    # invalid variants is produced by this step.
    self.session.logger.info(f"Writing variant index to: {self.variant_index_path}")
    writer = index.df.write.partitionBy("chromosome").mode(
        self.session.write_mode
    )
    writer.parquet(self.variant_index_path)
|