let indel_realigner :
type a.
?compress:bool ->
?on_region: Region.t ->
configuration:(Indel_realigner.t * Realigner_target_creator.t) ->
run_with:Machine.t ->
?run_directory: string ->
a KEDSL.bam_or_bams ->
a =
fun ?(compress=false)
?(on_region = `Full)
~configuration ~run_with ?run_directory
input_bam_or_bams ->
let open KEDSL in
let input_bam_1, more_input_bams =
match input_bam_or_bams with
| Single_bam bam -> bam, []
| Bam_workflow_list [] ->
failwithf "Empty bam-list in Gatk.indel_realigner`"
| Bam_workflow_list (one :: more) -> (one, more)
in
let run_directory =
match run_directory with
| None ->
let dir = Filename.dirname input_bam_1#product#path in
List.iter more_input_bams ~f:(fun bam ->
if Filename.dirname bam#product#path <> dir then
failwithf "These two BAMS are not in the same directory:\n\ %s\n\ %s\nGATK.indel_realigner when running on multiple bams requires a proper run-directory, clean-up your bams or provide the option ~run_directory
" input_bam_1#product#path bam#product#path
);
dir
| Some rundir -> rundir
in
let indel_config, target_config = configuration in
let input_sorted_bam_1 =
Samtools.sort_bam_if_necessary
~run_with ~by:`Coordinate input_bam_1 in
let more_input_sorted_bams =
List.map more_input_bams
~f:(Samtools.sort_bam_if_necessary
~run_with ~by:`Coordinate) in
let more_input_bams = `Use_the_sorted_ones_please in
let input_bam_1 = `Use_the_sorted_ones_please in
ignore (more_input_bams, input_bam_1);
let name =
sprintf "Indel Realignment on %s (%s)"
(Filename.basename input_sorted_bam_1#product#path)
(Region.to_filename on_region)
in
let gatk = Machine.get_tool run_with Machine.Tool.Default.gatk in
let reference_genome =
let reference_build = input_sorted_bam_1#product#reference_build in
Machine.get_reference_genome run_with reference_build in
let fasta = Reference_genome.fasta reference_genome in
let output_suffix =
indel_realigner_output_filename_tag
~configuration ~region:on_region
(input_sorted_bam_1 :: more_input_sorted_bams)
in
let intervals_file =
Filename.chop_suffix input_sorted_bam_1#product#path ".bam"
^ output_suffix ^ ".intervals" in
let output_bam_path input =
run_directory // (
Filename.chop_extension input#product#path ^ output_suffix ^ ".bam"
|> Filename.basename)
in
let processors = Machine.max_processors run_with in
let make =
let target_creation_args =
[
"-R"; Filename.quote fasta#product#path;
"-I"; Filename.quote input_sorted_bam_1#product#path;
"-o"; Filename.quote intervals_file;
"-nt"; Int.to_string processors;
]
@ Realigner_target_creator.render target_config
@ List.concat_map more_input_sorted_bams
~f:(fun bam -> ["-I"; Filename.quote bam#product#path])
in
let indel_real_args =
[ "-R"; fasta#product#path;
"-I"; input_sorted_bam_1#product#path;
"-targetIntervals"; intervals_file;
] @ Indel_realigner.render indel_config @
begin match more_input_sorted_bams with
| [] ->
["-o"; output_bam_path input_sorted_bam_1]
| more ->
List.concat_map more
~f:(fun b -> ["-I"; Filename.quote b#product#path])
@ ["--nWayOut"; output_suffix ^ ".bam"]
end
in
let intervals_option = Region.to_gatk_option on_region in
Machine.run_big_program run_with ~name ~processors
~self_ids:["gatk"; "indel-realigner"]
Program.(
Machine.Tool.(init gatk)
&& shf "cd %s" (Filename.quote run_directory)
&& shf "java -jar $GATK_JAR -T RealignerTargetCreator %s %s"
intervals_option
(String.concat ~sep:" " target_creation_args)
&& sh ("java -jar $GATK_JAR -T IndelRealigner "
^ intervals_option
^ (if compress then " " else " -compress 0 ")
^ (String.concat ~sep:" " indel_real_args)))
in
let edges =
let sequence_dict =
Picard.create_dict ~run_with fasta in
[
depends_on Machine.Tool.(ensure gatk);
depends_on fasta;
depends_on (Samtools.faidx ~run_with fasta);
depends_on sequence_dict;
on_failure_activate (Remove.file ~run_with intervals_file);
]
@ List.concat_map (input_sorted_bam_1 :: more_input_sorted_bams) ~f:(fun b -> [
depends_on b;
depends_on (Samtools.index_to_bai ~run_with b);
on_failure_activate (Remove.file ~run_with (output_bam_path b));
])
in
let node : type a. a bam_or_bams -> a =
function
| Single_bam _ ->
workflow_node ~name ~make ~edges
(transform_bam input_sorted_bam_1#product
(output_bam_path input_sorted_bam_1))
| Bam_workflow_list _ ->
workflow_node ~name ~make ~edges
(bam_list
(List.map (input_sorted_bam_1 :: more_input_sorted_bams)
~f:(fun b ->
transform_bam b#product (output_bam_path b))))
in
node input_bam_or_bams