Skip to content

Biosample index

gentropy.dataset.biosample_index.BiosampleIndex dataclass

Bases: Dataset

Biosample index dataset.

A biosample index dataset captures metadata about biosamples (e.g. tissues, cell types, cell lines), such as alternate names and relationships with other biosamples.

Source code in src/gentropy/dataset/biosample_index.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@dataclass
class BiosampleIndex(Dataset):
    """Biosample index dataset.

    A biosample index dataset captures metadata about biosamples (e.g. tissues, cell types, cell lines), such as alternate names and relationships with other biosamples.
    """

    @classmethod
    def get_schema(cls: type[BiosampleIndex]) -> StructType:
        """Provide the schema for the BiosampleIndex dataset.

        Returns:
            StructType: The schema of the BiosampleIndex dataset.
        """
        return parse_spark_schema("biosample_index.json")

    def merge_indices(
        self: BiosampleIndex,
        biosample_indices: list[BiosampleIndex],
    ) -> BiosampleIndex:
        """Merge a list of biosample indices into a single biosample index.

        The rows of all given indices and of this index are stacked and then grouped by
        `biosampleId`. Within each group, array columns are flattened into the distinct
        union of all their values, and every other column takes the first non-null value.

        NOTE: Spark documents `first` as order-dependent, so which of several conflicting
        non-null values is kept is not guaranteed after the grouping shuffle.

        Args:
            biosample_indices (list[BiosampleIndex]): Biosample indices to merge.

        Returns:
            BiosampleIndex: Merged biosample index.
        """
        # Extract the DataFrames from the BiosampleIndex objects
        biosample_dfs = [biosample_index.df for biosample_index in biosample_indices] + [self.df]

        # Stack the DataFrames matching columns by NAME: a positional union would silently
        # mix up columns when two indices list the same fields in a different order.
        merged_df = reduce(DataFrame.unionByName, biosample_dfs)

        # Determine the aggregation for each non-grouping column
        agg_funcs = []
        for field in merged_df.schema.fields:
            if field.name == "biosampleId":  # Skip the grouping column
                continue
            if isinstance(field.dataType, ArrayType):
                # Any array column is merged, regardless of its `containsNull` flag;
                # comparing against ArrayType(StringType()) would miss non-nullable arrays.
                agg_funcs.append(
                    f.array_distinct(f.flatten(f.collect_list(field.name))).alias(field.name)
                )
            else:
                agg_funcs.append(
                    f.first(f.col(field.name), ignorenulls=True).alias(field.name)
                )

        # Perform aggregation
        aggregated_df = merged_df.groupBy("biosampleId").agg(*agg_funcs)

        return BiosampleIndex(
            _df=aggregated_df,
            _schema=BiosampleIndex.get_schema(),
        )

    def retain_rows_with_ancestor_id(
        self: BiosampleIndex,
        ancestor_ids: list[str],
    ) -> BiosampleIndex:
        """Filter the biosample index to retain only rows with the given ancestor IDs.

        A row is kept when its `ancestors` array shares at least one element with
        `ancestor_ids`.

        Args:
            ancestor_ids (list[str]): Ancestor IDs to filter on.

        Returns:
            BiosampleIndex: Filtered biosample index.
        """
        # Create a Spark array of ancestor IDs prior to filtering
        # (loop variable is not named `id`, to avoid shadowing the builtin)
        ancestor_ids_array = f.array(*[f.lit(ancestor_id) for ancestor_id in ancestor_ids])

        return BiosampleIndex(
            _df=self.df.filter(
                f.size(f.array_intersect(f.col("ancestors"), ancestor_ids_array)) > 0
            ),
            _schema=BiosampleIndex.get_schema(),
        )

get_schema() -> StructType classmethod

Provide the schema for the BiosampleIndex dataset.

Returns:

Name Type Description
StructType StructType

The schema of the BiosampleIndex dataset.

Source code in src/gentropy/dataset/biosample_index.py
27
28
29
30
31
32
33
34
@classmethod
def get_schema(cls: type[BiosampleIndex]) -> StructType:
    """Return the Spark schema that describes a BiosampleIndex dataset.

    Returns:
        StructType: Schema parsed from the `biosample_index.json` definition.
    """
    schema = parse_spark_schema("biosample_index.json")
    return schema

