Skip to content

L2G Model

gentropy.method.l2g.model.LocusToGeneModel dataclass

Wrapper for the Locus to Gene classifier.

Source code in src/gentropy/method/l2g/model.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
@dataclass
class LocusToGeneModel:
    """Wrapper for the Locus to Gene classifier.

    Holds an unfitted Spark ML pipeline (label indexer + feature vector assembler, plus any
    stage appended with `add_pipeline_stage`), the fitted pipeline model once `fit` or
    `load_from_disk` has been called, and helpers to predict, persist and track results with W&B.
    """

    # Names of the feature columns assembled into the `features` vector.
    features_list: list[str]
    # Estimator exposed through the `classifier` property; `get_param_grid` reads its params.
    estimator: Any = None
    # Placeholder only: `__post_init__` always replaces it with a freshly built pipeline.
    pipeline: Pipeline = Pipeline(stages=[])
    # Fitted pipeline; stays `None` until `fit` is called or a model is loaded from disk.
    model: PipelineModel | None = None
    # W&B project that `evaluate` logs runs to.
    wandb_l2g_project_name: str = "otg_l2g"

    def __post_init__(self: LocusToGeneModel) -> None:
        """Post init that builds the base ML pipeline (label indexer followed by vector assembler)."""
        label_indexer = StringIndexer(
            inputCol="goldStandardSet", outputCol="label", handleInvalid="keep"
        )
        vector_assembler = LocusToGeneModel.features_vector_assembler(
            self.features_list
        )

        self.pipeline = Pipeline(
            stages=[
                label_indexer,
                vector_assembler,
            ]
        )

    def save(self: LocusToGeneModel, path: str) -> None:
        """Saves fitted pipeline model to disk, overwriting anything already at `path`.

        Args:
            path (str): Path to save the model to

        Raises:
            ValueError: If the model has not been fitted yet
        """
        if self.model is None:
            raise ValueError("Model has not been fitted yet.")
        self.model.write().overwrite().save(path)

    @property
    def classifier(self: LocusToGeneModel) -> Any:
        """Return the estimator.

        Returns:
            Any: An estimator object from Spark ML
        """
        return self.estimator

    @staticmethod
    def features_vector_assembler(features_cols: list[str]) -> VectorAssembler:
        """Spark transformer to assemble the feature columns into a vector.

        Args:
            features_cols (list[str]): List of feature columns to assemble

        Returns:
            VectorAssembler: Spark transformer to assemble the feature columns into a vector

        Examples:
            >>> from pyspark.ml.feature import VectorAssembler
            >>> df = spark.createDataFrame([(5.2, 3.5)], schema="feature_1 FLOAT, feature_2 FLOAT")
            >>> assembler = LocusToGeneModel.features_vector_assembler(["feature_1", "feature_2"])
            >>> assembler.transform(df).show()
            +---------+---------+--------------------+
            |feature_1|feature_2|            features|
            +---------+---------+--------------------+
            |      5.2|      3.5|[5.19999980926513...|
            +---------+---------+--------------------+
            <BLANKLINE>
        """
        return (
            VectorAssembler(handleInvalid="error")
            .setInputCols(features_cols)
            .setOutputCol("features")
        )

    def log_to_wandb(
        self: LocusToGeneModel,
        results: DataFrame,
        training_data: L2GFeatureMatrix,
        evaluators: list[
            BinaryClassificationEvaluator | MulticlassClassificationEvaluator
        ],
        wandb_run: Run,
    ) -> None:
        """Log evaluation results and feature importance to W&B.

        Args:
            results (DataFrame): Dataframe containing the predictions
            training_data (L2GFeatureMatrix): Training data used for the model. Its table, the per-label counts and the feature missingness rates are logged to W&B
            evaluators (list[BinaryClassificationEvaluator | MulticlassClassificationEvaluator]): List of Spark ML evaluators to use for evaluation
            wandb_run (Run): W&B run to log the results to
        """
        ## Track evaluation metrics
        for evaluator in evaluators:
            wandb_evaluator = WandbEvaluator(
                spark_ml_evaluator=evaluator, wandb_run=wandb_run
            )
            wandb_evaluator.evaluate(results)
        ## Track feature importance
        wandb_run.log({"importances": self.get_feature_importance()})
        ## Track training set
        training_table = Table(dataframe=training_data.df.toPandas())
        wandb_run.log({"trainingSet": training_table})
        # Count number of rows per gold standard label
        gs_counts_dict = {
            "goldStandard" + row["goldStandardSet"].capitalize(): row["count"]
            for row in training_data.df.groupBy("goldStandardSet").count().collect()
        }
        wandb_run.log(gs_counts_dict)
        # Missingness rates
        wandb_run.log(
            {"missingnessRates": training_data.calculate_feature_missingness_rate()}
        )

    @classmethod
    def load_from_disk(
        cls: Type[LocusToGeneModel], path: str, features_list: list[str]
    ) -> LocusToGeneModel:
        """Load a fitted pipeline model from disk.

        Args:
            path (str): Path to the model
            features_list (list[str]): List of features used for the model

        Returns:
            LocusToGeneModel: L2G model loaded from disk
        """
        return cls(model=PipelineModel.load(path), features_list=features_list)

    @classifier.setter  # type: ignore
    def classifier(self: LocusToGeneModel, new_estimator: Any) -> None:
        """Set the estimator.

        Args:
            new_estimator (Any): An estimator object from Spark ML
        """
        self.estimator = new_estimator

    def get_param_grid(self: LocusToGeneModel) -> list[Any]:
        """Return the parameter grid for the model.

        The grid is built from the `max_depth` and `learning_rate` params of the estimator.

        Returns:
            list[Any]: List of parameter maps to use for cross validation
        """
        return (
            ParamGridBuilder()
            .addGrid(self.estimator.max_depth, [3, 5, 7])
            .addGrid(self.estimator.learning_rate, [0.01, 0.1, 1.0])
            .build()
        )

    def add_pipeline_stage(
        self: LocusToGeneModel, transformer: Transformer
    ) -> LocusToGeneModel:
        """Adds a stage to the L2G pipeline.

        Args:
            transformer (Transformer): Spark transformer to add to the pipeline

        Returns:
            LocusToGeneModel: L2G model with the new transformer

        Examples:
            >>> from pyspark.ml.regression import LinearRegression
            >>> estimator = LinearRegression()
            >>> test_model = LocusToGeneModel(features_list=["a", "b"])
            >>> print(len(test_model.pipeline.getStages()))
            2
            >>> print(len(test_model.add_pipeline_stage(estimator).pipeline.getStages()))
            3
        """
        pipeline_stages = self.pipeline.getStages()
        new_stages = pipeline_stages + [transformer]
        self.pipeline = Pipeline(stages=new_stages)
        return self

    def evaluate(
        self: LocusToGeneModel,
        results: DataFrame,
        hyperparameters: dict[str, Any],
        wandb_run_name: str | None,
        gold_standard_data: L2GFeatureMatrix | None = None,
    ) -> None:
        """Perform evaluation of the model predictions for the test set and track the results with W&B.

        Nothing is evaluated or logged unless both `wandb_run_name` and `gold_standard_data` are provided.

        Args:
            results (DataFrame): Dataframe containing the predictions
            hyperparameters (dict[str, Any]): Hyperparameters used for the model
            wandb_run_name (str | None): Descriptive name for the run to be tracked with W&B
            gold_standard_data (L2GFeatureMatrix | None): Feature matrix for the associations in the gold standard. Its table and label counts are logged to W&B
        """
        if not (wandb_run_name and gold_standard_data):
            return

        # Built only once tracking is known to be requested, so a skipped evaluation does no work.
        binary_evaluator = BinaryClassificationEvaluator(
            rawPredictionCol="rawPrediction", labelCol="label"
        )
        multi_evaluator = MulticlassClassificationEvaluator(
            labelCol="label", predictionCol="prediction"
        )

        run = wandb_init(
            project=self.wandb_l2g_project_name,
            config=hyperparameters,
            name=wandb_run_name,
        )
        if not isinstance(run, Run):
            return
        try:
            self.log_to_wandb(
                results,
                gold_standard_data,
                [binary_evaluator, multi_evaluator],
                run,
            )
        finally:
            # Close the run even if logging fails, so it is not left open.
            run.finish()

    @property
    def feature_name_map(self: LocusToGeneModel) -> dict[str, str]:
        """Return a dictionary mapping encoded feature names (`f0`, `f1`, ...) to the original names.

        Returns:
            dict[str, str]: Feature name map of the model

        Raises:
            ValueError: If the model has not been fitted yet or its second stage is not a VectorAssembler
        """
        if not self.model:
            raise ValueError("Model not fitted yet. `fit()` has to be called first.")
        stages = self.model.stages
        # `__post_init__` places the assembler second, right after the label indexer.
        if len(stages) < 2 or not isinstance(stages[1], VectorAssembler):
            raise ValueError(
                "The second stage of the fitted pipeline is not a VectorAssembler, feature names cannot be recovered."
            )
        feature_names = stages[1].getInputCols()
        return {f"f{i}": feature_name for i, feature_name in enumerate(feature_names)}

    def get_feature_importance(self: LocusToGeneModel) -> dict[str, float]:
        """Return dictionary with relative importances of every feature in the model. Encoded feature names are mapped back to their original names.

        Returns:
            dict[str, float]: Dictionary mapping feature names to their importance

        Raises:
            ValueError: If the model has not been fitted yet or is not an XGBoost model
        """
        if not self.model:
            raise ValueError("Model not fitted yet. `fit()` has to be called first.")
        stages = self.model.stages
        classifier_stage = stages[-1] if stages else None
        if not isinstance(classifier_stage, SparkXGBClassifierModel):
            # Report the offending stage rather than the enclosing PipelineModel.
            raise ValueError(
                f"Model type {type(classifier_stage)} not supported for feature importance."
            )
        importance_map = classifier_stage.get_feature_importances()
        # The property rebuilds the map on every access, so resolve it once.
        name_map = self.feature_name_map
        return {name_map[k]: v for k, v in importance_map.items()}

    def fit(
        self: LocusToGeneModel,
        feature_matrix: L2GFeatureMatrix,
    ) -> LocusToGeneModel:
        """Fit the pipeline to the feature matrix dataframe.

        Args:
            feature_matrix (L2GFeatureMatrix): Feature matrix dataframe to fit the model to

        Returns:
            LocusToGeneModel: Fitted model
        """
        self.model = self.pipeline.fit(feature_matrix.df)
        return self

    def predict(
        self: LocusToGeneModel,
        feature_matrix: L2GFeatureMatrix,
    ) -> DataFrame:
        """Apply the model to a given feature matrix dataframe. The feature matrix needs to be preprocessed first.

        Args:
            feature_matrix (L2GFeatureMatrix): Feature matrix dataframe to apply the model to

        Returns:
            DataFrame: Dataframe with predictions

        Raises:
            ValueError: If the model has not been fitted yet
        """
        if not self.model:
            raise ValueError("Model not fitted yet. `fit()` has to be called first.")
        return self.model.transform(feature_matrix.df)

