Andersson et al.

gentropy.datasource.intervals.andersson.IntervalsAndersson

Interval dataset from Andersson et al. 2014.

Source code in src/gentropy/datasource/intervals/andersson.py
class IntervalsAndersson:
    """Interval dataset from Andersson et al. 2014."""

    @staticmethod
    def read(spark: SparkSession, path: str) -> DataFrame:
        """Read andersson2014 dataset.

        Args:
            spark (SparkSession): Spark session
            path (str): Path to the dataset

        Returns:
            DataFrame: Raw Andersson et al. dataframe
        """
        input_schema = t.StructType.fromJson(
            json.loads(
                pkg_resources.read_text(schemas, "andersson2014.json", encoding="utf-8")
            )
        )
        return (
            spark.read.option("delimiter", "\t")
            .option("mode", "DROPMALFORMED")
            .option("header", "true")
            .schema(input_schema)
            .csv(path)
        )

    @classmethod
    def parse(
        cls: type[IntervalsAndersson],
        raw_anderson_df: DataFrame,
        gene_index: GeneIndex,
        lift: LiftOverSpark,
    ) -> Intervals:
        """Parse Andersson et al. 2014 dataset.

        Args:
            raw_anderson_df (DataFrame): Raw Andersson et al. dataset
            gene_index (GeneIndex): Gene index
            lift (LiftOverSpark): LiftOverSpark instance

        Returns:
            Intervals: Intervals dataset
        """
        # Constant values:
        dataset_name = "andersson2014"
        experiment_type = "fantom5"
        pmid = "24670763"
        bio_feature = "aggregate"
        twosided_threshold = 2.45e6  # <- this needs to be phased out; filter by percentile instead of absolute value.

        # Parse the Andersson dataframe:
        parsed_anderson_df = (
            raw_anderson_df
            # Parsing the score column and casting it to float:
            .withColumn("score", f.col("score").cast("float") / f.lit(1000))
            # Parsing the 'name' column:
            .withColumn("parsedName", f.split(f.col("name"), ";"))
            .withColumn("gene_symbol", f.col("parsedName")[2])
            .withColumn("location", f.col("parsedName")[0])
            .withColumn(
                "chrom",
                f.regexp_replace(f.split(f.col("location"), ":|-")[0], "chr", ""),
            )
            .withColumn(
                "start", f.split(f.col("location"), ":|-")[1].cast(t.IntegerType())
            )
            .withColumn(
                "end", f.split(f.col("location"), ":|-")[2].cast(t.IntegerType())
            )
            # Select relevant columns:
            .select("chrom", "start", "end", "gene_symbol", "score")
            # Drop rows with non-canonical chromosomes:
            .filter(
                f.col("chrom").isin([str(x) for x in range(1, 23)] + ["X", "Y", "MT"])
            )
            # For each region/gene, keep only one row with the highest score:
            .groupBy("chrom", "start", "end", "gene_symbol")
            .agg(f.max("score").alias("resourceScore"))
            .orderBy("chrom", "start")
        )

        return Intervals(
            _df=(
                # Lift over the intervals:
                lift.convert_intervals(parsed_anderson_df, "chrom", "start", "end")
                .drop("start", "end")
                .withColumnRenamed("mapped_start", "start")
                .withColumnRenamed("mapped_end", "end")
                .distinct()
                # Joining with the gene index
                .alias("intervals")
                .join(
                    gene_index.symbols_lut().alias("genes"),
                    on=[
                        f.col("intervals.gene_symbol") == f.col("genes.geneSymbol"),
                        # Drop rows where the TSS is far from the midpoint of the region
                        f.abs(
                            (f.col("intervals.start") + f.col("intervals.end")) / 2
                            - f.col("tss")
                        )
                        <= twosided_threshold,
                    ],
                    how="left",
                )
                # Select relevant columns:
                .select(
                    f.col("chrom").alias("chromosome"),
                    f.col("intervals.start").alias("start"),
                    f.col("intervals.end").alias("end"),
                    "geneId",
                    "resourceScore",
                    f.lit(dataset_name).alias("datasourceId"),
                    f.lit(experiment_type).alias("datatypeId"),
                    f.lit(pmid).alias("pmid"),
                    f.lit(bio_feature).alias("biofeature"),
                )
            ),
            _schema=Intervals.get_schema(),
        )
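
Taken together, a typical invocation reads the raw file and then parses it into an Intervals dataset. The sketch below is illustrative rather than part of the module: the input path is hypothetical, and the gene_index (GeneIndex) and lift (LiftOverSpark) objects are assumed to have been constructed elsewhere in the pipeline.

from pyspark.sql import SparkSession

from gentropy.datasource.intervals.andersson import IntervalsAndersson

spark = SparkSession.builder.getOrCreate()

# Hypothetical location of the tab-separated Andersson et al. 2014 file:
raw_df = IntervalsAndersson.read(spark, "/data/andersson2014/enhancer_tss_associations.bed")

# gene_index (GeneIndex) and lift (LiftOverSpark) are assumed to already exist:
intervals = IntervalsAndersson.parse(raw_df, gene_index, lift)
intervals.df.show(5, truncate=False)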

parse(raw_anderson_df: DataFrame, gene_index: GeneIndex, lift: LiftOverSpark) -> Intervals classmethod

Parse the Andersson et al. 2014 dataset.

Parameters:

Name             Type           Description                   Default
raw_anderson_df  DataFrame      Raw Andersson et al. dataset  required
gene_index       GeneIndex      Gene index                    required
lift             LiftOverSpark  LiftOverSpark instance        required

Returns:

Name       Type       Description
Intervals  Intervals  Intervals dataset
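
A minimal call sketch, assuming raw_df comes from IntervalsAndersson.read and that the gene_index and lift arguments are built elsewhere in the pipeline:

# gene_index: GeneIndex, lift: LiftOverSpark (construction not shown here):
intervals = IntervalsAndersson.parse(raw_df, gene_index, lift)

# One row per lifted region and mapped gene, scored by resourceScore:
intervals.df.select("chromosome", "start", "end", "geneId", "resourceScore").show(5)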


read(spark: SparkSession, path: str) -> DataFrame staticmethod

Read the andersson2014 dataset.

Parameters:

Name   Type          Description          Default
spark  SparkSession  Spark session        required
path   str           Path to the dataset  required

Returns:

Name       Type       Description
DataFrame  DataFrame  Raw Andersson et al. dataframe
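
A minimal sketch of calling read; the path below is illustrative and should point at the tab-separated Andersson et al. 2014 association file:

from pyspark.sql import SparkSession

from gentropy.datasource.intervals.andersson import IntervalsAndersson

spark = SparkSession.builder.getOrCreate()

# Hypothetical path; the file is read with the packaged andersson2014.json schema:
raw_df = IntervalsAndersson.read(spark, "/data/andersson2014/enhancer_tss_associations.bed")

# Malformed rows are dropped (DROPMALFORMED); inspect the resulting columns:
raw_df.printSchema()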
