Skip to content

Javierre et al.

gentropy.datasource.intervals.javierre.IntervalsJavierre

Interval dataset from Javierre et al. 2016.

Source code in src/gentropy/datasource/intervals/javierre.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
class IntervalsJavierre:
    """Interval dataset from Javierre et al. 2016."""

    @staticmethod
    def read(spark: SparkSession, path: str) -> DataFrame:
        """Load the raw Javierre dataset from a parquet location.

        Args:
            spark (SparkSession): Spark session
            path (str): Path to dataset

        Returns:
            DataFrame: Raw Javierre dataset
        """
        parquet_reader = spark.read
        return parquet_reader.parquet(path)

    @classmethod
    def parse(
        cls: type[IntervalsJavierre],
        javierre_raw: DataFrame,
        gene_index: GeneIndex,
        lift: LiftOverSpark,
    ) -> Intervals:
        """Turn the raw Javierre et al. 2016 table into an interval dataset.

        The `name` column is split into a second interval plus a score, both
        intervals are passed through the lift-over, the first interval is
        matched to genes from the gene index, and the best score is kept for
        every combination of second interval, gene and bio feature.

        Args:
            javierre_raw (DataFrame): Raw Javierre data
            gene_index (GeneIndex): Gene index
            lift (LiftOverSpark): LiftOverSpark instance

        Returns:
            Intervals: Javierre et al. 2016 interval data
        """
        # Literal annotations attached to every output row:
        source_id = "javierre2016"
        data_type = "pchic"
        publication_id = "27863249"
        # Largest accepted distance between the interval midpoint and the TSS:
        max_tss_distance = 2.45e6
        canonical_chromosomes = [str(n) for n in range(1, 23)] + ["X", "Y", "MT"]

        # Column expressions used while unpacking the `name` field, which is
        # split on ":", "-" and "," into chromosome, start, end and score:
        name_fields = f.col("name_split")
        name_chromosome = f.regexp_replace(name_fields[0], "chr", "").cast(
            t.StringType()
        )
        clean_chrom = f.regexp_replace(f.col("chrom"), "chr", "").cast(
            t.StringType()
        )

        # Row-level requirements: a parsable score, both chromosomes equal,
        # and the chromosome being one of the canonical ones.
        has_score = f.col("name_score").isNotNull()
        same_chromosome = f.col("chrom") == f.col("name_chr")
        is_canonical = f.col("name_chr").isin(canonical_chromosomes)

        javierre_parsed = (
            javierre_raw.withColumn("name_split", f.split(f.col("name"), r":|-|,"))
            .withColumn("name_chr", name_chromosome)
            .withColumn("name_start", name_fields[1].cast(t.IntegerType()))
            .withColumn("name_end", name_fields[2].cast(t.IntegerType()))
            .withColumn("name_score", name_fields[3].cast(t.FloatType()))
            .withColumn("chrom", clean_chrom)
            .drop("name_split", "name", "annotation")
            .filter(has_score & same_chromosome & is_canonical)
        )

        # Lift over the first interval (chrom/start/end) to GRCh38. The renames
        # below presume convert_intervals adds `mapped_*` columns -- confirm
        # against LiftOverSpark.
        javierre_remapped = lift.convert_intervals(
            javierre_parsed, "chrom", "start", "end"
        ).drop("start", "end")
        for lifted_name, final_name in (
            ("mapped_chrom", "chrom"),
            ("mapped_start", "start"),
            ("mapped_end", "end"),
        ):
            javierre_remapped = javierre_remapped.withColumnRenamed(
                lifted_name, final_name
            )

        # Same treatment for the second interval (name_chr/name_start/name_end):
        javierre_remapped = lift.convert_intervals(
            javierre_remapped, "name_chr", "name_start", "name_end"
        ).drop("name_start", "name_end")
        for lifted_name, final_name in (
            ("mapped_name_chr", "name_chr"),
            ("mapped_name_start", "name_start"),
            ("mapped_name_end", "name_end"),
        ):
            javierre_remapped = javierre_remapped.withColumnRenamed(
                lifted_name, final_name
            )

        # Distinct lifted intervals, so the range join against the gene
        # lookup is evaluated once per interval instead of once per row:
        distinct_intervals = (
            javierre_remapped.select(
                f.col("chrom"),
                f.col("start").cast(t.IntegerType()),
                f.col("end").cast(t.IntegerType()),
            )
            .distinct()
            .alias("intervals")
        )

        # An interval is matched to a gene when either of its end points lies
        # within the gene's [start, end] range on the same chromosome.
        # NOTE(review): a gene lying entirely inside an interval is not matched
        # by this condition -- confirm that is intended.
        on_same_chromosome = f.col("intervals.chrom") == f.col("genes.chromosome")
        start_inside_gene = f.col("intervals.start").between(
            f.col("genes.start"), f.col("genes.end")
        )
        end_inside_gene = f.col("intervals.end").between(
            f.col("genes.start"), f.col("genes.end")
        )
        unique_intervals_with_genes = distinct_intervals.join(
            gene_index.locations_lut().alias("genes"),
            on=on_same_chromosome & (start_inside_gene | end_inside_gene),
            how="left",
        ).select(
            f.col("intervals.chrom").alias("chrom"),
            f.col("intervals.start").alias("start"),
            f.col("intervals.end").alias("end"),
            f.col("genes.geneId").alias("geneId"),
            f.col("genes.tss").alias("tss"),
        )

        # Attach the gene annotation back onto the full lifted dataset:
        annotated = javierre_remapped.join(
            unique_intervals_with_genes,
            on=["chrom", "start", "end"],
            how="left",
        )

        # Keep rows whose interval midpoint is within the threshold of the TSS:
        interval_midpoint = (f.col("start") + f.col("end")) / 2
        close_to_tss = annotated.filter(
            f.abs(interval_midpoint - f.col("tss")) <= max_tss_distance
        )

        # Highest score per (second interval, gene, bio feature) combination:
        best_scores = close_to_tss.groupBy(
            "name_chr", "name_start", "name_end", "geneId", "bio_feature"
        ).agg(f.max(f.col("name_score")).alias("resourceScore"))

        # Shape the output columns:
        output = best_scores.select(
            f.col("name_chr").alias("chromosome"),
            f.col("name_start").alias("start"),
            f.col("name_end").alias("end"),
            f.col("resourceScore").cast(t.DoubleType()),
            f.col("geneId"),
            f.col("bio_feature").alias("biofeature"),
            f.lit(source_id).alias("datasourceId"),
            f.lit(data_type).alias("datatypeId"),
            f.lit(publication_id).alias("pmid"),
        )

        return Intervals(_df=output, _schema=Intervals.get_schema())