classifier: Any property writable

Return the model.

Returns:

Name Type Description
Any Any

An estimator object from Spark ML

feature_name_map: dict[str, str] property

Return a dictionary mapping encoded feature names to the original names.

Returns:

Type Description
dict[str, str]

dict[str, str]: Feature name map of the model

Raises:

Type Description
ValueError

If the model has not been fitted yet

add_pipeline_stage(transformer: Transformer) -> LocusToGeneModel

Adds a stage to the L2G pipeline.

Parameters:

Name Type Description Default
transformer Transformer

Spark transformer to add to the pipeline

required

Returns:

Name Type Description
LocusToGeneModel LocusToGeneModel

L2G model with the new transformer

Examples:

>>> from pyspark.ml.regression import LinearRegression
>>> estimator = LinearRegression()
>>> test_model = LocusToGeneModel(features_list=["a", "b"])
>>> print(len(test_model.pipeline.getStages()))
2
>>> print(len(test_model.add_pipeline_stage(estimator).pipeline.getStages()))
3
Source code in src/gentropy/method/l2g/model.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def add_pipeline_stage(
    self: LocusToGeneModel, transformer: Transformer
) -> LocusToGeneModel:
    """Append a stage at the end of the L2G pipeline.

    The pipeline is rebuilt from the current stages followed by the new one.

    Args:
        transformer (Transformer): Spark transformer to add to the pipeline

    Returns:
        LocusToGeneModel: The same L2G model, now holding the extended pipeline

    Examples:
        >>> from pyspark.ml.regression import LinearRegression
        >>> estimator = LinearRegression()
        >>> test_model = LocusToGeneModel(features_list=["a", "b"])
        >>> print(len(test_model.pipeline.getStages()))
        2
        >>> print(len(test_model.add_pipeline_stage(estimator).pipeline.getStages()))
        3
    """
    extended_stages = [*self.pipeline.getStages(), transformer]
    self.pipeline = Pipeline(stages=extended_stages)
    return self

