
Datasets

The Dataset classes define the data model behind Open Targets Gentropy. Every class inherits from the Dataset class and wraps a dataframe with a predefined schema; the schema for each dataset is documented in its respective class.
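
A minimal sketch of what such a child class could look like is shown below; the GeneScore class, its fields and its inline schema are hypothetical and only illustrate the pattern (real Gentropy datasets load their schemas from the schemas module).

from __future__ import annotations

from dataclasses import dataclass

from pyspark.sql.types import DoubleType, StringType, StructField, StructType

from gentropy.dataset.dataset import Dataset


@dataclass
class GeneScore(Dataset):
    """Hypothetical dataset holding one score per gene."""

    @classmethod
    def get_schema(cls) -> StructType:
        # Real child classes read their schema from the `schemas` module; it is
        # defined inline here only to keep the sketch self-contained.
        return StructType(
            [
                StructField("geneId", StringType(), nullable=False),
                StructField("score", DoubleType(), nullable=True),
            ]
        )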

gentropy.dataset.dataset.Dataset dataclass

Bases: ABC

Open Targets Gentropy Dataset.

Dataset is a wrapper around a Spark DataFrame with a predefined schema. Schemas for each child dataset are described in the schemas module.

Source code in src/gentropy/dataset/dataset.py
@dataclass
class Dataset(ABC):
    """Open Targets Gentropy Dataset.

    `Dataset` is a wrapper around a Spark DataFrame with a predefined schema. Schemas for each child dataset are described in the `schemas` module.
    """

    _df: DataFrame
    _schema: StructType

    def __post_init__(self: Dataset) -> None:
        """Post init."""
        self.validate_schema()

    @property
    def df(self: Dataset) -> DataFrame:
        """Dataframe included in the Dataset.

        Returns:
            DataFrame: Dataframe included in the Dataset
        """
        return self._df

    @df.setter
    def df(self: Dataset, new_df: DataFrame) -> None:  # noqa: CCE001
        """Dataframe setter.

        Args:
            new_df (DataFrame): New dataframe to be included in the Dataset
        """
        self._df: DataFrame = new_df
        self.validate_schema()

    @property
    def schema(self: Dataset) -> StructType:
        """Dataframe expected schema.

        Returns:
            StructType: Dataframe expected schema
        """
        return self._schema

    @classmethod
    def _process_class_params(
        cls, params: dict[str, Any]
    ) -> tuple[dict[str, Any], dict[str, Any]]:
        """Separate class initialization parameters from spark session parameters.

        Args:
            params (dict[str, Any]): Combined parameters dictionary

        Returns:
            tuple[dict[str, Any], dict[str, Any]]: (class_params, spark_params)
        """
        # Get all field names from the class (including parent classes)
        class_field_names = {
            field.name
            for cls_ in cls.__mro__
            if hasattr(cls_, "__dataclass_fields__")
            for field in cls_.__dataclass_fields__.values()
        }
        # Separate parameters
        class_params = {k: v for k, v in params.items() if k in class_field_names}
        spark_params = {k: v for k, v in params.items() if k not in class_field_names}
        return class_params, spark_params

    @classmethod
    @abstractmethod
    def get_schema(cls: type[Self]) -> StructType:
        """Abstract method to get the schema. Must be implemented by child classes.

        Returns:
            StructType: Schema for the Dataset

        Raises:
            NotImplementedError: Must be implemented in the child classes
        """
        raise NotImplementedError("Must be implemented in the child classes")

    @classmethod
    def get_QC_column_name(cls: type[Self]) -> str | None:
        """Abstract method to get the QC column name. Assumes None unless overriden by child classes.

        Returns:
            str | None: Column name
        """
        return None

    @classmethod
    def get_QC_mappings(cls: type[Self]) -> dict[str, str]:
        """Method to get the mapping between QC flag and corresponding QC category value.

        Returns empty dict unless overriden by child classes.

        Returns:
            dict[str, str]: Mapping between flag name and QC column category value.
        """
        return {}

    @classmethod
    def from_parquet(
        cls: type[Self],
        session: Session,
        path: str | list[str],
        **kwargs: bool | float | int | str | None,
    ) -> Self:
        """Reads parquet into a Dataset with a given schema.

        Args:
            session (Session): Spark session
            path (str | list[str]): Path to the parquet dataset
            **kwargs (bool | float | int | str | None): Additional arguments to pass to spark.read.parquet

        Returns:
            Self: Dataset with the parquet file contents

        Raises:
            ValueError: Parquet file is empty
        """
        schema = cls.get_schema()

        # Separate class params from spark params
        class_params, spark_params = cls._process_class_params(kwargs)

        df = session.load_data(path, format="parquet", schema=schema, **spark_params)
        if df.isEmpty():
            raise ValueError(f"Parquet file is empty: {path}")
        return cls(_df=df, _schema=schema, **class_params)

    def filter(self: Self, condition: Column) -> Self:
        """Creates a new instance of a Dataset with the DataFrame filtered by the condition.

        Args:
            condition (Column): Condition to filter the DataFrame

        Returns:
            Self: Filtered Dataset
        """
        df = self._df.filter(condition)
        class_constructor = self.__class__
        return class_constructor(_df=df, _schema=class_constructor.get_schema())

    def validate_schema(self: Dataset) -> None:
        """Validate DataFrame schema against expected class schema.

        Raises:
            SchemaValidationError: If the DataFrame schema does not match the expected schema
        """
        expected_schema = self._schema
        observed_schema = self._df.schema

        # Unexpected fields in dataset
        if discrepancies := compare_struct_schemas(observed_schema, expected_schema):
            raise SchemaValidationError(
                f"Schema validation failed for {type(self).__name__}", discrepancies
            )

    def valid_rows(self: Self, invalid_flags: list[str], invalid: bool = False) -> Self:
        """Filters `Dataset` according to a list of quality control flags. Only `Dataset` classes with a QC column can be validated.

        This method performs the following steps:
        - Check if the Dataset contains a QC column.
        - Check if the invalid_flags exist in the QC mappings flags.
        - Filter the Dataset according to the invalid_flags and invalid parameters.

        Args:
            invalid_flags (list[str]): List of quality control flags to be excluded.
            invalid (bool): If True, returns the invalid rows instead of the valid ones. Defaults to False.

        Returns:
            Self: filtered dataset.

        Raises:
            ValueError: If the Dataset does not contain a QC column or if the invalid_flags elements do not exist in QC mappings flags.
        """
        # If the invalid flags are not valid quality checks (enum) for this Dataset we raise an error:
        invalid_reasons = []
        for flag in invalid_flags:
            if flag not in self.get_QC_mappings():
                raise ValueError(
                    f"{flag} is not a valid QC flag for {type(self).__name__} ({self.get_QC_mappings()})."
                )
            reason = self.get_QC_mappings()[flag]
            invalid_reasons.append(reason)

        qc_column_name = self.get_QC_column_name()
        # If Dataset (class) does not contain QC column we raise an error:
        if not qc_column_name:
            raise ValueError(
                f"{type(self).__name__} objects do not contain a QC column to filter by."
            )
        else:
            column: str = qc_column_name
            # If QC column (nullable) is not available in the dataframe we create an empty array:
            qc = f.when(f.col(column).isNull(), f.array()).otherwise(f.col(column))

        filterCondition = ~f.arrays_overlap(
            f.array([f.lit(i) for i in invalid_reasons]), qc
        )
        # Returning the filtered dataset:
        if invalid:
            return self.filter(~filterCondition)
        else:
            return self.filter(filterCondition)

    def drop_infinity_values(self: Self, *cols: str) -> Self:
        """Drop infinity values from Double typed column.

        Infinity type reference - https://spark.apache.org/docs/latest/sql-ref-datatypes.html#floating-point-special-values
        The implementation comes from https://stackoverflow.com/questions/34432998/how-to-replace-infinity-in-pyspark-dataframe

        Args:
            *cols (str): Names of the columns to check for infinite values; these must be of DoubleType.

        Returns:
            Self: Dataset after removing infinite values
        """
        if len(cols) == 0:
            return self
        inf_strings = ("Inf", "+Inf", "-Inf", "Infinity", "+Infinity", "-Infinity")
        inf_values = [f.lit(v).cast(DoubleType()) for v in inf_strings]
        conditions = [f.col(c).isin(inf_values) for c in cols]
        # Reduce the individual filter expressions with an OR statement, i.e.
        # col(c1).isin(inf_values) | col(c2).isin(inf_values) | ...
        condition = reduce(lambda a, b: a | b, conditions)
        self.df = self._df.filter(~condition)
        return self

    def persist(self: Self) -> Self:
        """Persist in memory the DataFrame included in the Dataset.

        Returns:
            Self: Persisted Dataset
        """
        self.df = self._df.persist()
        return self

    def unpersist(self: Self) -> Self:
        """Remove the persisted DataFrame from memory.

        Returns:
            Self: Unpersisted Dataset
        """
        self.df = self._df.unpersist()
        return self

    def coalesce(self: Self, num_partitions: int, **kwargs: Any) -> Self:
        """Coalesce the DataFrame included in the Dataset.

        Coalescing is efficient for decreasing the number of partitions because it avoids a full shuffle of the data.

        Args:
            num_partitions (int): Number of partitions to coalesce to
            **kwargs (Any): Arguments to pass to the coalesce method

        Returns:
            Self: Coalesced Dataset
        """
        self.df = self._df.coalesce(num_partitions, **kwargs)
        return self

    def repartition(self: Self, num_partitions: int, **kwargs: Any) -> Self:
        """Repartition the DataFrame included in the Dataset.

        Repartitioning creates new partitions with data that is distributed evenly.

        Args:
            num_partitions (int): Number of partitions to repartition to
            **kwargs (Any): Arguments to pass to the repartition method

        Returns:
            Self: Repartitioned Dataset
        """
        self.df = self._df.repartition(num_partitions, **kwargs)
        return self

    @staticmethod
    def update_quality_flag(
        qc: Column, flag_condition: Column, flag_text: Enum
    ) -> Column:
        """Update the provided quality control list with a new flag if condition is met.

        Args:
            qc (Column): Array column with the current list of qc flags.
            flag_condition (Column): Boolean column indicating which rows should be flagged
            flag_text (Enum): Text for the new quality control flag

        Returns:
            Column: Array column with the updated list of qc flags.
        """
        qc = f.when(qc.isNull(), f.array()).otherwise(qc)
        return f.when(
            flag_condition,
            f.array_union(qc, f.array(f.lit(flag_text.value))),
        ).otherwise(qc)

    @staticmethod
    def flag_duplicates(test_column: Column) -> Column:
        """Return True for rows, where the value was already seen in column.

        This implementation allows keeping the first occurrence of the value.

        Args:
            test_column (Column): Column to check for duplicates

        Returns:
            Column: Column with a boolean flag for duplicates
        """
        return (
            f.row_number().over(Window.partitionBy(test_column).orderBy(f.rand())) > 1
        )

    @staticmethod
    def generate_identifier(uniqueness_defining_columns: list[str]) -> Column:
        """Hashes the provided columns to generate a unique identifier.

        Args:
            uniqueness_defining_columns (list[str]): list of columns defining uniqueness

        Returns:
            Column: column with a unique identifier
        """
        hashable_columns = [
            f.when(f.col(column).cast("string").isNull(), f.lit("None")).otherwise(
                f.col(column).cast("string")
            )
            for column in uniqueness_defining_columns
        ]
        return f.md5(f.concat(*hashable_columns))

df: DataFrame property writable

Dataframe included in the Dataset.

Returns:

    DataFrame: Dataframe included in the Dataset

schema: StructType property

Dataframe expected schema.

Returns:

    StructType: Dataframe expected schema

coalesce(num_partitions: int, **kwargs: Any) -> Self

Coalesce the DataFrame included in the Dataset.

Coalescing is efficient for decreasing the number of partitions because it avoids a full shuffle of the data.

Parameters:

    num_partitions (int): Number of partitions to coalesce to. Required.
    **kwargs (Any): Arguments to pass to the coalesce method. Default: {}

Returns:

    Self: Coalesced Dataset

Source code in src/gentropy/dataset/dataset.py
def coalesce(self: Self, num_partitions: int, **kwargs: Any) -> Self:
    """Coalesce the DataFrame included in the Dataset.

    Coalescing is efficient for decreasing the number of partitions because it avoids a full shuffle of the data.

    Args:
        num_partitions (int): Number of partitions to coalesce to
        **kwargs (Any): Arguments to pass to the coalesce method

    Returns:
        Self: Coalesced Dataset
    """
    self.df = self._df.coalesce(num_partitions, **kwargs)
    return self
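
A typical use is to reduce the number of output files before writing; the gene_scores variable (a Dataset instance such as the hypothetical GeneScore above) and the output path are illustrative assumptions.

# Shrink to 10 partitions without a full shuffle, then write the wrapped DataFrame.
gene_scores.coalesce(10).df.write.parquet("path/to/output")  # hypothetical path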

drop_infinity_values(*cols: str) -> Self

Drop infinity values from Double-typed columns.

Infinity type reference - https://spark.apache.org/docs/latest/sql-ref-datatypes.html#floating-point-special-values The implementation comes from https://stackoverflow.com/questions/34432998/how-to-replace-infinity-in-pyspark-dataframe

Parameters:

    *cols (str): Names of the columns to check for infinite values; these must be of DoubleType. Default: ()

Returns:

    Self: Dataset after removing infinite values

Source code in src/gentropy/dataset/dataset.py
def drop_infinity_values(self: Self, *cols: str) -> Self:
    """Drop infinity values from Double typed column.

    Infinity type reference - https://spark.apache.org/docs/latest/sql-ref-datatypes.html#floating-point-special-values
    The implementation comes from https://stackoverflow.com/questions/34432998/how-to-replace-infinity-in-pyspark-dataframe

    Args:
        *cols (str): Names of the columns to check for infinite values; these must be of DoubleType.

    Returns:
        Self: Dataset after removing infinite values
    """
    if len(cols) == 0:
        return self
    inf_strings = ("Inf", "+Inf", "-Inf", "Infinity", "+Infinity", "-Infinity")
    inf_values = [f.lit(v).cast(DoubleType()) for v in inf_strings]
    conditions = [f.col(c).isin(inf_values) for c in cols]
    # Reduce the individual filter expressions with an OR statement, i.e.
    # col(c1).isin(inf_values) | col(c2).isin(inf_values) | ...
    condition = reduce(lambda a, b: a | b, conditions)
    self.df = self._df.filter(~condition)
    return self
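
A brief usage sketch; gene_scores and its DoubleType "score" column are assumptions for illustration.

# Drops rows where "score" is +/-Infinity; the wrapped DataFrame is replaced in place.
finite_scores = gene_scores.drop_infinity_values("score")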

filter(condition: Column) -> Self

Creates a new instance of a Dataset with the DataFrame filtered by the condition.

Parameters:

    condition (Column): Condition to filter the DataFrame. Required.

Returns:

    Self: Filtered Dataset

Source code in src/gentropy/dataset/dataset.py
def filter(self: Self, condition: Column) -> Self:
    """Creates a new instance of a Dataset with the DataFrame filtered by the condition.

    Args:
        condition (Column): Condition to filter the DataFrame

    Returns:
        Self: Filtered Dataset
    """
    df = self._df.filter(condition)
    class_constructor = self.__class__
    return class_constructor(_df=df, _schema=class_constructor.get_schema())
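
A short usage sketch; the gene_scores instance and the "score" column are assumptions.

from pyspark.sql import functions as f

# Returns a new instance of the same Dataset class; the filtered DataFrame is
# re-validated against the class schema when the new instance is constructed.
high_scores = gene_scores.filter(f.col("score") > 0.5)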

flag_duplicates(test_column: Column) -> Column staticmethod

Return True for rows where the value was already seen in the column.

This implementation allows keeping the first occurrence of the value.

Parameters:

    test_column (Column): Column to check for duplicates. Required.

Returns:

    Column: Column with a boolean flag for duplicates

Source code in src/gentropy/dataset/dataset.py
@staticmethod
def flag_duplicates(test_column: Column) -> Column:
    """Return True for rows, where the value was already seen in column.

    This implementation allows keeping the first occurrence of the value.

    Args:
        test_column (Column): Column to check for duplicates

    Returns:
        Column: Column with a boolean flag for duplicates
    """
    return (
        f.row_number().over(Window.partitionBy(test_column).orderBy(f.rand())) > 1
    )
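
A small sketch of the flag in isolation; the example DataFrame is made up. Because the window is ordered by f.rand(), which of the duplicated rows counts as the first occurrence is arbitrary.

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

from gentropy.dataset.dataset import Dataset

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("ENSG01", 0.1), ("ENSG01", 0.9), ("ENSG02", 0.5)], ["geneId", "score"]
)
# One of the two ENSG01 rows is flagged True; the other row and ENSG02 are False.
df.withColumn("isDuplicate", Dataset.flag_duplicates(f.col("geneId"))).show()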

from_parquet(session: Session, path: str | list[str], **kwargs: bool | float | int | str | None) -> Self classmethod

Reads parquet into a Dataset with a given schema.

Parameters:

    session (Session): Spark session. Required.
    path (str | list[str]): Path to the parquet dataset. Required.
    **kwargs (bool | float | int | str | None): Additional arguments to pass to spark.read.parquet. Default: {}

Returns:

    Self: Dataset with the parquet file contents

Raises:

    ValueError: Parquet file is empty

Source code in src/gentropy/dataset/dataset.py
@classmethod
def from_parquet(
    cls: type[Self],
    session: Session,
    path: str | list[str],
    **kwargs: bool | float | int | str | None,
) -> Self:
    """Reads parquet into a Dataset with a given schema.

    Args:
        session (Session): Spark session
        path (str | list[str]): Path to the parquet dataset
        **kwargs (bool | float | int | str | None): Additional arguments to pass to spark.read.parquet

    Returns:
        Self: Dataset with the parquet file contents

    Raises:
        ValueError: Parquet file is empty
    """
    schema = cls.get_schema()

    # Separate class params from spark params
    class_params, spark_params = cls._process_class_params(kwargs)

    df = session.load_data(path, format="parquet", schema=schema, **spark_params)
    if df.isEmpty():
        raise ValueError(f"Parquet file is empty: {path}")
    return cls(_df=df, _schema=schema, **class_params)
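
A sketch of loading a dataset from parquet, reusing the hypothetical GeneScore class from the top of this page; the Session import path and the parquet location are assumptions rather than values taken from this page.

from gentropy.common.session import Session  # assumed import path

session = Session()
gene_scores = GeneScore.from_parquet(
    session,
    "path/to/gene_scores_parquet",  # hypothetical location
    recursiveFileLookup=True,  # extra kwargs are forwarded to spark.read.parquet
)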

generate_identifier(uniqueness_defining_columns: list[str]) -> Column staticmethod

Hashes the provided columns to generate a unique identifier.

Parameters:

    uniqueness_defining_columns (list[str]): List of columns defining uniqueness. Required.

Returns:

    Column: Column with a unique identifier

Source code in src/gentropy/dataset/dataset.py
@staticmethod
def generate_identifier(uniqueness_defining_columns: list[str]) -> Column:
    """Hashes the provided columns to generate a unique identifier.

    Args:
        uniqueness_defining_columns (list[str]): list of columns defining uniqueness

    Returns:
        Column: column with a unique identifier
    """
    hashable_columns = [
        f.when(f.col(column).cast("string").isNull(), f.lit("None")).otherwise(
            f.col(column).cast("string")
        )
        for column in uniqueness_defining_columns
    ]
    return f.md5(f.concat(*hashable_columns))
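
An illustrative sketch; the DataFrame and its column names are assumptions.

from pyspark.sql import SparkSession

from gentropy.dataset.dataset import Dataset

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("ENSG01", "GCST001"), ("ENSG02", None)], ["geneId", "studyId"]
)
# Nulls are replaced with the string "None" before hashing, so rows with
# missing values still receive a deterministic identifier.
df.withColumn("rowId", Dataset.generate_identifier(["geneId", "studyId"])).show()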

get_QC_column_name() -> str | None classmethod

Get the QC column name. Returns None unless overridden by child classes.

Returns:

    str | None: Column name

Source code in src/gentropy/dataset/dataset.py
@classmethod
def get_QC_column_name(cls: type[Self]) -> str | None:
    """Abstract method to get the QC column name. Assumes None unless overriden by child classes.

    Returns:
        str | None: Column name
    """
    return None

get_QC_mappings() -> dict[str, str] classmethod

Get the mapping between each QC flag and its corresponding QC category value.

Returns an empty dict unless overridden by child classes.

Returns:

    dict[str, str]: Mapping between flag name and QC column category value.

Source code in src/gentropy/dataset/dataset.py
@classmethod
def get_QC_mappings(cls: type[Self]) -> dict[str, str]:
    """Method to get the mapping between QC flag and corresponding QC category value.

    Returns empty dict unless overriden by child classes.

    Returns:
        dict[str, str]: Mapping between flag name and QC column category value.
    """
    return {}
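
To make valid_rows usable, a child class overrides both QC hooks. The sketch below extends the hypothetical GeneScore class from the top of this page; the column name and flag values are made up.

class GeneScoreWithQC(GeneScore):
    """Hypothetical QC-aware variant of GeneScore; a real dataset would also
    declare the "qualityControls" array column in its schema."""

    @classmethod
    def get_QC_column_name(cls) -> str | None:
        return "qualityControls"

    @classmethod
    def get_QC_mappings(cls) -> dict[str, str]:
        # Flag name accepted by `valid_rows` -> category value stored in the QC column.
        return {"negative_score": "Score is negative"}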

get_schema() -> StructType abstractmethod classmethod

Abstract method to get the schema. Must be implemented by child classes.

Returns:

    StructType: Schema for the Dataset

Raises:

    NotImplementedError: Must be implemented in the child classes

Source code in src/gentropy/dataset/dataset.py
@classmethod
@abstractmethod
def get_schema(cls: type[Self]) -> StructType:
    """Abstract method to get the schema. Must be implemented by child classes.

    Returns:
        StructType: Schema for the Dataset

    Raises:
        NotImplementedError: Must be implemented in the child classes
    """
    raise NotImplementedError("Must be implemented in the child classes")

persist() -> Self

Persist in memory the DataFrame included in the Dataset.

Returns:

    Self: Persisted Dataset

Source code in src/gentropy/dataset/dataset.py
def persist(self: Self) -> Self:
    """Persist in memory the DataFrame included in the Dataset.

    Returns:
        Self: Persisted Dataset
    """
    self.df = self._df.persist()
    return self

repartition(num_partitions: int, **kwargs: Any) -> Self

Repartition the DataFrame included in the Dataset.

Repartitioning creates new partitions with data that is distributed evenly.

Parameters:

    num_partitions (int): Number of partitions to repartition to. Required.
    **kwargs (Any): Arguments to pass to the repartition method. Default: {}

Returns:

    Self: Repartitioned Dataset

Source code in src/gentropy/dataset/dataset.py
def repartition(self: Self, num_partitions: int, **kwargs: Any) -> Self:
    """Repartition the DataFrame included in the Dataset.

    Repartitioning creates new partitions with data that is distributed evenly.

    Args:
        num_partitions (int): Number of partitions to repartition to
        **kwargs (Any): Arguments to pass to the repartition method

    Returns:
        Self: Repartitioned Dataset
    """
    self.df = self._df.repartition(num_partitions, **kwargs)
    return self

unpersist() -> Self

Remove the persisted DataFrame from memory.

Returns:

    Self: Unpersisted Dataset

Source code in src/gentropy/dataset/dataset.py
def unpersist(self: Self) -> Self:
    """Remove the persisted DataFrame from memory.

    Returns:
        Self: Unpersisted Dataset
    """
    self.df = self._df.unpersist()
    return self

update_quality_flag(qc: Column, flag_condition: Column, flag_text: Enum) -> Column staticmethod

Update the provided quality control list with a new flag if condition is met.

Parameters:

    qc (Column): Array column with the current list of qc flags. Required.
    flag_condition (Column): Boolean column indicating which rows should be flagged. Required.
    flag_text (Enum): Text for the new quality control flag. Required.

Returns:

    Column: Array column with the updated list of qc flags.

Source code in src/gentropy/dataset/dataset.py
@staticmethod
def update_quality_flag(
    qc: Column, flag_condition: Column, flag_text: Enum
) -> Column:
    """Update the provided quality control list with a new flag if condition is met.

    Args:
        qc (Column): Array column with the current list of qc flags.
        flag_condition (Column): Boolean column indicating which rows should be flagged
        flag_text (Enum): Text for the new quality control flag

    Returns:
        Column: Array column with the updated list of qc flags.
    """
    qc = f.when(qc.isNull(), f.array()).otherwise(qc)
    return f.when(
        flag_condition,
        f.array_union(qc, f.array(f.lit(flag_text.value))),
    ).otherwise(qc)
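
A sketch of appending a flag to an existing QC array; the enum, the df DataFrame and its "score" and "qualityControls" columns are all hypothetical.

from enum import Enum

from pyspark.sql import functions as f

from gentropy.dataset.dataset import Dataset


class GeneScoreQualityCheck(Enum):
    """Hypothetical QC categories."""

    NEGATIVE_SCORE = "Score is negative"


flagged_df = df.withColumn(
    "qualityControls",
    Dataset.update_quality_flag(
        f.col("qualityControls"),  # current flags (may be null)
        f.col("score") < 0,  # boolean condition marking rows to flag
        GeneScoreQualityCheck.NEGATIVE_SCORE,  # its .value is appended to the array
    ),
)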

valid_rows(invalid_flags: list[str], invalid: bool = False) -> Self

Filters Dataset according to a list of quality control flags. Only Dataset classes with a QC column can be validated.

This method performs the following steps:

- Check if the Dataset contains a QC column.
- Check if the invalid_flags exist in the QC mappings flags.
- Filter the Dataset according to the invalid_flags and invalid parameters.

Parameters:

    invalid_flags (list[str]): List of quality control flags to be excluded. Required.
    invalid (bool): If True, returns the invalid rows instead of the valid ones. Defaults to False.

Returns:

    Self: Filtered dataset.

Raises:

    ValueError: If the Dataset does not contain a QC column or if the invalid_flags elements do not exist in QC mappings flags.

Source code in src/gentropy/dataset/dataset.py
def valid_rows(self: Self, invalid_flags: list[str], invalid: bool = False) -> Self:
    """Filters `Dataset` according to a list of quality control flags. Only `Dataset` classes with a QC column can be validated.

    This method performs the following steps:
    - Check if the Dataset contains a QC column.
    - Check if the invalid_flags exist in the QC mappings flags.
    - Filter the Dataset according to the invalid_flags and invalid parameters.

    Args:
        invalid_flags (list[str]): List of quality control flags to be excluded.
        invalid (bool): If True, returns the invalid rows instead of the valid ones. Defaults to False.

    Returns:
        Self: filtered dataset.

    Raises:
        ValueError: If the Dataset does not contain a QC column or if the invalid_flags elements do not exist in QC mappings flags.
    """
    # If the invalid flags are not valid quality checks (enum) for this Dataset we raise an error:
    invalid_reasons = []
    for flag in invalid_flags:
        if flag not in self.get_QC_mappings():
            raise ValueError(
                f"{flag} is not a valid QC flag for {type(self).__name__} ({self.get_QC_mappings()})."
            )
        reason = self.get_QC_mappings()[flag]
        invalid_reasons.append(reason)

    qc_column_name = self.get_QC_column_name()
    # If Dataset (class) does not contain QC column we raise an error:
    if not qc_column_name:
        raise ValueError(
            f"{type(self).__name__} objects do not contain a QC column to filter by."
        )
    else:
        column: str = qc_column_name
        # If QC column (nullable) is not available in the dataframe we create an empty array:
        qc = f.when(f.col(column).isNull(), f.array()).otherwise(f.col(column))

    filterCondition = ~f.arrays_overlap(
        f.array([f.lit(i) for i in invalid_reasons]), qc
    )
    # Returning the filtered dataset:
    if invalid:
        return self.filter(~filterCondition)
    else:
        return self.filter(filterCondition)
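
A short usage sketch, reusing the hypothetical GeneScoreWithQC class and its "negative_score" flag from the get_QC_mappings example above; scores is assumed to be an instance of that class.

# Keep only rows that do not carry the "Score is negative" category:
valid_scores = scores.valid_rows(["negative_score"])

# Or keep only the flagged rows instead:
negative_scores = scores.valid_rows(["negative_score"], invalid=True)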

validate_schema() -> None

Validate DataFrame schema against expected class schema.

Raises:

    SchemaValidationError: If the DataFrame schema does not match the expected schema

Source code in src/gentropy/dataset/dataset.py
def validate_schema(self: Dataset) -> None:
    """Validate DataFrame schema against expected class schema.

    Raises:
        SchemaValidationError: If the DataFrame schema does not match the expected schema
    """
    expected_schema = self._schema
    observed_schema = self._df.schema

    # Unexpected fields in dataset
    if discrepancies := compare_struct_schemas(observed_schema, expected_schema):
        raise SchemaValidationError(
            f"Schema validation failed for {type(self).__name__}", discrepancies
        )
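
Validation runs automatically on construction and whenever the df setter is used, so this method is rarely called directly. A sketch of a failing case follows; the SchemaValidationError import path and the gene_scores instance are assumptions.

from pyspark.sql import functions as f

from gentropy.common.schemas import SchemaValidationError  # assumed import path

try:
    # Adding a column that is not part of the expected schema triggers validation
    # in the `df` setter and raises SchemaValidationError.
    gene_scores.df = gene_scores.df.withColumn("unexpectedColumn", f.lit(1))
except SchemaValidationError as error:
    print(error)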