let indel_realigner_map_reduce :
type a.
?compress:bool ->
configuration:(Indel_realigner.t * Realigner_target_creator.t) ->
run_with:Machine.t ->
?run_directory: string ->
a KEDSL.bam_or_bams ->
a =
fun ?compress
~configuration ~run_with
?run_directory
input_bam_or_bams ->
let open KEDSL in
begin match input_bam_or_bams with
| Single_bam bam_node ->
let all_nodes =
let f on_region =
indel_realigner ?compress
~on_region
~configuration ~run_with ?run_directory
input_bam_or_bams
in
let reference =
Machine.get_reference_genome run_with
bam_node#product#reference_build in
List.map ~f (Reference_genome.major_contigs reference)
in
let result_path =
Filename.chop_extension bam_node#product#path
^ indel_realigner_output_filename_tag
~configuration [bam_node]
^ "-merged.bam"
in
Samtools.merge_bams ~run_with all_nodes result_path
| Bam_workflow_list bams ->
let all_nodes =
let f on_region =
let bam_list_node =
indel_realigner ?compress
~on_region
~configuration ~run_with ?run_directory
input_bam_or_bams
in
let exploded = KEDSL.explode_bam_list_node bam_list_node in
exploded
in
let reference =
Machine.get_reference_genome run_with
(List.hd_exn bams)#product#reference_build in
List.map ~f (Reference_genome.major_contigs reference)
in
let merged_bams =
List.mapi bams ~f:(fun index bam ->
let all_regions_for_this_bam =
List.map all_nodes ~f:(fun region_n ->
List.nth region_n index |>
Option.value_exn ~msg:"bug in Gatk.indel_realigner_map_reduce")
in
let result_path =
Filename.chop_extension bam#product#path
^ sprintf "-%d-" index
^ indel_realigner_output_filename_tag
~configuration bams
^ "-merged.bam"
in
Samtools.merge_bams ~run_with all_regions_for_this_bam result_path
)
in
workflow_node ~name:"Indel-realigner-map-reduce"
~edges:(List.map merged_bams ~f:depends_on)
(bam_list (List.map merged_bams ~f:(fun n -> n#product)))
end