evaluate(results: DataFrame, hyperparameters: dict[str, Any], wandb_run_name: str | None, gold_standard_data: L2GFeatureMatrix | None = None) -> None

Perform evaluation of the model predictions for the test set and track the results with W&B.

Parameters:

Name Type Description Default
results DataFrame

Dataframe containing the predictions

required
hyperparameters dict[str, Any]

Hyperparameters used for the model

required
wandb_run_name str | None

Descriptive name for the run to be tracked with W&B

required
gold_standard_data L2GFeatureMatrix | None

Feature matrix for the associations in the gold standard. Results are only tracked when both this and wandb_run_name are provided; its table and the number of rows per gold standard label are then logged to W&B

None
Source code in src/gentropy/method/l2g/model.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def evaluate(
    self: LocusToGeneModel,
    results: DataFrame,
    hyperparameters: dict[str, Any],
    wandb_run_name: str | None,
    gold_standard_data: L2GFeatureMatrix | None = None,
) -> None:
    """Perform evaluation of the model predictions for the test set and track the results with W&B.

    Nothing is evaluated or logged unless both `wandb_run_name` and `gold_standard_data` are provided.

    Args:
        results (DataFrame): Dataframe containing the predictions
        hyperparameters (dict[str, Any]): Hyperparameters used for the model
        wandb_run_name (str | None): Descriptive name for the run to be tracked with W&B
        gold_standard_data (L2GFeatureMatrix | None): Feature matrix for the associations in the gold standard. Its table and label counts are logged to W&B
    """
    if not (wandb_run_name and gold_standard_data):
        return

    # Built only once tracking is known to be requested, so a skipped evaluation does no work.
    binary_evaluator = BinaryClassificationEvaluator(
        rawPredictionCol="rawPrediction", labelCol="label"
    )
    multi_evaluator = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction"
    )

    run = wandb_init(
        project=self.wandb_l2g_project_name,
        config=hyperparameters,
        name=wandb_run_name,
    )
    if not isinstance(run, Run):
        return
    try:
        self.log_to_wandb(
            results,
            gold_standard_data,
            [binary_evaluator, multi_evaluator],
            run,
        )
    finally:
        # Close the run even if logging fails, so it is not left open.
        run.finish()

