struct   type 'a pipeline = 'a t   type workflow_option_failure_mode = [ `Silent | `Fail_if_not_happening ]   type workflow_option = [     | `Multi_sample_indel_realignment of workflow_option_failure_mode     | `Parallel_alignment_over_fastq_fragments of         [ `Bwa_mem | `Bwa | `Mosaik | `Star | `Hisat ] list         * workflow_option_failure_mode     | `Map_reduce of [ `Gatk_indel_realigner ]   ]   type t = {     reference_build: Reference_genome.name;     work_dir: string;     machine : Machine.t;     options: workflow_option list;     wrap_bam_node:       bam pipeline ->       KEDSL.bam_file KEDSL.workflow_node ->       KEDSL.bam_file KEDSL.workflow_node;     wrap_vcf_node:       vcf pipeline ->       KEDSL.vcf_file KEDSL.workflow_node ->       KEDSL.vcf_file KEDSL.workflow_node;     wrap_gtf_node:       gtf pipeline ->       KEDSL.single_file KEDSL.workflow_node ->       KEDSL.single_file KEDSL.workflow_node;   }   let create       ?(wrap_bam_node = fun _ x -> x)       ?(wrap_vcf_node = fun _ x -> x)       ?(wrap_gtf_node = fun _ x -> x)       ?(options=[])       ~reference_build ~work_dir ~machine () =     {reference_build; work_dir; machine; options;      wrap_bam_node; wrap_vcf_node; wrap_gtf_node}   let has_option {options; _} f =     List.exists options ~f   let apply_with_metadata ~metadata_spec wf =     begin match metadata_spec with     | `Add_tags tgs -> KEDSL.add_tags ~recursive:false wf tgs     | `Add_tags_rec tgs -> KEDSL.add_tags ~recursive:true wf tgs     end;     wf   (* This compiler variable name is confusing, perhaps 'state' is better? *)   let rec fastq_step ~read ~compiler (pipeline: fastq pipeline) =     let {work_dir; machine ; _ } = compiler in     match pipeline with     | Fastq f -> f     | Concat_text (l: fastq pipeline list) ->       failwith "Compilation of Biokepi.Pipeline.Concat_text: not implemented"     | Gunzip_concat (l: fastq_gz pipeline list) ->       let fastqs =         let rec f =           function           | Fastq_gz t -> t           | With_metadata (metadata_spec, p) ->             apply_with_metadata ~metadata_spec (f p)         in         List.map l ~f in       let result_path = work_dir // to_file_prefix ~read pipeline ^ ".fastq" in       dbg "Result_Path: %S" result_path;       Workflow_utilities.Gunzip.concat ~run_with:machine fastqs ~result_path     | With_metadata (metadata_spec, pipeline) ->       fastq_step ~read ~compiler pipeline |> apply_with_metadata ~metadata_spec   let rec fastq_sample_step ~compiler (fs : fastq_sample pipeline) =     let {work_dir; machine ; _ } = compiler in       match fs with       | Bam_to_fastq (how, what) ->         let bam = compile_aligner_step ~compiler what in         let sample_type =           match how with `Single -> `Single_end | `Paired -> `Paired_end in         let fastq_pair =           let output_prefix = work_dir // to_file_prefix ?read:None what in           Picard.bam_to_fastq ~run_with:machine ~sample_type             ~output_prefix bam         in         fastq_pair       | Paired_end_sample (info, l1, l2) ->         let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) l1 in         let r2 = fastq_step ~compiler ~read:(`R2 info.fragment_id) l2 in         let open KEDSL in         workflow_node (fastq_reads ~host:Machine.(as_host machine)                          ~name:info.sample_name                          r1#product#path (Some r2#product#path))           ~name:(sprintf "Pairing sample %s (%s and %s)"                    info.sample_name                    (Filename.basename r1#product#path)                    (Filename.basename r2#product#path))           ~edges:[ depends_on r1; depends_on r2 ]       | Single_end_sample (info, single) ->         let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) single in         let open KEDSL in         workflow_node (fastq_reads ~host:Machine.(as_host machine)                          ~name:info.sample_name                          r1#product#path None)           ~equivalence:`None           ~edges:[ depends_on r1 ]           ~name:(sprintf "single-end %s (%s)"                    info.sample_name                    (Filename.basename r1#product#path))       | With_metadata (metadata_spec, p) ->         fastq_sample_step ~compiler p |> apply_with_metadata ~metadata_spec   and compile_aligner_step ~compiler (pipeline : bam pipeline) =     let {reference_build; work_dir; machine; _} = compiler in     let result_prefix = work_dir // to_file_prefix pipeline in     dbg "Result_Prefix: %S" result_prefix;     let rec parallelize_alignment ~make_aligner sample =       match sample with       | Paired_end_sample (info,                            Gunzip_concat r1_list,                            Gunzip_concat r2_list) ->         let exploded =           let count = ref 0 in           List.map2 r1_list r2_list             ~f:(fun r1 r2 ->                 match r1, r2 with                 | (Fastq_gz wf1, Fastq_gz wf2) ->                   let new_info =                     incr count;                     {info with                      fragment_id =                        (* fragmenting = creating fragments of previous fragment *)                        sprintf "%s-fragment-%d" info.fragment_id !count} in                   make_aligner (                     Paired_end_sample (new_info,                                        Gunzip_concat [r1],                                        Gunzip_concat [r2]))                 | other -> failwith "compile_aligner_step: not implemented"               ) in         let bams = List.map exploded ~f:(compile_aligner_step ~compiler) in         Samtools.merge_bams ~run_with:machine bams           (result_prefix ^ "-merged.bam")       | With_metadata (metadata_spec, p) ->         parallelize_alignment ~make_aligner p         |> apply_with_metadata ~metadata_spec       | other ->         failwith "parallelize_alignment: Not fully implemented"     in     let catch_parallelize_aligner aligner sample =       let option =         List.find_map compiler.options           (function           | `Parallel_alignment_over_fastq_fragments (al, todo)             when List.mem ~set:al aligner -> Some todo           | _ -> None)       in       match sample with       | Paired_end_sample (name,                            Gunzip_concat r1_list,                            Gunzip_concat r2_list)         when List.length r1_list > 1 ->         begin match option with         | Some _ -> `Caught         | None -> `Not_caught         end       | Paired_end_sample (name,                            Gunzip_concat r1_list,                            Gunzip_concat r2_list)         when List.length r1_list = 1 -> `Not_caught       | other ->         begin match option with         | Some `Silent         | None -> `Not_caught         | Some `Fail_if_not_happening ->           failwithf "Option Parallel_alignment_over_fastq_fragments is set to Fail_if_not_happening and it didn't happen:\n%s"             (to_json other |> Yojson.Basic.pretty_to_string)         end     in     let perform_aligner_parallelization aligner_tag ~make_aligner         ~make_workflow sample =       begin match catch_parallelize_aligner aligner_tag sample  with       | `Caught -> parallelize_alignment ~make_aligner sample       | `Not_caught ->         let fastq = fastq_sample_step ~compiler sample in         make_workflow fastq       end     in     let bam_node =       match pipeline with       | Bam_sample (name, bam_target) -> bam_target       | Gatk_indel_realigner (configuration, bam)         when has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner)) ->         let input_bam = compile_aligner_step ~compiler bam in         Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false           ~configuration (KEDSL.Single_bam input_bam)       | Gatk_indel_realigner (configuration, bam) ->         let input_bam = compile_aligner_step ~compiler bam in         Gatk.indel_realigner ~run_with:machine ~compress:false           ~configuration (KEDSL.Single_bam input_bam)       | Gatk_bqsr (configuration, bam) ->         let input_bam = compile_aligner_step ~compiler bam in         let output_bam = result_prefix ^ ".bam" in         Gatk.base_quality_score_recalibrator           ~configuration ~run_with:machine ~input_bam ~output_bam       | Picard_mark_duplicates (settings, bam) ->         let input_bam = compile_aligner_step ~compiler bam in         let output_bam = result_prefix ^ ".bam" in         Picard.mark_duplicates ~settings           ~run_with:machine ~input_bam output_bam       | Bwa_mem (bwa_mem_config, fq_sample) ->         perform_aligner_parallelization           `Bwa_mem ~make_aligner:(fun pe -> Bwa_mem (bwa_mem_config, pe))           fq_sample           ~make_workflow:(fun fastq ->               Bwa.mem_align_to_sam ~reference_build                 ~configuration:bwa_mem_config                 ~fastq ~result_prefix ~run_with:machine ()               |> Samtools.sam_to_bam ~reference_build ~run_with:machine)       | Bwa (bwa_config, what) ->         perform_aligner_parallelization           `Bwa ~make_aligner:(fun pe -> Bwa (bwa_config, pe)) what           ~make_workflow:(fun fastq ->               Bwa.align_to_sam ~reference_build                 ~configuration:bwa_config                 ~fastq ~result_prefix ~run_with:machine ()               |> Samtools.sam_to_bam ~reference_build ~run_with:machine)       | Mosaik (what) ->         perform_aligner_parallelization           `Mosaik ~make_aligner:(fun pe -> Mosaik (pe)) what           ~make_workflow:(fun fastq -> Mosaik.align ~reference_build                 ~fastq ~result_prefix ~run_with:machine ())       | Star (configuration, what) ->         perform_aligner_parallelization           `Star ~make_aligner:(fun pe -> Star (configuration, pe)) what           ~make_workflow:(fun fastq ->               Star.align ~configuration ~reference_build                 ~fastq ~result_prefix ~run_with:machine ())       | Hisat (configuration, what) ->         perform_aligner_parallelization           `Hisat ~make_aligner:(fun pe -> Hisat (configuration, pe)) what           ~make_workflow:(fun fastq ->               Hisat.align ~configuration ~reference_build                 ~fastq ~result_prefix ~run_with:machine ()               |> Samtools.sam_to_bam ~reference_build ~run_with:machine             )       | With_metadata (metadata_spec, p)  ->         compile_aligner_step ~compiler p         |> apply_with_metadata ~metadata_spec     in     compiler.wrap_bam_node pipeline bam_node   let rec compile_bam_pair ~compiler (pipeline : bam_pair pipeline) :       [ `Normal of KEDSL.bam_file KEDSL.workflow_node ] *       [ `Tumor of KEDSL.bam_file KEDSL.workflow_node ] *       [ `Pipeline of bam_pair pipeline ]     =     let {reference_build; work_dir; machine; _} = compiler in     begin match pipeline with     | Bam_pair (         (Gatk_bqsr (n_bqsr_config, Gatk_indel_realigner (n_gir_conf, n_bam)))         ,         (Gatk_bqsr (t_bqsr_config, Gatk_indel_realigner (t_gir_conf, t_bam)))       )       when         has_option compiler           (function `Multi_sample_indel_realignment _ -> true | _ -> false)         && n_gir_conf = t_gir_conf ->       let normal = compile_aligner_step ~compiler n_bam in       let tumor = compile_aligner_step ~compiler t_bam in       let bam_list_node =         if has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner))         then (           Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false             ~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor])         ) else           Gatk.indel_realigner ~run_with:machine ~compress:false             ~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor])       in       begin match KEDSL.explode_bam_list_node bam_list_node with       | [realigned_normal; realigned_tumor] ->         let new_pipeline =           Bam_pair (             Gatk_bqsr (n_bqsr_config,                        Bam_sample (Filename.chop_extension realigned_normal#product#path,                                    realigned_normal)),             Gatk_bqsr (t_bqsr_config,                        Bam_sample (Filename.chop_extension realigned_tumor#product#path,                                    realigned_tumor)))         in         compile_bam_pair ~compiler new_pipeline       | other ->         failwithf "Gatk.indel_realigner did not return the correct list of length 2 (tumor, normal): it gave %d bams"           (List.length other)       end     | Bam_pair ( Gatk_bqsr (_, Gatk_indel_realigner (_, _)),                  Gatk_bqsr (_, Gatk_indel_realigner (_, _))) as bam_pair       when         has_option compiler           ((=) (`Multi_sample_indel_realignment `Fail_if_not_happening)) ->       failwithf "Option (`Multi_sample_indel_realignment `Fail_if_not_happening) is set and this pipeline does not qualify:\n%s"         (to_json bam_pair |> Yojson.Basic.pretty_to_string)     | Bam_pair (normal_t, tumor_t) as final_pipeline ->       let normal = compile_aligner_step ~compiler normal_t in       let tumor = compile_aligner_step ~compiler tumor_t in       (`Normal normal, `Tumor tumor, `Pipeline final_pipeline)     | With_metadata (metadata_spec, p) ->       let `Normal normal, `Tumor tumor, `Pipeline p =         compile_bam_pair ~compiler p in       (`Normal (apply_with_metadata ~metadata_spec normal),        `Tumor (apply_with_metadata ~metadata_spec tumor),        `Pipeline p)     end   let rec compile_variant_caller_step ~compiler (pipeline: vcf pipeline) =     let {reference_build; work_dir; machine; _} = compiler in     (* result prefix ignore optimizations *)     let vcf_node =       match pipeline with       | Somatic_variant_caller (som_vc, bam_pair) ->         let (`Normal normal, `Tumor tumor, `Pipeline new_bam_pair) =           compile_bam_pair ~compiler bam_pair in         let result_prefix =           work_dir           // to_file_prefix (Somatic_variant_caller (som_vc, new_bam_pair)) in         dbg "Result_Prefix: %S" result_prefix;         som_vc.Variant_caller.make_target           ~run_with:machine ~input:(Variant_caller.Somatic {normal; tumor})           ~result_prefix ()       | Germline_variant_caller (gvc, bam) ->         let result_prefix = work_dir // to_file_prefix pipeline in         dbg "Result_Prefix: %S" result_prefix;         let input_bam = compile_aligner_step ~compiler bam in         gvc.Variant_caller.make_target           ~run_with:machine ~input:(Variant_caller.Germline input_bam)           ~result_prefix ()       | With_metadata (metadata_spec, p) ->         compile_variant_caller_step ~compiler p         |> apply_with_metadata ~metadata_spec     in     compiler.wrap_vcf_node pipeline vcf_node   let rec compile_gtf_step ~compiler (pipeline: gtf pipeline) =     let {reference_build; work_dir; machine; _} = compiler in     let result_prefix = work_dir // to_file_prefix pipeline in     dbg "Result_Prefix: %S" result_prefix;     let gtf_node =       match pipeline with       | Stringtie (configuration, bam) ->         let bam = compile_aligner_step ~compiler bam in         Stringtie.run ~configuration           ~bam ~result_prefix ~run_with:machine ()       | With_metadata (metadata_spec, p) ->         compile_gtf_step ~compiler p         |> apply_with_metadata ~metadata_spec     in     compiler.wrap_gtf_node pipeline gtf_node   let rec seq2hla_hla_types_step ~compiler (pipeline : seq2hla_hla_types pipeline) =     let { machine ; work_dir; _ } = compiler in     match pipeline with     | Seq2HLA sample ->       let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in       (* TODO: Seq2HLA can actually take the gzipped version too, so we'd            need a unique type for that. *)       let fastq_pair = fastq_sample_step ~compiler sample in       let sample_name = fastq_pair#product#sample_name in       let r1 = KEDSL.read_1_file_node fastq_pair in       let r2 =         match KEDSL.read_2_file_node fastq_pair with         | Some r2 -> r2         | _ -> failwithf "Seq2HLA doesn't support Single_end_sample(s)."       in       Seq2HLA.hla_type         ~work_dir ~run_with:machine ~run_name:sample_name ~r1 ~r2     | With_metadata (metadata_spec, p) ->       seq2hla_hla_types_step ~compiler p       |> apply_with_metadata ~metadata_spec   let rec optitype_hla_types_step ~compiler (pipeline : optitype_hla_types pipeline) =     let { machine ; work_dir; _ } = compiler in     match pipeline with     | Optitype (kind, sample) ->       let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in       let fastq = fastq_sample_step ~compiler sample in       let sample_name = fastq#product#sample_name in       Optitype.hla_type         ~work_dir ~run_with:machine ~run_name:sample_name ~fastq kind     | With_metadata (metadata_spec, p) ->       optitype_hla_types_step ~compiler p       |> apply_with_metadata ~metadata_spec end