"""Core DMR formatting, annotation, and aggregation routines."""
import csv
import logging
import zipfile
from pathlib import Path
from xml.sax.saxutils import escape
from modalysis.core.chromosomes import normalize_allowed_chromosomes
from modalysis.core.expression import parse_expression_field
from modalysis.core.gene_regions import (
build_gene_regions,
find_genes_overlapping_interval,
parse_gff,
)
logger = logging.getLogger(__name__)
EXPRESSION_PROFILES = ["UP", "DOWN", "NDE"]
EFFECT_SIGNS = ["NON_NEGATIVE", "NEGATIVE"]
REGIONS = ["PROMOTER", "BODY", "ENHANCER"]
[docs]
def _to_excel_column_name(column_index: int) -> str:
"""Convert 1-based integer column index to Excel letter notation."""
result = ""
while column_index > 0:
column_index, remainder = divmod(column_index - 1, 26)
result = chr(ord("A") + remainder) + result
return result
[docs]
def _excel_inline_string_cell(row: int, column: int, value: str) -> str:
"""Build XML for an inline string cell in an XLSX worksheet."""
ref = "%s%s" % (_to_excel_column_name(column), row)
return '<c r="%s" t="inlineStr"><is><t>%s</t></is></c>' % (ref, escape(value))
[docs]
def _excel_number_cell(row: int, column: int, value: int) -> str:
"""Build XML for a numeric cell in an XLSX worksheet."""
ref = "%s%s" % (_to_excel_column_name(column), row)
return '<c r="%s"><v>%s</v></c>' % (ref, value)
[docs]
def _write_gene_counts_excel(
output_path_with_name: Path,
manifestation_order: list[str],
modification_order: list[str],
count_lookup: dict[tuple[str, str, str, str, str], int],
) -> None:
"""Write a compact XLSX workbook for aggregated DMR gene counts."""
output_excel_path = output_path_with_name.with_suffix(".xlsx")
logger.info("Writing DMR gene counts Excel workbook: %s", output_excel_path)
rows_xml = []
merge_ranges = []
merge_ranges.append("A1:A3")
merge_ranges.append("B1:B3")
row_1_cells = [
_excel_inline_string_cell(1, 1, "MANIFESTATION"),
_excel_inline_string_cell(1, 2, "EXPRESSION_PROFILE"),
]
row_2_cells = []
row_3_cells = []
current_col = 3
for modification in modification_order:
modification_start_col = current_col
modification_span = len(EFFECT_SIGNS) * len(REGIONS)
modification_end_col = modification_start_col + modification_span - 1
row_1_cells.append(_excel_inline_string_cell(1, modification_start_col, modification))
merge_ranges.append(
"%s1:%s1"
% (
_to_excel_column_name(modification_start_col),
_to_excel_column_name(modification_end_col),
)
)
for effect_sign in EFFECT_SIGNS:
sign_start_col = current_col
sign_end_col = sign_start_col + len(REGIONS) - 1
row_2_cells.append(_excel_inline_string_cell(2, sign_start_col, effect_sign))
merge_ranges.append(
"%s2:%s2"
% (
_to_excel_column_name(sign_start_col),
_to_excel_column_name(sign_end_col),
)
)
for region in REGIONS:
row_3_cells.append(_excel_inline_string_cell(3, current_col, region))
current_col += 1
rows_xml.append('<row r="1">%s</row>' % "".join(row_1_cells))
rows_xml.append('<row r="2">%s</row>' % "".join(row_2_cells))
rows_xml.append('<row r="3">%s</row>' % "".join(row_3_cells))
row_num = 4
for manifestation in manifestation_order:
for expression_profile in EXPRESSION_PROFILES:
row_cells = [
_excel_inline_string_cell(row_num, 1, manifestation),
_excel_inline_string_cell(row_num, 2, expression_profile),
]
col_num = 3
for modification in modification_order:
for effect_sign in EFFECT_SIGNS:
for region in REGIONS:
key = (
manifestation,
expression_profile,
effect_sign,
modification,
region,
)
value = count_lookup.get(key, 0)
row_cells.append(_excel_number_cell(row_num, col_num, value))
col_num += 1
rows_xml.append('<row r="%s">%s</row>' % (row_num, "".join(row_cells)))
row_num += 1
merge_cells_xml = "".join(
['<mergeCell ref="%s"/>' % merge_range for merge_range in merge_ranges]
)
sheet_xml = (
'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
'<worksheet xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main">'
'<sheetViews><sheetView workbookViewId="0"/></sheetViews>'
'<sheetFormatPr defaultRowHeight="15"/>'
'<sheetData>%s</sheetData>'
'<mergeCells count="%s">%s</mergeCells>'
'<pageMargins left="0.7" right="0.7" top="0.75" bottom="0.75" header="0.3" footer="0.3"/>'
"</worksheet>"
) % ("".join(rows_xml), len(merge_ranges), merge_cells_xml)
workbook_xml = (
'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
'<workbook xmlns="http://schemas.openxmlformats.org/spreadsheetml/2006/main" '
'xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships">'
'<sheets><sheet name="GeneCounts" sheetId="1" r:id="rId1"/></sheets>'
"</workbook>"
)
workbook_rels_xml = (
'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
'<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
'<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/worksheet" Target="worksheets/sheet1.xml"/>'
"</Relationships>"
)
root_rels_xml = (
'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
'<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
'<Relationship Id="rId1" Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" Target="xl/workbook.xml"/>'
"</Relationships>"
)
content_types_xml = (
'<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
'<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">'
'<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>'
'<Default Extension="xml" ContentType="application/xml"/>'
'<Override PartName="/xl/workbook.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet.main+xml"/>'
'<Override PartName="/xl/worksheets/sheet1.xml" ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.worksheet+xml"/>'
"</Types>"
)
workbook_zip = zipfile.ZipFile(output_excel_path, "w", compression=zipfile.ZIP_DEFLATED)
workbook_zip.writestr("[Content_Types].xml", content_types_xml)
workbook_zip.writestr("_rels/.rels", root_rels_xml)
workbook_zip.writestr("xl/workbook.xml", workbook_xml)
workbook_zip.writestr("xl/_rels/workbook.xml.rels", workbook_rels_xml)
workbook_zip.writestr("xl/worksheets/sheet1.xml", sheet_xml)
workbook_zip.close()
[docs]
def annotate(
dmr_path: str,
gff_path: str,
output_path: str,
output_name: str,
) -> None:
"""Annotate DMR intervals with overlapping promoter/body/enhancer gene IDs."""
output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
logger.info(
"Annotating DMR. DMR: %s, GFF: %s, Output: %s",
dmr_path,
gff_path,
output_path_with_name,
)
genes_by_chromosome = parse_gff(gff_path)
regions = build_gene_regions(genes_by_chromosome)
input_file = open(dmr_path, newline="")
output_file = open(output_path_with_name, "w", newline="")
reader = csv.reader(input_file, delimiter="\t")
writer = csv.writer(output_file, delimiter="\t")
header = next(reader)
writer.writerow(header + ["PROMOTER", "BODY", "ENHANCER"])
num_processed_rows = 0
num_annotated_rows = 0
for row in reader:
chromosome = row[0]
interval_start = int(row[1])
interval_end = int(row[2])
promoter_genes = []
body_genes = []
enhancer_genes = []
if chromosome in regions:
chrom_regions = regions[chromosome]
promoter_genes = find_genes_overlapping_interval(
interval_start,
interval_end,
chrom_regions["promoter"],
chrom_regions["promoter_starts"],
)
body_genes = find_genes_overlapping_interval(
interval_start,
interval_end,
chrom_regions["body"],
chrom_regions["body_starts"],
)
enhancer_genes = find_genes_overlapping_interval(
interval_start,
interval_end,
chrom_regions["enhancer"],
chrom_regions["enhancer_starts"],
)
promoter_str = ",".join(promoter_genes)
body_str = ",".join(body_genes)
enhancer_str = ",".join(enhancer_genes)
if promoter_genes or body_genes or enhancer_genes:
num_annotated_rows += 1
writer.writerow(row + [promoter_str, body_str, enhancer_str])
num_processed_rows += 1
logger.info(
"Annotated DMR. Processed %s rows. %s rows had gene annotations.",
num_processed_rows,
num_annotated_rows,
)
input_file.close()
output_file.close()
[docs]
def gene_counts(
annotated_dmr_paths: list[str],
manifestations: list[str],
modifications: list[str],
manifestation_labels: list[str],
expression_labels: list[str],
annotated_gff_path: str,
output_path: str,
output_name: str,
output_excel: bool = False,
) -> None:
"""Count unique genes by manifestation/expression/effect/modification/region."""
if len(annotated_dmr_paths) != len(manifestations):
raise ValueError(
"Number of annotated DMR paths (%d) must match number of manifestations (%d)"
% (len(annotated_dmr_paths), len(manifestations))
)
if len(annotated_dmr_paths) != len(modifications):
raise ValueError(
"Number of annotated DMR paths (%d) must match number of modifications (%d)"
% (len(annotated_dmr_paths), len(modifications))
)
if len(manifestation_labels) != len(expression_labels):
raise ValueError(
"Number of manifestation labels (%d) must match number of expression labels (%d)"
% (len(manifestation_labels), len(expression_labels))
)
output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
logger.info(
"Building DMR gene counts table. Annotated GFF: %s, Output: %s",
annotated_gff_path,
output_path_with_name,
)
manifestation_to_expression_label = {}
for manifestation_label, expression_label in zip(
manifestation_labels, expression_labels
):
manifestation_to_expression_label[manifestation_label.strip().upper()] = (
expression_label.strip().upper()
)
gene_to_expression = {}
gff_file = open(annotated_gff_path, newline="")
gff_reader = csv.DictReader(gff_file, delimiter="\t")
for row in gff_reader:
gene_id = row["GENE_ID"].strip().upper()
expression_by_label = parse_expression_field(row.get("EXPRESSION", ""))
gene_to_expression[gene_id] = expression_by_label
gff_file.close()
genes_by_combination = {}
file_constraints = []
num_processed_rows = 0
for dmr_path, manifestation, modification in zip(
annotated_dmr_paths, manifestations, modifications
):
normalized_manifestation = manifestation.strip().upper()
normalized_modification = modification.strip().upper()
file_constraints.append(
(dmr_path, normalized_manifestation, normalized_modification)
)
if normalized_manifestation not in manifestation_to_expression_label:
raise ValueError(
"No expression label configured for manifestation '%s'"
% normalized_manifestation
)
expression_label = manifestation_to_expression_label[normalized_manifestation]
dmr_file = open(dmr_path, newline="")
dmr_reader = csv.DictReader(dmr_file, delimiter="\t")
for row in dmr_reader:
effect_size = float(row["EFFECT_SIZE"])
effect_sign = "NON_NEGATIVE" if effect_size >= 0 else "NEGATIVE"
for region in REGIONS:
genes_field = row[region].strip()
if not genes_field:
continue
genes = [gene.strip().upper() for gene in genes_field.split(",")]
for gene in genes:
if not gene:
continue
if gene not in gene_to_expression:
continue
expression_profile = gene_to_expression[gene].get(expression_label)
if expression_profile not in EXPRESSION_PROFILES:
continue
key = (
dmr_path,
normalized_manifestation,
expression_profile,
effect_sign,
normalized_modification,
region,
)
if key not in genes_by_combination:
genes_by_combination[key] = set()
genes_by_combination[key].add(gene)
num_processed_rows += 1
dmr_file.close()
output_file = open(output_path_with_name, "w", newline="")
writer = csv.writer(output_file, delimiter="\t")
writer.writerow(
[
"MANIFESTATION",
"EXPRESSION_PROFILE",
"EFFECT_SIZE_SIGN",
"MODIFICATION",
"REGION",
"GENE_COUNT",
]
)
for dmr_path, manifestation, modification in file_constraints:
for expression_profile in EXPRESSION_PROFILES:
for effect_sign in EFFECT_SIGNS:
for region in REGIONS:
key = (
dmr_path,
manifestation,
expression_profile,
effect_sign,
modification,
region,
)
genes = genes_by_combination.get(key, set())
writer.writerow(
[
manifestation,
expression_profile,
effect_sign,
modification,
region,
len(genes),
]
)
logger.info(
"Built DMR gene counts table. Processed %s DMR rows from %s files.",
num_processed_rows,
len(annotated_dmr_paths),
)
output_file.close()
if output_excel:
manifestation_order = []
for manifestation_label in manifestation_labels:
normalized_manifestation = manifestation_label.strip().upper()
if normalized_manifestation not in manifestation_order:
manifestation_order.append(normalized_manifestation)
modification_order = []
for modification_label in modifications:
normalized_modification = modification_label.strip().upper()
if normalized_modification not in modification_order:
modification_order.append(normalized_modification)
combined_gene_sets = {}
for key, genes in genes_by_combination.items():
(
_dmr_path,
manifestation,
expression_profile,
effect_sign,
modification,
region,
) = key
aggregate_key = (
manifestation,
expression_profile,
effect_sign,
modification,
region,
)
if aggregate_key not in combined_gene_sets:
combined_gene_sets[aggregate_key] = set()
combined_gene_sets[aggregate_key].update(genes)
count_lookup = {
key: len(genes) for key, genes in combined_gene_sets.items()
}
_write_gene_counts_excel(
output_path_with_name,
manifestation_order,
modification_order,
count_lookup,
)
[docs]
def common_genes(
annotated_dmr_paths: list[str],
manifestations: list[str],
modifications: list[str],
manifestation_labels: list[str],
expression_labels: list[str],
modification_a: str,
modification_b: str,
annotated_gff_path: str,
output_path: str,
output_name: str,
) -> None:
"""Find common negative-effect genes across two modifications by region."""
if len(annotated_dmr_paths) != len(manifestations):
raise ValueError(
"Number of annotated DMR paths (%d) must match number of manifestations (%d)"
% (len(annotated_dmr_paths), len(manifestations))
)
if len(annotated_dmr_paths) != len(modifications):
raise ValueError(
"Number of annotated DMR paths (%d) must match number of modifications (%d)"
% (len(annotated_dmr_paths), len(modifications))
)
if len(manifestation_labels) != len(expression_labels):
raise ValueError(
"Number of manifestation labels (%d) must match number of expression labels (%d)"
% (len(manifestation_labels), len(expression_labels))
)
normalized_modification_a = modification_a.strip().upper()
normalized_modification_b = modification_b.strip().upper()
if normalized_modification_a == normalized_modification_b:
raise ValueError("Modification A and B must be different")
output_path_with_name = (Path(output_path) / output_name).with_suffix(".modalysis")
logger.info(
"Building common DMR genes table. Annotated GFF: %s, Output: %s",
annotated_gff_path,
output_path_with_name,
)
manifestation_to_expression_label = {}
for manifestation_label, expression_label in zip(
manifestation_labels, expression_labels
):
manifestation_to_expression_label[manifestation_label.strip().upper()] = (
expression_label.strip().upper()
)
gene_to_expression = {}
gff_file = open(annotated_gff_path, newline="")
gff_reader = csv.DictReader(gff_file, delimiter="\t")
for row in gff_reader:
gene_id = row["GENE_ID"].strip().upper()
expression_by_label = parse_expression_field(row.get("EXPRESSION", ""))
gene_to_expression[gene_id] = expression_by_label
gff_file.close()
genes_by_manifestation_modification_region = {}
manifestation_order = []
for manifestation_label in manifestation_labels:
normalized_manifestation = manifestation_label.strip().upper()
if normalized_manifestation not in manifestation_order:
manifestation_order.append(normalized_manifestation)
num_processed_rows = 0
for dmr_path, manifestation, modification in zip(
annotated_dmr_paths, manifestations, modifications
):
normalized_manifestation = manifestation.strip().upper()
normalized_modification = modification.strip().upper()
dmr_file = open(dmr_path, newline="")
dmr_reader = csv.DictReader(dmr_file, delimiter="\t")
for row in dmr_reader:
effect_size = float(row["EFFECT_SIZE"])
if effect_size >= 0:
num_processed_rows += 1
continue
for region in REGIONS:
genes_field = row[region].strip()
if not genes_field:
continue
key = (normalized_manifestation, normalized_modification, region)
if key not in genes_by_manifestation_modification_region:
genes_by_manifestation_modification_region[key] = set()
genes = [gene.strip().upper() for gene in genes_field.split(",")]
for gene in genes:
if not gene:
continue
genes_by_manifestation_modification_region[key].add(gene)
num_processed_rows += 1
dmr_file.close()
output_file = open(output_path_with_name, "w", newline="")
writer = csv.writer(output_file, delimiter="\t")
writer.writerow(
[
"ROW_TYPE",
"MANIFESTATION",
"REGION",
"MODIFICATION_A",
"MODIFICATION_B",
"COMMON_GENE_COUNT",
"GENE_ID",
"EXPRESSION_STATUS",
]
)
num_common_genes = 0
num_summary_rows = 0
for manifestation in manifestation_order:
if manifestation not in manifestation_to_expression_label:
raise ValueError(
"No expression label configured for manifestation '%s'" % manifestation
)
expression_label = manifestation_to_expression_label[manifestation]
for region in REGIONS:
genes_mod_a = genes_by_manifestation_modification_region.get(
(manifestation, normalized_modification_a, region), set()
)
genes_mod_b = genes_by_manifestation_modification_region.get(
(manifestation, normalized_modification_b, region), set()
)
region_common_genes = sorted(genes_mod_a.intersection(genes_mod_b))
writer.writerow(
[
"SUMMARY",
manifestation,
region,
normalized_modification_a,
normalized_modification_b,
len(region_common_genes),
"",
"",
]
)
num_summary_rows += 1
for gene in region_common_genes:
expression_status = gene_to_expression.get(gene, {}).get(
expression_label, ""
)
writer.writerow(
[
"GENE",
manifestation,
region,
normalized_modification_a,
normalized_modification_b,
"",
gene,
expression_status,
]
)
num_common_genes += 1
logger.info(
"Built common DMR genes table. Processed %s DMR rows. Wrote %s summary rows and %s common genes.",
num_processed_rows,
num_summary_rows,
num_common_genes,
)
output_file.close()