let merge_bams
~(run_with : Machine.t)
?(delete_input_on_success = true)
?(attach_rg_tag = false)
?(uncompressed_bam_output = false)
?(compress_level_one = false)
?(combine_rg_headers = false)
?(combine_pg_headers = false)
(input_bam_list : KEDSL.bam_file KEDSL.workflow_node list)
(output_bam_path : string) =
let open KEDSL in
let samtools = Machine.get_tool run_with Machine.Tool.Default.samtools in
let () =
let lgth = List.length input_bam_list in
if lgth < 2 then
failwithf "Samtools.merge_bams: %d input-bam%s provided and `samtools merge` will fail with less than 2 Bam files"
lgth
(match lgth with 1 -> " was" | _ -> "s were")
in
let sorted_bams =
List.map input_bam_list ~f:(sort_bam_if_necessary ~run_with ~by:`Coordinate) in
let options =
(if attach_rg_tag then ["-r"] else [])
@ (if uncompressed_bam_output then ["-u"] else [])
@ (if compress_level_one then ["-1"] else [])
@ (if combine_rg_headers then ["-c"] else [])
@ (if combine_pg_headers then ["-p"] else [])
in
let program =
let open Program in
Machine.Tool.(init samtools)
&& exec (
["samtools"; "merge"] @ options @ [output_bam_path]
@ List.map sorted_bams ~f:(fun bam -> bam#product#path)
)
in
let name =
sprintf "merge-%d-bams-into-%s" (List.length input_bam_list)
(Filename.basename output_bam_path) in
let make = Machine.run_program ~name run_with program in
let remove_input_bams =
List.map input_bam_list ~f:(fun src ->
on_success_activate (Remove.file ~run_with src#product#path))
in
let edges =
depends_on Machine.Tool.(ensure samtools)
:: on_failure_activate (Remove.file ~run_with output_bam_path)
:: List.map sorted_bams ~f:depends_on
@ if delete_input_on_success then remove_input_bams else []
in
let product =
let one =
List.hd input_bam_list
|> Option.value_exn
~msg:(sprintf "Samtools.merge_bams: input is empty list")
in
(transform_bam one#product ~path:output_bam_path) in
workflow_node ~name product ~make ~edges