Source code for modalysis.server.pileup
"""FastAPI routes for pileup operations."""
import logging
from fastapi import APIRouter
from modalysis.core import pileup as core_pileup
from modalysis.server.models import PileupFormatRequest, PileupMergeRequest
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
# Router for the pileup endpoints; routes registered on it are prefixed
# with "/pileup".
router = APIRouter(prefix="/pileup")
[docs]
@router.post("/merge")
def pileup_merge(request: PileupMergeRequest) -> dict[str, str]:
    """Serve POST `/pileup/merge` by delegating to the core pileup merge.

    The incoming request is logged at INFO level, then its fields are
    forwarded unchanged as keyword arguments to ``core_pileup.merge``.
    Exceptions raised by the core merge are not caught here and propagate
    to the caller.

    Args:
        request: Request model providing ``pileup_paths``, ``output_path``,
            ``output_name``, ``min_files``, ``min_file_coverage`` and
            ``min_reads``.

    Returns:
        ``{"status": "success"}`` once the core merge call has returned.
    """
    logger.info("Server received pileup merge request: %s", request)

    # Resolve the callee first, then gather the arguments in the same order
    # in which they are read from the request model.
    run_merge = core_pileup.merge
    merge_kwargs = {
        "pileup_paths": request.pileup_paths,
        "output_path": request.output_path,
        "output_name": request.output_name,
        "min_files": request.min_files,
        "min_file_coverage": request.min_file_coverage,
        "min_reads": request.min_reads,
    }
    run_merge(**merge_kwargs)

    response: dict[str, str] = {"status": "success"}
    return response