features_vector_assembler(features_cols: list[str]) -> VectorAssembler staticmethod

Spark transformer to assemble the feature columns into a vector.

Parameters:

Name Type Description Default
features_cols list[str]

List of feature columns to assemble

required

Returns:

Name Type Description
VectorAssembler VectorAssembler

Spark transformer to assemble the feature columns into a vector

Examples:

>>> from pyspark.ml.feature import VectorAssembler
>>> df = spark.createDataFrame([(5.2, 3.5)], schema="feature_1 FLOAT, feature_2 FLOAT")
>>> assembler = LocusToGeneModel.features_vector_assembler(["feature_1", "feature_2"])
>>> assembler.transform(df).show()
+---------+---------+--------------------+
|feature_1|feature_2|            features|
+---------+---------+--------------------+
|      5.2|      3.5|[5.19999980926513...|
+---------+---------+--------------------+
Source code in src/gentropy/method/l2g/model.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@staticmethod
def features_vector_assembler(features_cols: list[str]) -> VectorAssembler:
    """Build the Spark transformer that packs the feature columns into a single vector column.

    The assembler writes to the `features` column and is configured with `handleInvalid="error"`.

    Args:
        features_cols (list[str]): List of feature columns to assemble

    Returns:
        VectorAssembler: Spark transformer to assemble the feature columns into a vector

    Examples:
        >>> from pyspark.ml.feature import VectorAssembler
        >>> df = spark.createDataFrame([(5.2, 3.5)], schema="feature_1 FLOAT, feature_2 FLOAT")
        >>> assembler = LocusToGeneModel.features_vector_assembler(["feature_1", "feature_2"])
        >>> assembler.transform(df).show()
        +---------+---------+--------------------+
        |feature_1|feature_2|            features|
        +---------+---------+--------------------+
        |      5.2|      3.5|[5.19999980926513...|
        +---------+---------+--------------------+
        <BLANKLINE>
    """
    assembler = VectorAssembler(handleInvalid="error")
    assembler = assembler.setInputCols(features_cols)
    return assembler.setOutputCol("features")

fit(feature_matrix: L2GFeatureMatrix) -> LocusToGeneModel

Fit the pipeline to the feature matrix dataframe.

Parameters:

Name Type Description Default
feature_matrix L2GFeatureMatrix

Feature matrix dataframe to fit the model to

required

Returns:

Name Type Description
LocusToGeneModel LocusToGeneModel

Fitted model

Source code in src/gentropy/method/l2g/model.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def fit(
    self: LocusToGeneModel,
    feature_matrix: L2GFeatureMatrix,
) -> LocusToGeneModel:
    """Train the pipeline on the feature matrix and keep the fitted model on the instance.

    Args:
        feature_matrix (L2GFeatureMatrix): Feature matrix dataframe to fit the model to

    Returns:
        LocusToGeneModel: The same instance, with `model` set to the fitted pipeline
    """
    fitted_pipeline = self.pipeline.fit(feature_matrix.df)
    self.model = fitted_pipeline
    return self

get_feature_importance() -> dict[str, float]

Return dictionary with relative importances of every feature in the model. Feature names are encoded and have to be mapped back to their original names.

Returns:

Type Description
dict[str, float]

dict[str, float]: Dictionary mapping feature names to their importance

Raises:

Type Description
ValueError

If the model has not been fitted yet or is not an XGBoost model

Source code in src/gentropy/method/l2g/model.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def get_feature_importance(self: LocusToGeneModel) -> dict[str, float]:
    """Return dictionary with relative importances of every feature in the model. Encoded feature names are mapped back to their original names.

    Returns:
        dict[str, float]: Dictionary mapping feature names to their importance

    Raises:
        ValueError: If the model has not been fitted yet or is not an XGBoost model
    """
    if not self.model:
        raise ValueError("Model not fitted yet. `fit()` has to be called first.")
    stages = self.model.stages
    classifier_stage = stages[-1] if stages else None
    if not isinstance(classifier_stage, SparkXGBClassifierModel):
        # Report the offending stage rather than the enclosing PipelineModel.
        raise ValueError(
            f"Model type {type(classifier_stage)} not supported for feature importance."
        )
    importance_map = classifier_stage.get_feature_importances()
    # `feature_name_map` is rebuilt on every access, so resolve it once.
    name_map = self.feature_name_map
    return {name_map[k]: v for k, v in importance_map.items()}

get_param_grid() -> list[Any]

Return the parameter grid for the model.

Returns:

Type Description
list[Any]

list[Any]: List of parameter maps to use for cross validation

Source code in src/gentropy/method/l2g/model.py
167
168
169
170
171
172
173
174
175
176
177
178
def get_param_grid(self: LocusToGeneModel) -> list[Any]:
    """Build the hyperparameter grid explored during cross validation.

    The grid combines three values of the estimator's `max_depth` param with
    three values of its `learning_rate` param.

    Returns:
        list[Any]: List of parameter maps to use for cross validation
    """
    grid_builder = ParamGridBuilder()
    grid_builder = grid_builder.addGrid(self.estimator.max_depth, [3, 5, 7])
    grid_builder = grid_builder.addGrid(
        self.estimator.learning_rate, [0.01, 0.1, 1.0]
    )
    return grid_builder.build()

load_from_disk(path: str, features_list: list[str]) -> LocusToGeneModel classmethod

Load a fitted pipeline model from disk.

Parameters:

Name Type Description Default
path str

Path to the model

required
features_list list[str]

List of features used for the model

required

Returns:

Name Type Description
LocusToGeneModel LocusToGeneModel

L2G model loaded from disk

Source code in src/gentropy/method/l2g/model.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
@classmethod
def load_from_disk(
    cls: Type[LocusToGeneModel], path: str, features_list: list[str]
) -> LocusToGeneModel:
    """Create an L2G model around a fitted pipeline model stored on disk.

    Args:
        path (str): Path to the model
        features_list (list[str]): List of features used for the model

    Returns:
        LocusToGeneModel: L2G model loaded from disk
    """
    fitted_pipeline = PipelineModel.load(path)
    return cls(features_list=features_list, model=fitted_pipeline)

log_to_wandb(results: DataFrame, training_data: L2GFeatureMatrix, evaluators: list[BinaryClassificationEvaluator | MulticlassClassificationEvaluator], wandb_run: Run) -> None

Log evaluation results and feature importance to W&B.

Parameters:

Name Type Description Default
results DataFrame

Dataframe containing the predictions

required
training_data L2GFeatureMatrix

Training data used for the model. Its table, the number of rows per gold standard label and the feature missingness rates are logged to W&B

required
evaluators list[BinaryClassificationEvaluator | MulticlassClassificationEvaluator]

List of Spark ML evaluators to use for evaluation

required
wandb_run Run

W&B run to log the results to

required
Source code in src/gentropy/method/l2g/model.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def log_to_wandb(
    self: LocusToGeneModel,
    results: DataFrame,
    training_data: L2GFeatureMatrix,
    evaluators: list[
        BinaryClassificationEvaluator | MulticlassClassificationEvaluator
    ],
    wandb_run: Run,
) -> None:
    """Send evaluation metrics, feature importances and training set statistics to W&B.

    Logged in order: one evaluation per evaluator, the feature importances, the
    training table, the row count per gold standard label and the missingness rates.

    Args:
        results (DataFrame): Dataframe containing the predictions
        training_data (L2GFeatureMatrix): Training data used for the model. Its table, label counts and feature missingness rates are logged to W&B
        evaluators (list[BinaryClassificationEvaluator | MulticlassClassificationEvaluator]): List of Spark ML evaluators to use for evaluation
        wandb_run (Run): W&B run to log the results to
    """
    # Evaluation metrics: each Spark evaluator is wrapped so that it reports to the run
    for spark_evaluator in evaluators:
        WandbEvaluator(
            spark_ml_evaluator=spark_evaluator, wandb_run=wandb_run
        ).evaluate(results)

    # Feature importance
    importances = self.get_feature_importance()
    wandb_run.log({"importances": importances})

    # Training set as a W&B table
    wandb_run.log({"trainingSet": Table(dataframe=training_data.df.toPandas())})

    # Number of rows per gold standard label
    label_rows = training_data.df.groupBy("goldStandardSet").count().collect()
    label_counts = {}
    for label_row in label_rows:
        metric_name = "goldStandard" + label_row["goldStandardSet"].capitalize()
        label_counts[metric_name] = label_row["count"]
    wandb_run.log(label_counts)

    # Missingness rates
    missingness = training_data.calculate_feature_missingness_rate()
    wandb_run.log({"missingnessRates": missingness})

predict(feature_matrix: L2GFeatureMatrix) -> DataFrame

Apply the model to a given feature matrix dataframe. The feature matrix needs to be preprocessed first.

Parameters:

Name Type Description Default
feature_matrix L2GFeatureMatrix

Feature matrix dataframe to apply the model to

required

Returns:

Name Type Description
DataFrame DataFrame

Dataframe with predictions

Raises:

Type Description
ValueError

If the model has not been fitted yet

Source code in src/gentropy/method/l2g/model.py
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def predict(
    self: LocusToGeneModel,
    feature_matrix: L2GFeatureMatrix,
) -> DataFrame:
    """Run the fitted pipeline model on a feature matrix. The feature matrix needs to be preprocessed first.

    Args:
        feature_matrix (L2GFeatureMatrix): Feature matrix dataframe to apply the model to

    Returns:
        DataFrame: Dataframe with predictions

    Raises:
        ValueError: If the model has not been fitted yet
    """
    fitted_model = self.model
    if fitted_model:
        return fitted_model.transform(feature_matrix.df)
    raise ValueError("Model not fitted yet. `fit()` has to be called first.")

save(path: str) -> None

Saves fitted pipeline model to disk.

Parameters:

Name Type Description Default
path str

Path to save the model to

required

Raises:

Type Description
ValueError

If the model has not been fitted yet

Source code in src/gentropy/method/l2g/model.py
54
55
56
57
58
59
60
61
62
63
64
65
def save(self: LocusToGeneModel, path: str) -> None:
    """Write the fitted pipeline model to disk, overwriting anything already at `path`.

    Args:
        path (str): Path to save the model to

    Raises:
        ValueError: If the model has not been fitted yet
    """
    fitted_model = self.model
    if fitted_model is None:
        raise ValueError("Model has not been fitted yet.")
    writer = fitted_model.write().overwrite()
    writer.save(path)