Locus to Gene (L2G)

gentropy.l2g.LocusToGeneFeatureMatrixStep

Annotate credible set with functional genomics features.

Source code in src/gentropy/l2g.py
class LocusToGeneFeatureMatrixStep:
    """Annotate credible set with functional genomics features."""

    def __init__(
        self,
        session: Session,
        *,
        features_list: list[str],
        credible_set_path: str,
        variant_index_path: str | None = None,
        colocalisation_path: str | None = None,
        study_index_path: str | None = None,
        gene_index_path: str | None = None,
        feature_matrix_path: str,
    ) -> None:
        """Initialise the step and run the logic based on mode.

        Args:
            session (Session): Session object that contains the Spark session
            features_list (list[str]): List of features to use for the model
            credible_set_path (str): Path to the credible set dataset necessary to build the feature matrix
            variant_index_path (str | None): Path to the variant index dataset
            colocalisation_path (str | None): Path to the colocalisation dataset
            study_index_path (str | None): Path to the study index dataset
            gene_index_path (str | None): Path to the gene index dataset
            feature_matrix_path (str): Path to the L2G feature matrix output dataset
        """
        credible_set = StudyLocus.from_parquet(
            session, credible_set_path, recursiveFileLookup=True
        )
        studies = (
            StudyIndex.from_parquet(session, study_index_path, recursiveFileLookup=True)
            if study_index_path
            else None
        )
        variant_index = (
            VariantIndex.from_parquet(session, variant_index_path)
            if variant_index_path
            else None
        )
        coloc = (
            Colocalisation.from_parquet(
                session, colocalisation_path, recursiveFileLookup=True
            )
            if colocalisation_path
            else None
        )
        gene_index = (
            GeneIndex.from_parquet(session, gene_index_path, recursiveFileLookup=True)
            if gene_index_path
            else None
        )
        features_input_loader = L2GFeatureInputLoader(
            variant_index=variant_index,
            colocalisation=coloc,
            study_index=studies,
            study_locus=credible_set,
            gene_index=gene_index,
        )

        fm = credible_set.filter(f.col("studyType") == "gwas").build_feature_matrix(
            features_list, features_input_loader
        )
        fm._df.coalesce(session.output_partitions).write.mode(
            session.write_mode
        ).parquet(feature_matrix_path)

__init__(session: Session, *, features_list: list[str], credible_set_path: str, variant_index_path: str | None = None, colocalisation_path: str | None = None, study_index_path: str | None = None, gene_index_path: str | None = None, feature_matrix_path: str) -> None

Initialise the step and build the feature matrix.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session | Session object that contains the Spark session | required |
| features_list | list[str] | List of features to use for the model | required |
| credible_set_path | str | Path to the credible set dataset necessary to build the feature matrix | required |
| variant_index_path | str \| None | Path to the variant index dataset | None |
| colocalisation_path | str \| None | Path to the colocalisation dataset | None |
| study_index_path | str \| None | Path to the study index dataset | None |
| gene_index_path | str \| None | Path to the gene index dataset | None |
| feature_matrix_path | str | Path to the L2G feature matrix output dataset | required |
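
A minimal sketch of invoking this step from Python. The session configuration, bucket paths and feature names below are hypothetical placeholders; only the datasets backing the requested features need to be provided.

from gentropy.common.session import Session
from gentropy.l2g import LocusToGeneFeatureMatrixStep

# Hypothetical session and input/output locations.
session = Session()
LocusToGeneFeatureMatrixStep(
    session,
    features_list=["distanceTssMean", "eQtlColocClppMaximum"],  # assumed feature names
    credible_set_path="gs://my-bucket/credible_set",
    variant_index_path="gs://my-bucket/variant_index",
    colocalisation_path="gs://my-bucket/colocalisation",
    study_index_path="gs://my-bucket/study_index",
    gene_index_path="gs://my-bucket/gene_index",
    feature_matrix_path="gs://my-bucket/l2g_feature_matrix",
)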

gentropy.l2g.LocusToGeneStep

Locus to gene step.

Source code in src/gentropy/l2g.py
class LocusToGeneStep:
    """Locus to gene step."""

    def __init__(
        self,
        session: Session,
        *,
        run_mode: str,
        features_list: list[str],
        hyperparameters: dict[str, Any],
        download_from_hub: bool,
        cross_validate: bool,
        wandb_run_name: str,
        credible_set_path: str,
        feature_matrix_path: str,
        model_path: str | None = None,
        gold_standard_curation_path: str | None = None,
        variant_index_path: str | None = None,
        gene_interactions_path: str | None = None,
        predictions_path: str | None = None,
        l2g_threshold: float | None = None,
        hf_hub_repo_id: str | None = None,
        hf_model_commit_message: str | None = "chore: update model",
    ) -> None:
        """Initialise the step and run the logic based on mode.

        Args:
            session (Session): Session object that contains the Spark session
            run_mode (str): Run mode, either 'train' or 'predict'
            features_list (list[str]): List of features to use for the model
            hyperparameters (dict[str, Any]): Hyperparameters for the model
            download_from_hub (bool): Whether to download the model from Hugging Face Hub
            cross_validate (bool): Whether to run cross validation (5-fold by default) to train the model.
            wandb_run_name (str): Name of the run to track model training in Weights and Biases
            credible_set_path (str): Path to the credible set dataset necessary to build the feature matrix
            feature_matrix_path (str): Path to the L2G feature matrix input dataset
            model_path (str | None): Path to the model. It can be either in the filesystem or the name on the Hugging Face Hub (in the form of username/repo_name).
            gold_standard_curation_path (str | None): Path to the gold standard curation file
            variant_index_path (str | None): Path to the variant index
            gene_interactions_path (str | None): Path to the gene interactions dataset
            predictions_path (str | None): Path to the L2G predictions output dataset
            l2g_threshold (float | None): An optional threshold for the L2G score to filter predictions. A threshold of 0.05 is recommended.
            hf_hub_repo_id (str | None): Hugging Face Hub repository ID. If provided, the model will be uploaded to Hugging Face.
            hf_model_commit_message (str | None): Commit message when we upload the model to the Hugging Face Hub

        Raises:
            ValueError: If run_mode is not 'train' or 'predict'
        """
        if run_mode not in ["train", "predict"]:
            raise ValueError(
                f"run_mode must be one of 'train' or 'predict', got {run_mode}"
            )

        self.session = session
        self.run_mode = run_mode
        self.model_path = model_path
        self.predictions_path = predictions_path
        self.features_list = list(features_list)
        self.hyperparameters = dict(hyperparameters)
        self.wandb_run_name = wandb_run_name
        self.cross_validate = cross_validate
        self.hf_hub_repo_id = hf_hub_repo_id
        self.download_from_hub = download_from_hub
        self.hf_model_commit_message = hf_model_commit_message
        self.l2g_threshold = l2g_threshold or 0.0
        self.gold_standard_curation_path = gold_standard_curation_path
        self.gene_interactions_path = gene_interactions_path
        self.variant_index_path = variant_index_path

        # Load common inputs
        self.credible_set = StudyLocus.from_parquet(
            session, credible_set_path, recursiveFileLookup=True
        )
        self.feature_matrix = L2GFeatureMatrix(
            _df=session.load_data(feature_matrix_path),
        )

        if run_mode == "predict":
            self.run_predict()
        elif run_mode == "train":
            self.gold_standard = self.prepare_gold_standard()
            self.run_train()

    def prepare_gold_standard(self) -> L2GGoldStandard:
        """Prepare the gold standard for training.

        Returns:
            L2GGoldStandard: training dataset.

        Raises:
            ValueError: When the gold standard path is not provided, or when
                parsing the OTG gold standard but the interactions or variant index paths are missing.
            TypeError: When the gold standard is neither an OTG gold standard nor an L2GGoldStandard.

        """
        if self.gold_standard_curation_path is None:
            raise ValueError("Gold Standard is required for model training.")
        # Read the gold standard from either json or parquet, defaulting to parquet when the format cannot be inferred from the extension.
        ext = self.gold_standard_curation_path.split(".")[-1]
        ext = "parquet" if ext not in ["parquet", "json"] else ext
        gold_standard = self.session.load_data(self.gold_standard_curation_path, ext)
        schema_issues = compare_struct_schemas(
            gold_standard.schema, L2GGoldStandard.get_schema()
        )
        # Parse the gold standard depending on the input schema
        match schema_issues:
            case {**extra} if not extra:
                # Schema is the same as L2GGoldStandard - load the GS
                # NOTE: match to empty dict will be non-selective
                # see https://stackoverflow.com/questions/75389166/how-to-match-an-empty-dictionary
                logging.info("Successfully parsed gold standard.")
                return L2GGoldStandard(
                    _df=gold_standard,
                    _schema=L2GGoldStandard.get_schema(),
                )
            case {
                "missing_mandatory_columns": [
                    "studyLocusId",
                    "variantId",
                    "studyId",
                    "geneId",
                    "goldStandardSet",
                ],
                "unexpected_columns": [
                    "association_info",
                    "gold_standard_info",
                    "metadata",
                    "sentinel_variant",
                    "trait_info",
                ],
            }:
                # This exact set of schema mismatches identifies the legacy OTG gold standard curation format.
                logging.info("Detected OTG Gold Standard. Attempting to parse it.")
                otg_curation = gold_standard
                if self.gene_interactions_path is None:
                    raise ValueError("Interactions are required for parsing curation.")
                if self.variant_index_path is None:
                    raise ValueError("Variant Index are required for parsing curation.")

                interactions = self.session.load_data(
                    self.gene_interactions_path, "parquet"
                )
                variant_index = VariantIndex.from_parquet(
                    self.session, self.variant_index_path
                )
                study_locus_overlap = StudyLocus(
                    _df=self.credible_set.df.join(
                        otg_curation.select(
                            f.concat_ws(
                                "_",
                                f.col("sentinel_variant.locus_GRCh38.chromosome"),
                                f.col("sentinel_variant.locus_GRCh38.position"),
                                f.col("sentinel_variant.alleles.reference"),
                                f.col("sentinel_variant.alleles.alternative"),
                            ).alias("variantId"),
                            f.col("association_info.otg_id").alias("studyId"),
                        ),
                        on=[
                            "studyId",
                            "variantId",
                        ],
                        how="inner",
                    ),
                    _schema=StudyLocus.get_schema(),
                ).find_overlaps()

                return L2GGoldStandard.from_otg_curation(
                    gold_standard_curation=otg_curation,
                    variant_index=variant_index,
                    study_locus_overlap=study_locus_overlap,
                    interactions=interactions,
                )
            case _:
                raise TypeError("Incorrect gold standard dataset provided.")

    def run_predict(self) -> None:
        """Run the prediction step.

        Raises:
            ValueError: If predictions_path is not provided for prediction mode
        """
        if not self.predictions_path:
            raise ValueError("predictions_path must be provided for prediction mode")
        predictions = L2GPrediction.from_credible_set(
            self.session,
            self.credible_set,
            self.feature_matrix,
            self.features_list,
            model_path=self.model_path,
            hf_token=access_gcp_secret("hfhub-key", "open-targets-genetics-dev"),
            download_from_hub=self.download_from_hub,
        )
        predictions.filter(
            f.col("score") >= self.l2g_threshold
        ).add_locus_to_gene_features(
            self.feature_matrix, self.features_list
        ).df.coalesce(self.session.output_partitions).write.mode(
            self.session.write_mode
        ).parquet(self.predictions_path)
        self.session.logger.info("L2G predictions saved successfully.")

    def run_train(self) -> None:
        """Run the training step."""
        # Initialise access to Weights and Biases
        wandb_key = access_gcp_secret("wandb-key", "open-targets-genetics-dev")
        wandb_login(key=wandb_key)

        # Instantiate classifier and train model
        l2g_model = LocusToGeneModel(
            model=GradientBoostingClassifier(random_state=42, loss="log_loss"),
            hyperparameters=self.hyperparameters,
        )

        # Calculate the gold standard features
        feature_matrix = self._annotate_gold_standards_w_feature_matrix()

        # Run the training
        trained_model = LocusToGeneTrainer(
            model=l2g_model, feature_matrix=feature_matrix
        ).train(self.wandb_run_name, cross_validate=self.cross_validate)

        # Export the model
        if trained_model.training_data and trained_model.model and self.model_path:
            trained_model.save(self.model_path)
            if self.hf_hub_repo_id and self.hf_model_commit_message:
                hf_hub_token = access_gcp_secret(
                    "hfhub-key", "open-targets-genetics-dev"
                )
                trained_model.export_to_hugging_face_hub(
                    # upload the model file saved in the local filesystem
                    self.model_path.split("/")[-1],
                    hf_hub_token,
                    data=trained_model.training_data._df.drop(
                        "goldStandardSet", "geneId"
                    ).toPandas(),
                    repo_id=self.hf_hub_repo_id,
                    commit_message=self.hf_model_commit_message,
                )

    def _annotate_gold_standards_w_feature_matrix(self) -> L2GFeatureMatrix:
        """Generate the feature matrix of annotated gold standards.

        Returns:
            L2GFeatureMatrix: Feature matrix with gold standards annotated with features.
        """
        return (
            self.gold_standard.build_feature_matrix(
                self.feature_matrix, self.credible_set
            )
            .select_features(self.features_list)
            .persist()
        )

__init__(session: Session, *, run_mode: str, features_list: list[str], hyperparameters: dict[str, Any], download_from_hub: bool, cross_validate: bool, wandb_run_name: str, credible_set_path: str, feature_matrix_path: str, model_path: str | None = None, gold_standard_curation_path: str | None = None, variant_index_path: str | None = None, gene_interactions_path: str | None = None, predictions_path: str | None = None, l2g_threshold: float | None = None, hf_hub_repo_id: str | None = None, hf_model_commit_message: str | None = 'chore: update model') -> None

Initialise the step and run the logic based on mode.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session | Session object that contains the Spark session | required |
| run_mode | str | Run mode, either 'train' or 'predict' | required |
| features_list | list[str] | List of features to use for the model | required |
| hyperparameters | dict[str, Any] | Hyperparameters for the model | required |
| download_from_hub | bool | Whether to download the model from the Hugging Face Hub | required |
| cross_validate | bool | Whether to run cross-validation (5-fold by default) to train the model | required |
| wandb_run_name | str | Name of the run to track model training in Weights and Biases | required |
| credible_set_path | str | Path to the credible set dataset necessary to build the feature matrix | required |
| feature_matrix_path | str | Path to the L2G feature matrix input dataset | required |
| model_path | str \| None | Path to the model, either in the filesystem or the name on the Hugging Face Hub (in the form username/repo_name) | None |
| gold_standard_curation_path | str \| None | Path to the gold standard curation file | None |
| variant_index_path | str \| None | Path to the variant index | None |
| gene_interactions_path | str \| None | Path to the gene interactions dataset | None |
| predictions_path | str \| None | Path to the L2G predictions output dataset | None |
| l2g_threshold | float \| None | An optional threshold for the L2G score to filter predictions. A threshold of 0.05 is recommended | None |
| hf_hub_repo_id | str \| None | Hugging Face Hub repository ID. If provided, the model will be uploaded to Hugging Face | None |
| hf_model_commit_message | str \| None | Commit message when uploading the model to the Hugging Face Hub | 'chore: update model' |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If run_mode is not 'train' or 'predict' |
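
A minimal sketch of running the step in predict mode; every path and identifier below is a hypothetical placeholder. Train mode additionally requires the gold standard curation, variant index and gene interactions inputs.

from gentropy.common.session import Session
from gentropy.l2g import LocusToGeneStep

session = Session()  # hypothetical session configuration
LocusToGeneStep(
    session,
    run_mode="predict",
    features_list=["distanceTssMean"],       # assumed feature name
    hyperparameters={},                      # only used when training
    download_from_hub=True,
    cross_validate=False,                    # only used when training
    wandb_run_name="",                       # only used when training
    credible_set_path="gs://my-bucket/credible_set",
    feature_matrix_path="gs://my-bucket/l2g_feature_matrix",
    model_path="opentargets/locus_to_gene",  # hypothetical Hub repo name
    predictions_path="gs://my-bucket/l2g_predictions",
    l2g_threshold=0.05,
)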


prepare_gold_standard() -> L2GGoldStandard

Prepare the gold standard for training.

Returns:

| Name | Type | Description |
| --- | --- | --- |
| L2GGoldStandard | L2GGoldStandard | Training dataset. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | When the gold standard path is not provided, or when parsing the OTG gold standard but the interactions or variant index paths are missing. |
| TypeError | When the gold standard is neither an OTG gold standard nor an L2GGoldStandard. |
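
The schema dispatch in this method hinges on a structural pattern matching subtlety: a bare case {} matches any mapping, so an exact schema match is detected by capturing all remaining keys and requiring that capture to be empty. A self-contained illustration of the pattern:

def classify(schema_issues: dict) -> str:
    """Toy version of the dispatch used in prepare_gold_standard."""
    match schema_issues:
        case {**extra} if not extra:
            # Only an empty dict reaches here; without the guard,
            # this pattern would match any dict whatsoever.
            return "schemas match"
        case {"missing_mandatory_columns": _, "unexpected_columns": _}:
            return "legacy OTG curation"
        case _:
            return "unrecognised schema"

print(classify({}))  # schemas match
print(classify({"missing_mandatory_columns": ["a"], "unexpected_columns": ["b"]}))  # legacy OTG curation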


run_predict() -> None

Run the prediction step.

Raises:

| Type | Description |
| --- | --- |
| ValueError | If predictions_path is not provided for prediction mode |
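
The method chains a score filter, feature re-annotation and a partition-controlled parquet write. The filter-and-write idiom in isolation, on a toy DataFrame (column names, threshold and output path are illustrative only):

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()
predictions = spark.createDataFrame(
    [("L1", "ENSG00000001", 0.80), ("L2", "ENSG00000002", 0.01)],
    ["studyLocusId", "geneId", "score"],
)
(
    predictions.filter(f.col("score") >= 0.05)  # drop low-confidence rows
    .coalesce(1)                                # control output partition count
    .write.mode("overwrite")                    # analogous to session.write_mode
    .parquet("/tmp/l2g_predictions")
)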


run_train() -> None

Run the training step.
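
Training wraps scikit-learn's GradientBoostingClassifier, so keys in the hyperparameters dict passed to the step are expected to be valid keywords of that estimator. A plausible configuration (the values are illustrative, not the tuned production settings):

from sklearn.ensemble import GradientBoostingClassifier

# Illustrative hyperparameters; any GradientBoostingClassifier
# keyword can be supplied through the step's `hyperparameters` argument.
hyperparameters = {
    "n_estimators": 500,
    "learning_rate": 0.05,
    "max_depth": 5,
    "subsample": 0.8,
}
model = GradientBoostingClassifier(random_state=42, loss="log_loss", **hyperparameters)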


gentropy.l2g.LocusToGeneEvidenceStep

Locus to gene evidence step.

Source code in src/gentropy/l2g.py
class LocusToGeneEvidenceStep:
    """Locus to gene evidence step."""

    def __init__(
        self,
        session: Session,
        locus_to_gene_predictions_path: str,
        credible_set_path: str,
        study_index_path: str,
        evidence_output_path: str,
        locus_to_gene_threshold: float,
    ) -> None:
        """Initialise the step and generate disease/target evidence.

        Args:
            session (Session): Session object that contains the Spark session
            locus_to_gene_predictions_path (str): Path to the L2G predictions dataset
            credible_set_path (str): Path to the credible set dataset
            study_index_path (str): Path to the study index dataset
            evidence_output_path (str): Path to the L2G evidence output dataset, written as gzipped ndjson.
            locus_to_gene_threshold (float): Threshold to consider a gene as a target. A value of 0.05 is recommended.
        """
        # Reading the predictions
        locus_to_gene_prediction = L2GPrediction.from_parquet(
            session, locus_to_gene_predictions_path
        )
        # Reading the credible set
        credible_sets = StudyLocus.from_parquet(session, credible_set_path)

        # Reading the study index
        study_index = StudyIndex.from_parquet(session, study_index_path)

        # Generate evidence and save file:
        (
            locus_to_gene_prediction.to_disease_target_evidence(
                credible_sets, study_index, locus_to_gene_threshold
            )
            .coalesce(session.output_partitions)
            .write.mode(session.write_mode)
            .option("compression", "gzip")
            .json(evidence_output_path)
        )

__init__(session: Session, locus_to_gene_predictions_path: str, credible_set_path: str, study_index_path: str, evidence_output_path: str, locus_to_gene_threshold: float) -> None

Initialise the step and generate disease/target evidence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session | Session object that contains the Spark session | required |
| locus_to_gene_predictions_path | str | Path to the L2G predictions dataset | required |
| credible_set_path | str | Path to the credible set dataset | required |
| study_index_path | str | Path to the study index dataset | required |
| evidence_output_path | str | Path to the L2G evidence output dataset, written as gzipped ndjson | required |
| locus_to_gene_threshold | float | Threshold to consider a gene as a target. A value of 0.05 is recommended | required |
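
A minimal sketch of invoking the step; all paths are hypothetical placeholders. Because the output is written with Spark's json writer plus gzip compression, the evidence lands as gzipped ndjson part files under the output path.

from gentropy.common.session import Session
from gentropy.l2g import LocusToGeneEvidenceStep

session = Session()  # hypothetical session configuration
LocusToGeneEvidenceStep(
    session,
    locus_to_gene_predictions_path="gs://my-bucket/l2g_predictions",
    credible_set_path="gs://my-bucket/credible_set",
    study_index_path="gs://my-bucket/study_index",
    evidence_output_path="gs://my-bucket/l2g_evidence",
    locus_to_gene_threshold=0.05,
)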

gentropy.l2g.LocusToGeneAssociationsStep

Locus to gene associations step.

Source code in src/gentropy/l2g.py
class LocusToGeneAssociationsStep:
    """Locus to gene associations step."""

    def __init__(
        self,
        session: Session,
        evidence_input_path: str,
        disease_index_path: str,
        direct_associations_output_path: str,
        indirect_associations_output_path: str,
    ) -> None:
        """Create direct and indirect association datasets.

        Args:
            session (Session): Session object that contains the Spark session
            evidence_input_path (str): Path to the L2G evidence input dataset
            disease_index_path (str): Path to disease index file
            direct_associations_output_path (str): Path to the direct associations output dataset
            indirect_associations_output_path (str): Path to the indirect associations output dataset
        """
        # Read in the disease index
        disease_index = session.spark.read.parquet(disease_index_path).select(
            f.col("id").alias("diseaseId"),
            f.explode("ancestors").alias("ancestorDiseaseId"),
        )

        # Read in the L2G evidence
        disease_target_evidence = session.spark.read.json(evidence_input_path).select(
            f.col("targetFromSourceId").alias("targetId"),
            f.col("diseaseFromSourceMappedId").alias("diseaseId"),
            f.col("resourceScore"),
        )

        # Generate direct associations and save file
        (
            disease_target_evidence.groupBy("targetId", "diseaseId")
            .agg(f.collect_set("resourceScore").alias("scores"))
            .select(
                "targetId",
                "diseaseId",
                calculate_harmonic_sum(f.col("scores")).alias("harmonicSum"),
            )
            .write.mode(session.write_mode)
            .parquet(direct_associations_output_path)
        )

        # Generate indirect associations and save file
        (
            disease_target_evidence.join(disease_index, on="diseaseId", how="inner")
            .groupBy("targetId", "ancestorDiseaseId")
            .agg(f.collect_set("resourceScore").alias("scores"))
            .select(
                "targetId",
                "ancestorDiseaseId",
                calculate_harmonic_sum(f.col("scores")).alias("harmonicSum"),
            )
            .write.mode(session.write_mode)
            .parquet(indirect_associations_output_path)
        )

__init__(session: Session, evidence_input_path: str, disease_index_path: str, direct_associations_output_path: str, indirect_associations_output_path: str) -> None

Create direct and indirect association datasets.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | Session | Session object that contains the Spark session | required |
| evidence_input_path | str | Path to the L2G evidence input dataset | required |
| disease_index_path | str | Path to the disease index file | required |
| direct_associations_output_path | str | Path to the direct associations output dataset | required |
| indirect_associations_output_path | str | Path to the indirect associations output dataset | required |
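
Both outputs score each target/disease pair by aggregating the collected evidence scores with calculate_harmonic_sum. Assuming the conventional Open Targets harmonic sum (scores sorted descending, the i-th score down-weighted by 1/i^2), a pure-Python sketch of the aggregation:

def harmonic_sum(scores: list[float]) -> float:
    """Sketch of a harmonic sum: sort scores descending and weight
    the i-th score by 1 / i**2. See calculate_harmonic_sum for the
    exact (possibly normalised) implementation used in the pipeline.
    """
    return sum(
        score / i**2
        for i, score in enumerate(sorted(scores, reverse=True), start=1)
    )

# Two pieces of evidence for the same target/disease pair:
print(harmonic_sum([0.8, 0.4]))  # 0.8/1 + 0.4/4 = 0.9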