parse(javierre_raw: DataFrame, gene_index: GeneIndex, lift: LiftOverSpark) -> Intervals classmethod

Parse Javierre et al. 2016 dataset.

Parameters:

Name Type Description Default
javierre_raw DataFrame

Raw Javierre data

required
gene_index GeneIndex

Gene index

required
lift LiftOverSpark

LiftOverSpark instance

required

Returns:

Name Type Description
Intervals Intervals

Javierre et al. 2016 interval data

Source code in src/gentropy/datasource/intervals/javierre.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
@classmethod
def parse(
    cls: type[IntervalsJavierre],
    javierre_raw: DataFrame,
    gene_index: GeneIndex,
    lift: LiftOverSpark,
) -> Intervals:
    """Turn the raw Javierre et al. 2016 table into an interval dataset.

    The `name` column is split into a second interval plus a score, both
    intervals are passed through the lift-over, the first interval is
    matched to genes from the gene index, and the best score is kept for
    every combination of second interval, gene and bio feature.

    Args:
        javierre_raw (DataFrame): Raw Javierre data
        gene_index (GeneIndex): Gene index
        lift (LiftOverSpark): LiftOverSpark instance

    Returns:
        Intervals: Javierre et al. 2016 interval data
    """
    # Literal annotations attached to every output row:
    source_id = "javierre2016"
    data_type = "pchic"
    publication_id = "27863249"
    # Largest accepted distance between the interval midpoint and the TSS:
    max_tss_distance = 2.45e6
    canonical_chromosomes = [str(n) for n in range(1, 23)] + ["X", "Y", "MT"]

    # Column expressions used while unpacking the `name` field, which is
    # split on ":", "-" and "," into chromosome, start, end and score:
    name_fields = f.col("name_split")
    name_chromosome = f.regexp_replace(name_fields[0], "chr", "").cast(
        t.StringType()
    )
    clean_chrom = f.regexp_replace(f.col("chrom"), "chr", "").cast(
        t.StringType()
    )

    # Row-level requirements: a parsable score, both chromosomes equal,
    # and the chromosome being one of the canonical ones.
    has_score = f.col("name_score").isNotNull()
    same_chromosome = f.col("chrom") == f.col("name_chr")
    is_canonical = f.col("name_chr").isin(canonical_chromosomes)

    javierre_parsed = (
        javierre_raw.withColumn("name_split", f.split(f.col("name"), r":|-|,"))
        .withColumn("name_chr", name_chromosome)
        .withColumn("name_start", name_fields[1].cast(t.IntegerType()))
        .withColumn("name_end", name_fields[2].cast(t.IntegerType()))
        .withColumn("name_score", name_fields[3].cast(t.FloatType()))
        .withColumn("chrom", clean_chrom)
        .drop("name_split", "name", "annotation")
        .filter(has_score & same_chromosome & is_canonical)
    )

    # Lift over the first interval (chrom/start/end) to GRCh38. The renames
    # below presume convert_intervals adds `mapped_*` columns -- confirm
    # against LiftOverSpark.
    javierre_remapped = lift.convert_intervals(
        javierre_parsed, "chrom", "start", "end"
    ).drop("start", "end")
    for lifted_name, final_name in (
        ("mapped_chrom", "chrom"),
        ("mapped_start", "start"),
        ("mapped_end", "end"),
    ):
        javierre_remapped = javierre_remapped.withColumnRenamed(
            lifted_name, final_name
        )

    # Same treatment for the second interval (name_chr/name_start/name_end):
    javierre_remapped = lift.convert_intervals(
        javierre_remapped, "name_chr", "name_start", "name_end"
    ).drop("name_start", "name_end")
    for lifted_name, final_name in (
        ("mapped_name_chr", "name_chr"),
        ("mapped_name_start", "name_start"),
        ("mapped_name_end", "name_end"),
    ):
        javierre_remapped = javierre_remapped.withColumnRenamed(
            lifted_name, final_name
        )

    # Distinct lifted intervals, so the range join against the gene
    # lookup is evaluated once per interval instead of once per row:
    distinct_intervals = (
        javierre_remapped.select(
            f.col("chrom"),
            f.col("start").cast(t.IntegerType()),
            f.col("end").cast(t.IntegerType()),
        )
        .distinct()
        .alias("intervals")
    )

    # An interval is matched to a gene when either of its end points lies
    # within the gene's [start, end] range on the same chromosome.
    # NOTE(review): a gene lying entirely inside an interval is not matched
    # by this condition -- confirm that is intended.
    on_same_chromosome = f.col("intervals.chrom") == f.col("genes.chromosome")
    start_inside_gene = f.col("intervals.start").between(
        f.col("genes.start"), f.col("genes.end")
    )
    end_inside_gene = f.col("intervals.end").between(
        f.col("genes.start"), f.col("genes.end")
    )
    unique_intervals_with_genes = distinct_intervals.join(
        gene_index.locations_lut().alias("genes"),
        on=on_same_chromosome & (start_inside_gene | end_inside_gene),
        how="left",
    ).select(
        f.col("intervals.chrom").alias("chrom"),
        f.col("intervals.start").alias("start"),
        f.col("intervals.end").alias("end"),
        f.col("genes.geneId").alias("geneId"),
        f.col("genes.tss").alias("tss"),
    )

    # Attach the gene annotation back onto the full lifted dataset:
    annotated = javierre_remapped.join(
        unique_intervals_with_genes,
        on=["chrom", "start", "end"],
        how="left",
    )

    # Keep rows whose interval midpoint is within the threshold of the TSS:
    interval_midpoint = (f.col("start") + f.col("end")) / 2
    close_to_tss = annotated.filter(
        f.abs(interval_midpoint - f.col("tss")) <= max_tss_distance
    )

    # Highest score per (second interval, gene, bio feature) combination:
    best_scores = close_to_tss.groupBy(
        "name_chr", "name_start", "name_end", "geneId", "bio_feature"
    ).agg(f.max(f.col("name_score")).alias("resourceScore"))

    # Shape the output columns:
    output = best_scores.select(
        f.col("name_chr").alias("chromosome"),
        f.col("name_start").alias("start"),
        f.col("name_end").alias("end"),
        f.col("resourceScore").cast(t.DoubleType()),
        f.col("geneId"),
        f.col("bio_feature").alias("biofeature"),
        f.lit(source_id).alias("datasourceId"),
        f.lit(data_type).alias("datatypeId"),
        f.lit(publication_id).alias("pmid"),
    )

    return Intervals(_df=output, _schema=Intervals.get_schema())

read(spark: SparkSession, path: str) -> DataFrame staticmethod

Read Javierre dataset.

Parameters:

Name Type Description Default
spark SparkSession

Spark session

required
path str

Path to dataset

required

Returns:

Name Type Description
DataFrame DataFrame

Raw Javierre dataset

Source code in src/gentropy/datasource/intervals/javierre.py
21
22
23
24
25
26
27
28
29
30
31
32
@staticmethod
def read(spark: SparkSession, path: str) -> DataFrame:
    """Load the raw Javierre dataset from a parquet location.

    Args:
        spark (SparkSession): Spark session
        path (str): Path to dataset

    Returns:
        DataFrame: Raw Javierre dataset
    """
    parquet_reader = spark.read
    return parquet_reader.parquet(path)