merge_indices(biosample_indices: list[BiosampleIndex]) -> BiosampleIndex

Merge a list of biosample indices into a single biosample index.

Where values conflict, the first non-null value is taken for single-valued fields, and the union of all values is taken for list-valued fields.

Parameters:

Name Type Description Default
biosample_indices list[BiosampleIndex]

Biosample indices to merge.

required

Returns:

Name Type Description
BiosampleIndex BiosampleIndex

Merged biosample index.

Source code in src/gentropy/dataset/biosample_index.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def merge_indices(
    self: BiosampleIndex,
    biosample_indices: list[BiosampleIndex],
) -> BiosampleIndex:
    """Merge a list of biosample indices into a single biosample index.

    The rows of all given indices and of this index are stacked and then grouped by
    `biosampleId`. Within each group, array columns are flattened into the distinct
    union of all their values, and every other column takes the first non-null value.

    NOTE: Spark documents `first` as order-dependent, so which of several conflicting
    non-null values is kept is not guaranteed after the grouping shuffle.

    Args:
        biosample_indices (list[BiosampleIndex]): Biosample indices to merge.

    Returns:
        BiosampleIndex: Merged biosample index.
    """
    # Extract the DataFrames from the BiosampleIndex objects
    biosample_dfs = [biosample_index.df for biosample_index in biosample_indices] + [self.df]

    # Stack the DataFrames matching columns by NAME: a positional union would silently
    # mix up columns when two indices list the same fields in a different order.
    merged_df = reduce(DataFrame.unionByName, biosample_dfs)

    # Determine the aggregation for each non-grouping column
    agg_funcs = []
    for field in merged_df.schema.fields:
        if field.name == "biosampleId":  # Skip the grouping column
            continue
        if isinstance(field.dataType, ArrayType):
            # Any array column is merged, regardless of its `containsNull` flag;
            # comparing against ArrayType(StringType()) would miss non-nullable arrays.
            agg_funcs.append(
                f.array_distinct(f.flatten(f.collect_list(field.name))).alias(field.name)
            )
        else:
            agg_funcs.append(
                f.first(f.col(field.name), ignorenulls=True).alias(field.name)
            )

    # Perform aggregation
    aggregated_df = merged_df.groupBy("biosampleId").agg(*agg_funcs)

    return BiosampleIndex(
        _df=aggregated_df,
        _schema=BiosampleIndex.get_schema(),
    )

retain_rows_with_ancestor_id(ancestor_ids: list[str]) -> BiosampleIndex

Filter the biosample index to retain only rows with the given ancestor IDs.

Parameters:

Name Type Description Default
ancestor_ids list[str]

Ancestor IDs to filter on.

required

Returns:

Name Type Description
BiosampleIndex BiosampleIndex

Filtered biosample index.

Source code in src/gentropy/dataset/biosample_index.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def retain_rows_with_ancestor_id(
    self: BiosampleIndex,
    ancestor_ids: list[str],
) -> BiosampleIndex:
    """Keep only the biosamples that descend from at least one of the given ancestors.

    Args:
        ancestor_ids (list[str]): Ancestor IDs to filter on.

    Returns:
        BiosampleIndex: Biosample index restricted to rows whose `ancestors` array
            shares at least one element with `ancestor_ids`.
    """
    # Turn the requested IDs into a literal Spark array column so it can be
    # intersected with the per-row `ancestors` array.
    wanted_ancestors = f.array(*(f.lit(ancestor_id) for ancestor_id in ancestor_ids))

    # Number of requested ancestors found in each row's `ancestors` array.
    shared_ancestor_count = f.size(
        f.array_intersect(f.col("ancestors"), wanted_ancestors)
    )

    filtered_df = self.df.filter(shared_ancestor_count > 0)
    return BiosampleIndex(_df=filtered_df, _schema=BiosampleIndex.get_schema())

Schema

root
 |-- biosampleId: string (nullable = false)
 |-- biosampleName: string (nullable = false)
 |-- description: string (nullable = true)
 |-- xrefs: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- synonyms: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- parents: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- ancestors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- descendants: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- children: array (nullable = true)
 |    |-- element: string (containsNull = true)