(* code generated with [./tools/build-doc.sh ketrew,ppx_deriving.std] *) module Common_pipelines = struct (**************************************************************************) (* Copyright 2015, Sebastien Mondet *) (* *) (* Licensed under the Apache License, Version 2.0 (the "License"); *) (* you may not use this file except in compliance with the License. *) (* You may obtain a copy of the License at *) (* *) (* http://www.apache.org/licenses/LICENSE-2.0 *) (* *) (* Unless required by applicable law or agreed to in writing, software *) (* distributed under the License is distributed on an "AS IS" BASIS, *) (* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *) (* implied. See the License for the specific language governing *) (* permissions and limitations under the License. *) (**************************************************************************) open Biokepi_run_environment open Biokepi_bfx_tools open Common module Somatic = struct type from_fastqs = normal_fastqs:Pipeline.Construct.input_fastq -> tumor_fastqs:Pipeline.Construct.input_fastq -> dataset:string -> Pipeline.vcf Pipeline.t list let crazy_example ~normal_fastqs ~tumor_fastqs ~dataset = let open Pipeline.Construct in let normal = input_fastq ~dataset normal_fastqs in let tumor = input_fastq ~dataset tumor_fastqs in let bam_pair ?configuration () = let normal = bwa ?configuration normal |> gatk_indel_realigner |> picard_mark_duplicates |> gatk_bqsr in let tumor = bwa ?configuration tumor |> gatk_indel_realigner |> picard_mark_duplicates |> gatk_bqsr in pair ~normal ~tumor in let bam_pairs = let non_default = let open Bwa.Configuration.Aln in { name = "config42"; gap_open_penalty = 10; gap_extension_penalty = 7; mismatch_penalty = 5; } in [ bam_pair (); bam_pair ~configuration:non_default (); ] in let vcfs = List.concat_map bam_pairs ~f:(fun bam_pair -> [ mutect bam_pair; somaticsniper bam_pair; somaticsniper ~configuration:Somaticsniper.Configuration.{ name = "example0001-095"; prior_probability = 0.001; theta = 0.95; } bam_pair; varscan_somatic bam_pair; strelka ~configuration:Strelka.Configuration.exome_default bam_pair; ]) in vcfs let from_fastqs_with_variant_caller ~variant_caller ~normal_fastqs ~tumor_fastqs ~dataset = let open Pipeline.Construct in let normal = input_fastq ~dataset normal_fastqs in let tumor = input_fastq ~dataset tumor_fastqs in let make_bam data = data |> bwa_mem |> gatk_indel_realigner |> picard_mark_duplicates |> gatk_bqsr in let vc_input = pair ~normal:(make_bam normal) ~tumor:(make_bam tumor) in [variant_caller vc_input] end end module Optimization_framework = struct open Nonstd (** QueL-like Compiler Optimization Framework This is “stolen” from ["quel_o.ml"] … i.e. the “Query optimization framework in tagless-final.” The code is reusable for any optimization pass. *) module type Trans_base = sig type 'a from type 'a term val fwd : 'a from -> 'a term (* reflection *) val bwd : 'a term -> 'a from (* reification *) end module type Transformation = sig include Trans_base val fmap : ('a from -> 'b from) -> ('a term -> 'b term) val fmap2 : ('a from -> 'b from -> 'c from) -> ('a term -> 'b term -> 'c term) end (** Add the default implementations of the mapping functions *) module Define_transformation(X : Trans_base) = struct include X let fmap f term = fwd (f (bwd term)) let fmap2 f term1 term2 = fwd (f (bwd term1) (bwd term2)) end (** The default, generic optimizer. Concrete optimizers are built by overriding the interpretations of some DSL forms. See the module {!Transform_applications} for an example of use. *) module Generic_optimizer (X: Transformation) (Input: Semantics.Bioinformatics_base with type 'a repr = 'a X.from) : Semantics.Bioinformatics_base with type 'a repr = 'a X.term and type 'a observation = 'a Input.observation = struct module Tools = Biokepi_bfx_tools open X type 'a repr = 'a term type 'a observation = 'a Input.observation let lambda f = fwd (Input.lambda (fun x -> bwd (f (fwd x)))) let apply e1 e2 = fmap2 Input.apply e1 e2 let observe f = Input.observe (fun () -> bwd (f ())) let list l = fwd (Input.list (List.map ~f:bwd l)) let list_map l ~f = fwd (Input.list_map (bwd l) (bwd f)) let to_unit x = fwd (Input.to_unit (bwd x)) let pair a b = fwd (Input.pair (bwd a) (bwd b)) let pair_first x = fwd (Input.pair_first (bwd x)) let pair_second x = fwd (Input.pair_second (bwd x)) let input_url u = fwd (Input.input_url u) let save ~name thing = fwd (Input.save ~name (bwd thing)) let fastq ~sample_name ?fragment_id ~r1 ?r2 () = let r2 = Option.map ~f:bwd r2 in fwd (Input.fastq ~sample_name ?fragment_id ~r1:(bwd r1) ?r2 ()) let fastq_gz ~sample_name ?fragment_id ~r1 ?r2 () = let r1 = bwd r1 in let r2 = Option.map ~f:bwd r2 in fwd (Input.fastq_gz ~sample_name ?fragment_id ~r1 ?r2 ()) let bam ~sample_name ?sorting ~reference_build input = fwd (Input.bam ~sample_name ?sorting ~reference_build (bwd input)) let bed file = fwd (Input.bed (bwd file)) let mhc_alleles = function | `File f -> fwd (Input.mhc_alleles (`File (bwd f))) | `Names _ as m -> fwd (Input.mhc_alleles m) let index_bam bam = fwd (Input.index_bam (bwd bam)) let kallisto ~reference_build ?bootstrap_samples fq = fwd (Input.kallisto ?bootstrap_samples ~reference_build (bwd fq)) let delly2 ?configuration ~normal ~tumor () = fwd (Input.delly2 ?configuration ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let cufflinks ?reference_build bam = fwd (Input.cufflinks ?reference_build (bwd bam)) let bwa_aln ?configuration ~reference_build fq = fwd (Input.bwa_aln ?configuration ~reference_build (bwd fq)) let bwa_mem ?configuration ~reference_build fq = fwd (Input.bwa_mem ?configuration ~reference_build (bwd fq)) let bwa_mem_opt ?configuration ~reference_build input = fwd (Input.bwa_mem_opt ?configuration ~reference_build (match input with | `Fastq f -> `Fastq (bwd f) | `Fastq_gz f -> `Fastq_gz (bwd f) | `Bam (b, p) -> `Bam (bwd b, p))) let star ?configuration ~reference_build fq = fwd (Input.star ?configuration ~reference_build (bwd fq)) let hisat ?configuration ~reference_build fq = fwd (Input.hisat ?configuration ~reference_build (bwd fq)) let mosaik ~reference_build fq = fwd (Input.mosaik ~reference_build (bwd fq)) let stringtie ?configuration fq = fwd (Input.stringtie ?configuration (bwd fq)) let gatk_indel_realigner ?configuration bam = fwd (Input.gatk_indel_realigner ?configuration (bwd bam)) let gatk_indel_realigner_joint ?configuration pair = fwd (Input.gatk_indel_realigner_joint ?configuration (bwd pair)) let picard_mark_duplicates ?configuration bam = fwd (Input.picard_mark_duplicates ?configuration (bwd bam)) let picard_reorder_sam ?mem_param ?reference_build bam = fwd (Input.picard_reorder_sam ?mem_param ?reference_build (bwd bam)) let picard_clean_bam bam = fwd (Input.picard_clean_bam (bwd bam)) let gatk_bqsr ?configuration bam = fwd (Input.gatk_bqsr ?configuration (bwd bam)) let seq2hla fq = fwd (Input.seq2hla (bwd fq)) let optitype how fq = fwd (Input.optitype how (bwd fq)) let hlarp input = fwd (Input.hlarp (match input with | `Seq2hla f -> `Seq2hla (bwd f) | `Optitype f -> `Optitype (bwd f))) let filter_to_region vcf bed = fwd (Input.filter_to_region (bwd vcf) (bwd bed)) let gatk_haplotype_caller bam = fwd (Input.gatk_haplotype_caller (bwd bam)) let gunzip gz = fwd (Input.gunzip (bwd gz)) let gunzip_concat gzl = fwd (Input.gunzip_concat (bwd gzl)) let concat l = fwd (Input.concat (bwd l)) let merge_bams ?delete_input_on_success ?attach_rg_tag ?uncompressed_bam_output ?compress_level_one ?combine_rg_headers ?combine_pg_headers bl = fwd (Input.merge_bams (bwd bl)) let bam_to_fastq ?fragment_id se_or_pe bam = fwd (Input.bam_to_fastq ?fragment_id se_or_pe (bwd bam)) let bam_left_align ~reference_build bam = fwd (Input.bam_left_align ~reference_build (bwd bam)) let sambamba_filter ~filter bam = fwd (Input.sambamba_filter ~filter (bwd bam)) let mutect ?configuration ~normal ~tumor () = fwd (Input.mutect ?configuration ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let mutect2 ?configuration ~normal ~tumor () = fwd (Input.mutect2 ?configuration ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let somaticsniper ?configuration ~normal ~tumor () = fwd (Input.somaticsniper ?configuration ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let strelka ?configuration ~normal ~tumor () = fwd (Input.strelka ?configuration ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let varscan_somatic ?adjust_mapq ~normal ~tumor () = fwd (Input.varscan_somatic ?adjust_mapq ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let muse ?configuration ~normal ~tumor () = fwd (Input.muse ?configuration ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let virmid ?configuration ~normal ~tumor () = fwd (Input.virmid ?configuration ~normal:(bwd normal) ~tumor:(bwd tumor) ()) let fastqc fq = fwd (Input.fastqc (bwd fq)) let flagstat bam = fwd (Input.flagstat (bwd bam)) let vcf_annotate_polyphen vcf = fwd (Input.vcf_annotate_polyphen (bwd vcf)) let snpeff vcf = fwd (Input.snpeff (bwd vcf)) let isovar ?configuration vcf bam = fwd (Input.isovar ?configuration (bwd vcf) (bwd bam)) let topiary ?configuration vcfs predictor alleles = fwd (Input.topiary ?configuration (List.map ~f:bwd vcfs) predictor (bwd alleles)) let vaxrank ?configuration vcfs bam predictor alleles = fwd (Input.vaxrank ?configuration (List.map ~f:(fun v -> (bwd v)) vcfs) (bwd bam) predictor (bwd alleles)) let seqtk_shift_phred_scores fastq = fwd (Input.seqtk_shift_phred_scores (bwd fastq)) end end module Pipeline = struct (**************************************************************************) (* Copyright 2014, Sebastien Mondet *) (* *) (* Licensed under the Apache License, Version 2.0 (the "License"); *) (* you may not use this file except in compliance with the License. *) (* You may obtain a copy of the License at *) (* *) (* http://www.apache.org/licenses/LICENSE-2.0 *) (* *) (* Unless required by applicable law or agreed to in writing, software *) (* distributed under the License is distributed on an "AS IS" BASIS, *) (* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or *) (* implied. See the License for the specific language governing *) (* permissions and limitations under the License. *) (**************************************************************************) open Biokepi_run_environment open Biokepi_bfx_tools open Common module File = struct type t = KEDSL.file_workflow end type json = Yojson.Basic.json type fastq_gz = Fastq_gz type fastq = Fastq type fastq_sample = Fastq_sample type bam = KEDSL.bam_file KEDSL.workflow_node type bam_pair = Bam_pair type vcf = Vcf type gtf = Gtf (** Seq2HLA & Optitype have unique return file formats *) type seq2hla_hla_types = Seq2HLA_hla_types type optitype_hla_types = Optitype_hla_types type somatic = { normal : bam; tumor : bam } type germline = bam type fastq_sample_info = { sample_name: string; fragment_id: string; } module Variant_caller = struct type _ input = | Somatic: somatic -> somatic input | Germline: germline -> germline input type 'a t = { name: string; configuration_json: json; configuration_name: string; make_target: run_with: Machine.t -> input: 'a input -> result_prefix: string -> ?more_edges: KEDSL.workflow_edge list -> unit -> KEDSL.vcf_file KEDSL.workflow_node } end type metadata_spec = [ | `Add_tags of string list | `Add_tags_rec of string list ] type _ t = | Fastq_gz: File.t -> fastq_gz t | Fastq: File.t -> fastq t (* | List: 'a t list -> 'a list t *) | Bam_sample: string * bam -> bam t | Bam_to_fastq: [ `Single | `Paired ] * bam t -> fastq_sample t | Paired_end_sample: fastq_sample_info * fastq t * fastq t -> fastq_sample t | Single_end_sample: fastq_sample_info * fastq t -> fastq_sample t | Gunzip_concat: fastq_gz t list -> fastq t | Concat_text: fastq t list -> fastq t | Star: Star.Configuration.Align.t * fastq_sample t -> bam t | Hisat: Hisat.Configuration.t * fastq_sample t -> bam t | Stringtie: Stringtie.Configuration.t * bam t -> gtf t | Bwa: Bwa.Configuration.Aln.t * fastq_sample t -> bam t | Bwa_mem: Bwa.Configuration.Mem.t * fastq_sample t -> bam t | Mosaik: fastq_sample t -> bam t | Gatk_indel_realigner: Gatk.Configuration.indel_realigner * bam t -> bam t | Picard_mark_duplicates: Picard.Mark_duplicates_settings.t * bam t -> bam t | Gatk_bqsr: (Gatk.Configuration.bqsr * bam t) -> bam t | Bam_pair: bam t * bam t -> bam_pair t | Somatic_variant_caller: somatic Variant_caller.t * bam_pair t -> vcf t | Germline_variant_caller: germline Variant_caller.t * bam t -> vcf t | Seq2HLA: fastq_sample t -> seq2hla_hla_types t | Optitype: ([`DNA | `RNA] * fastq_sample t) -> optitype_hla_types t | With_metadata: metadata_spec * 'a t -> 'a t module Construct = struct type input_fastq = [ | `Paired_end of File.t list * File.t list | `Single_end of File.t list ] let input_fastq ~dataset (fastqs: input_fastq) = let is_fastq_gz p = Filename.check_suffix p "fastq.gz" || Filename.check_suffix p "fq.gz" in let is_fastq p = Filename.check_suffix p "fastq" || Filename.check_suffix p "fq" in let theyre_all l f = List.for_all l ~f:(fun file -> f file#product#path) in let bring_to_single_fastq l = match l with | [] -> failwithf "Dataset %S seems empty" dataset | gzs when theyre_all gzs is_fastq_gz -> Gunzip_concat (List.map gzs (fun f -> Fastq_gz f)) | fqs when theyre_all fqs is_fastq -> Concat_text (List.map fqs (fun f -> Fastq f)) | not_supported -> failwithf "For now, a sample must be a uniform list of \ fastq.gz/fq.gz or .fq/.fastq files. \ Dataset %S does not qualify: [%s] " dataset (List.map not_supported ~f:(fun f -> Filename.basename f#product#path) |> String.concat ~sep:", ") in let sample_info = {sample_name = dataset; fragment_id = dataset} in match fastqs with | `Paired_end (l1, l2) -> Paired_end_sample (sample_info, bring_to_single_fastq l1, bring_to_single_fastq l2) | `Single_end l -> Single_end_sample (sample_info, bring_to_single_fastq l) let bam ~dataset bam = Bam_sample (dataset, bam) let bam_to_fastq how bam = Bam_to_fastq (how, bam) let bwa ?(configuration = Bwa.Configuration.Aln.default) fastq = Bwa (configuration, fastq) let bwa_aln = bwa let bwa_mem ?(configuration = Bwa.Configuration.Mem.default) fastq = Bwa_mem (configuration, fastq) let mosaik fastq = Mosaik fastq let star ?(configuration = Star.Configuration.Align.default) fastq = Star (configuration, fastq) let hisat ?(configuration = Hisat.Configuration.default_v1) fastq = Hisat (configuration, fastq) let stringtie ?(configuration = Stringtie.Configuration.default) bam = Stringtie (configuration, bam) let gatk_indel_realigner ?(configuration=Gatk.Configuration.default_indel_realigner) bam = Gatk_indel_realigner (configuration, bam) let picard_mark_duplicates ?(settings=Picard.Mark_duplicates_settings.default) bam = Picard_mark_duplicates (settings, bam) let gatk_bqsr ?(configuration=Gatk.Configuration.default_bqsr) bam = Gatk_bqsr (configuration, bam) let pair ~normal ~tumor = Bam_pair (normal, tumor) let germline_variant_caller t input_bam = Germline_variant_caller (t, input_bam) let gatk_haplotype_caller input_bam = let configuration_name = "default" in let configuration_json = `Assoc [ "Name", `String configuration_name; ] in let make_target ~run_with ~input ~result_prefix ?more_edges () = match input with | Variant_caller.Germline input_bam -> Gatk.haplotype_caller ?more_edges ~run_with ~input_bam ~result_prefix `Map_reduce in germline_variant_caller {Variant_caller.name = "Gatk-HaplotypeCaller"; configuration_json; configuration_name; make_target;} input_bam let somatic_variant_caller t bam_pair = Somatic_variant_caller (t, bam_pair) let mutect ?(configuration=Mutect.Configuration.default) bam_pair = let configuration_name = configuration.Mutect.Configuration.name in let configuration_json = Mutect.Configuration.to_json configuration in let make_target ~run_with ~input ~result_prefix ?more_edges () = match input with | Variant_caller.Somatic {normal; tumor} -> Mutect.run ~configuration ?more_edges ~run_with ~normal ~tumor ~result_prefix `Map_reduce in somatic_variant_caller {Variant_caller.name = "Mutect"; configuration_json; configuration_name; make_target;} bam_pair let mutect2 ?(configuration=Gatk.Configuration.Mutect2.default) bam_pair = let configuration_name = configuration.Gatk.Configuration.Mutect2.name in let configuration_json = Gatk.Configuration.Mutect2.to_json configuration in let make_target ~run_with ~input ~result_prefix ?more_edges () = match input with | Variant_caller.Somatic {normal; tumor} -> Gatk.mutect2 ~configuration ?more_edges ~run_with ~input_normal_bam:normal ~input_tumor_bam:tumor ~result_prefix `Map_reduce in somatic_variant_caller {Variant_caller.name = "Mutect"; configuration_json; configuration_name; make_target;} bam_pair let somaticsniper ?(configuration = Somaticsniper.Configuration.default) bam_pair = let make_target ~run_with ~input ~result_prefix ?more_edges () = match input with | Variant_caller.Somatic {normal; tumor} -> Somaticsniper.run ~configuration ~run_with ~normal ~tumor ~result_prefix () in somatic_variant_caller {Variant_caller.name = "Somaticsniper"; configuration_json = Somaticsniper.Configuration.to_json configuration; configuration_name = Somaticsniper.Configuration.name configuration; make_target;} bam_pair let varscan_somatic ?adjust_mapq bam_pair = let configuration_name = sprintf "amq-%s" (Option.value_map ~default:"NONE" adjust_mapq ~f:Int.to_string) in let configuration_json = `Assoc [ "Name", `String configuration_name; "Adjust_mapq", `String (Option.value_map adjust_mapq ~f:Int.to_string ~default:"None"); ] in somatic_variant_caller {Variant_caller.name = "Varscan-somatic"; configuration_json; configuration_name; make_target = begin fun ~run_with ~input ~result_prefix ?more_edges () -> match input with | Variant_caller.Somatic {normal; tumor} -> Varscan.somatic_map_reduce ?adjust_mapq ?more_edges ~run_with ~normal ~tumor ~result_prefix () end} bam_pair let strelka ~configuration bam_pair = somatic_variant_caller {Variant_caller.name = "Strelka"; configuration_json = Strelka.Configuration.to_json configuration; configuration_name = configuration.Strelka.Configuration.name; make_target = fun ~run_with ~input ~result_prefix ?more_edges () -> match input with | Variant_caller.Somatic {normal; tumor} -> Strelka.run ?more_edges ~configuration ~normal ~tumor ~run_with ~result_prefix () } bam_pair let virmid ~configuration bam_pair = somatic_variant_caller {Variant_caller.name = "Virmid"; configuration_json = Virmid.Configuration.to_json configuration; configuration_name = configuration.Virmid.Configuration.name; make_target = fun ~run_with ~input ~result_prefix ?more_edges () -> match input with | Variant_caller.Somatic {normal; tumor} -> Virmid.run ?more_edges ~configuration ~normal ~tumor ~run_with ~result_prefix () } bam_pair let muse ~configuration bam_pair = let make_target ~(run_with: Machine.t) ~input ~result_prefix ?more_edges () = match input with | Variant_caller.Somatic {normal; tumor} -> Muse.run ~configuration ?more_edges ~run_with ~normal ~tumor ~result_prefix `Map_reduce in somatic_variant_caller {Variant_caller.name = "Muse"; configuration_json = Muse.Configuration.to_json configuration; configuration_name = configuration.Muse.Configuration.name; make_target } bam_pair let seq2hla fastq_sample = Seq2HLA fastq_sample let optitype kind fastq_sample = Optitype (kind, fastq_sample) let add_tags ?(recursively = false) tags pipeline = With_metadata ((if recursively then `Add_tags_rec tags else `Add_tags tags), pipeline) end let rec to_file_prefix: type a. ?read:[ `R1 of string | `R2 of string ] -> a t -> string = fun ?read w -> begin match w with | With_metadata (_, p) -> to_file_prefix ?read p | Fastq_gz _ -> failwith "TODO" | Fastq _ -> failwith "TODO" | Single_end_sample (info, _) -> info.fragment_id | Gunzip_concat [] -> failwith "TODO" | Gunzip_concat (_ :: _) -> begin match read with | None -> "-cat" | Some (`R1 s) -> sprintf "%s-R1-cat" s | Some (`R2 s) -> sprintf "%s-R2-cat" s end | Concat_text _ -> failwith "TODO" | Bam_sample (name, _) -> Filename.basename name | Bam_to_fastq (how, bam) -> sprintf "%s-b2fq-%s" (to_file_prefix bam) (match how with `Paired -> "PE" | `Single -> "SE") | Paired_end_sample (info, _ , _) -> info.fragment_id | Bwa (configuration, sample) -> sprintf "%s-bwa-%s" (to_file_prefix sample) (Bwa.Configuration.Aln.name configuration) | Bwa_mem (configuration, sample) -> sprintf "%s-bwa-mem-%s" (to_file_prefix sample) (Bwa.Configuration.Mem.name configuration) | Star (configuration, sample) -> sprintf "%s-%s-star-aligned" (to_file_prefix sample) (Star.Configuration.Align.name configuration) | Hisat (conf, sample) -> sprintf "%s-hisat-%s-aligned" (to_file_prefix sample) (conf.Hisat.Configuration.name) | Stringtie (conf, sample) -> sprintf "%s-%s-stringtie" (to_file_prefix sample) (conf.Stringtie.Configuration.name) | Mosaik (sample) -> sprintf "%s-mosaik" (to_file_prefix sample) | Gatk_indel_realigner ((indel_cfg, target_cfg), bam) -> let open Gatk.Configuration in sprintf "%s-indelrealigned-%s-%s" (to_file_prefix ?read bam) indel_cfg.Indel_realigner.name target_cfg.Realigner_target_creator.name | Gatk_bqsr ((bqsr_cfg, print_reads_cfg), bam) -> let open Gatk.Configuration in sprintf "%s-bqsr-%s-%s" (to_file_prefix ?read bam) bqsr_cfg.Bqsr.name print_reads_cfg.Print_reads.name | Picard_mark_duplicates (_, bam) -> (* The settings, for now, do not impact the result *) sprintf "%s-dedup" (to_file_prefix ?read bam) | Bam_pair (nor, tum) -> to_file_prefix tum | Somatic_variant_caller (vc, bp) -> let prev = to_file_prefix bp in sprintf "%s-%s-%s" prev vc.Variant_caller.name vc.Variant_caller.configuration_name | Germline_variant_caller (vc, bp) -> let prev = to_file_prefix bp in sprintf "%s-%s-%s" prev vc.Variant_caller.name vc.Variant_caller.configuration_name | Seq2HLA s -> sprintf "seq2hla-%s" (to_file_prefix ?read s) | Optitype (kind, s) -> sprintf "optitype-%s-%s" (match kind with `DNA -> "DNA" | `RNA -> "RNA") (to_file_prefix ?read s) end let rec to_json: type a. a t -> json = fun w -> let call name (args : json list): json = `List (`String name :: args) in match w with | Fastq_gz file -> call "Fastq_gz" [`String file#product#path] | Fastq file -> call "Fastq" [`String file#product#path] | Bam_sample (name, file) -> call "Bam-sample" [`String name; `String file#product#path] | Bam_to_fastq (how, bam) -> let how_string = match how with `Paired -> "Paired" | `Single -> "Single" in call "Bam-to-fastq" [`String how_string; to_json bam] | Paired_end_sample ({sample_name; fragment_id}, r1, r2) -> call "Paired-end" [`String sample_name; `String fragment_id; to_json r1; to_json r2] | Single_end_sample ({sample_name; fragment_id}, r) -> call "Single-end" [`String sample_name; `String fragment_id; to_json r] | Gunzip_concat fastq_gz_list -> call "Gunzip-concat" (List.map ~f:to_json fastq_gz_list) | Concat_text fastq_list -> call "Concat" (List.map ~f:to_json fastq_list) | Bwa (config, input) -> call "BWA" [ `Assoc ["configuration", Bwa.Configuration.Aln.to_json config]; to_json input ] | Bwa_mem (params, input) -> let input_json = to_json input in call "BWA-MEM" [ `Assoc ["configuration", Bwa.Configuration.Mem.to_json params]; input_json ] | Star (conf, input) -> let input_json = to_json input in call "STAR" [ `Assoc ["configuration", Star.Configuration.Align.to_json conf]; input_json; ] | Hisat (conf, input) -> let input_json = to_json input in call "HISAT" [ `Assoc ["configuration", Hisat.Configuration.to_json conf]; input_json; ] | Stringtie (conf, input) -> let input_json = to_json input in call "Stringtie" [ `Assoc ["configuration", Stringtie.Configuration.to_json conf]; input_json; ] | Mosaik (input) -> let input_json = to_json input in call "MOSAIK" [input_json] | Gatk_indel_realigner ((indel_cfg, target_cfg), bam) -> let open Gatk.Configuration in let input_json = to_json bam in let indel_cfg_json = Indel_realigner.to_json indel_cfg in let target_cfg_json = Realigner_target_creator.to_json target_cfg in call "Gatk_indel_realigner" [`Assoc [ "Configuration", `Assoc [ "IndelRealigner Configuration", indel_cfg_json; "RealignerTargetCreator Configuration", target_cfg_json; ]; "Input", input_json; ]] | Gatk_bqsr ((bqsr_cfg, print_reads_cfg), bam) -> let open Gatk.Configuration in let input_json = to_json bam in call "Gatk_bqsr" [`Assoc [ "Configuration", `Assoc [ "Bqsr", Bqsr.to_json bqsr_cfg; "Print_reads", Print_reads.to_json print_reads_cfg; ]; "Input", input_json; ]] | Germline_variant_caller (gvc, bam) -> call gvc.Variant_caller.name [`Assoc [ "Configuration", gvc.Variant_caller.configuration_json; "Input", to_json bam; ]] | Picard_mark_duplicates (settings, bam) -> (* The settings should not impact the output, so we don't dump them. *) call "Picard_mark_duplicates" [`Assoc ["input", to_json bam]] | Bam_pair (normal, tumor) -> call "Bam-pair" [`Assoc ["normal", to_json normal; "tumor", to_json tumor]] | Somatic_variant_caller (svc, bam_pair) -> call svc.Variant_caller.name [`Assoc [ "Configuration", svc.Variant_caller.configuration_json; "Input", to_json bam_pair; ]] | Seq2HLA input -> call "Seq2HLA" [`Assoc [ "Input", to_json input ]] | Optitype (kind, input) -> call "Optitype" [`Assoc [ "Input", to_json input; "Kind", `String (match kind with `DNA -> "DNA" | `RNA -> "RNA") ]] | With_metadata (_, p) -> to_json p module Compiler = struct type 'a pipeline = 'a t type workflow_option_failure_mode = [ `Silent | `Fail_if_not_happening ] type workflow_option = [ | `Multi_sample_indel_realignment of workflow_option_failure_mode | `Parallel_alignment_over_fastq_fragments of [ `Bwa_mem | `Bwa | `Mosaik | `Star | `Hisat ] list * workflow_option_failure_mode | `Map_reduce of [ `Gatk_indel_realigner ] ] type t = { reference_build: Reference_genome.name; work_dir: string; machine : Machine.t; options: workflow_option list; wrap_bam_node: bam pipeline -> KEDSL.bam_file KEDSL.workflow_node -> KEDSL.bam_file KEDSL.workflow_node; wrap_vcf_node: vcf pipeline -> KEDSL.vcf_file KEDSL.workflow_node -> KEDSL.vcf_file KEDSL.workflow_node; wrap_gtf_node: gtf pipeline -> KEDSL.single_file KEDSL.workflow_node -> KEDSL.single_file KEDSL.workflow_node; } let create ?(wrap_bam_node = fun _ x -> x) ?(wrap_vcf_node = fun _ x -> x) ?(wrap_gtf_node = fun _ x -> x) ?(options=[]) ~reference_build ~work_dir ~machine () = {reference_build; work_dir; machine; options; wrap_bam_node; wrap_vcf_node; wrap_gtf_node} let has_option {options; _} f = List.exists options ~f let apply_with_metadata ~metadata_spec wf = begin match metadata_spec with | `Add_tags tgs -> KEDSL.add_tags ~recursive:false wf tgs | `Add_tags_rec tgs -> KEDSL.add_tags ~recursive:true wf tgs end; wf (* This compiler variable name is confusing, perhaps 'state' is better? *) let rec fastq_step ~read ~compiler (pipeline: fastq pipeline) = let {work_dir; machine ; _ } = compiler in match pipeline with | Fastq f -> f | Concat_text (l: fastq pipeline list) -> failwith "Compilation of Biokepi.Pipeline.Concat_text: not implemented" | Gunzip_concat (l: fastq_gz pipeline list) -> let fastqs = let rec f = function | Fastq_gz t -> t | With_metadata (metadata_spec, p) -> apply_with_metadata ~metadata_spec (f p) in List.map l ~f in let result_path = work_dir // to_file_prefix ~read pipeline ^ ".fastq" in dbg "Result_Path: %S" result_path; Workflow_utilities.Gunzip.concat ~run_with:machine fastqs ~result_path | With_metadata (metadata_spec, pipeline) -> fastq_step ~read ~compiler pipeline |> apply_with_metadata ~metadata_spec let rec fastq_sample_step ~compiler (fs : fastq_sample pipeline) = let {work_dir; machine ; _ } = compiler in match fs with | Bam_to_fastq (how, what) -> let bam = compile_aligner_step ~compiler what in let sample_type = match how with `Single -> `Single_end | `Paired -> `Paired_end in let fastq_pair = let output_prefix = work_dir // to_file_prefix ?read:None what in Picard.bam_to_fastq ~run_with:machine ~sample_type ~output_prefix bam in fastq_pair | Paired_end_sample (info, l1, l2) -> let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) l1 in let r2 = fastq_step ~compiler ~read:(`R2 info.fragment_id) l2 in let open KEDSL in workflow_node (fastq_reads ~host:Machine.(as_host machine) ~name:info.sample_name r1#product#path (Some r2#product#path)) ~name:(sprintf "Pairing sample %s (%s and %s)" info.sample_name (Filename.basename r1#product#path) (Filename.basename r2#product#path)) ~edges:[ depends_on r1; depends_on r2 ] | Single_end_sample (info, single) -> let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) single in let open KEDSL in workflow_node (fastq_reads ~host:Machine.(as_host machine) ~name:info.sample_name r1#product#path None) ~equivalence:`None ~edges:[ depends_on r1 ] ~name:(sprintf "single-end %s (%s)" info.sample_name (Filename.basename r1#product#path)) | With_metadata (metadata_spec, p) -> fastq_sample_step ~compiler p |> apply_with_metadata ~metadata_spec and compile_aligner_step ~compiler (pipeline : bam pipeline) = let {reference_build; work_dir; machine; _} = compiler in let result_prefix = work_dir // to_file_prefix pipeline in dbg "Result_Prefix: %S" result_prefix; let rec parallelize_alignment ~make_aligner sample = match sample with | Paired_end_sample (info, Gunzip_concat r1_list, Gunzip_concat r2_list) -> let exploded = let count = ref 0 in List.map2 r1_list r2_list ~f:(fun r1 r2 -> match r1, r2 with | (Fastq_gz wf1, Fastq_gz wf2) -> let new_info = incr count; {info with fragment_id = (* fragmenting = creating fragments of previous fragment *) sprintf "%s-fragment-%d" info.fragment_id !count} in make_aligner ( Paired_end_sample (new_info, Gunzip_concat [r1], Gunzip_concat [r2])) | other -> failwith "compile_aligner_step: not implemented" ) in let bams = List.map exploded ~f:(compile_aligner_step ~compiler) in Samtools.merge_bams ~run_with:machine bams (result_prefix ^ "-merged.bam") | With_metadata (metadata_spec, p) -> parallelize_alignment ~make_aligner p |> apply_with_metadata ~metadata_spec | other -> failwith "parallelize_alignment: Not fully implemented" in let catch_parallelize_aligner aligner sample = let option = List.find_map compiler.options (function | `Parallel_alignment_over_fastq_fragments (al, todo) when List.mem ~set:al aligner -> Some todo | _ -> None) in match sample with | Paired_end_sample (name, Gunzip_concat r1_list, Gunzip_concat r2_list) when List.length r1_list > 1 -> begin match option with | Some _ -> `Caught | None -> `Not_caught end | Paired_end_sample (name, Gunzip_concat r1_list, Gunzip_concat r2_list) when List.length r1_list = 1 -> `Not_caught | other -> begin match option with | Some `Silent | None -> `Not_caught | Some `Fail_if_not_happening -> failwithf "Option Parallel_alignment_over_fastq_fragments is set to \ Fail_if_not_happening and it didn't happen:\n%s" (to_json other |> Yojson.Basic.pretty_to_string) end in let perform_aligner_parallelization aligner_tag ~make_aligner ~make_workflow sample = begin match catch_parallelize_aligner aligner_tag sample with | `Caught -> parallelize_alignment ~make_aligner sample | `Not_caught -> let fastq = fastq_sample_step ~compiler sample in make_workflow fastq end in let bam_node = match pipeline with | Bam_sample (name, bam_target) -> bam_target | Gatk_indel_realigner (configuration, bam) when has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner)) -> let input_bam = compile_aligner_step ~compiler bam in Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false ~configuration (KEDSL.Single_bam input_bam) | Gatk_indel_realigner (configuration, bam) -> let input_bam = compile_aligner_step ~compiler bam in Gatk.indel_realigner ~run_with:machine ~compress:false ~configuration (KEDSL.Single_bam input_bam) | Gatk_bqsr (configuration, bam) -> let input_bam = compile_aligner_step ~compiler bam in let output_bam = result_prefix ^ ".bam" in Gatk.base_quality_score_recalibrator ~configuration ~run_with:machine ~input_bam ~output_bam | Picard_mark_duplicates (settings, bam) -> let input_bam = compile_aligner_step ~compiler bam in let output_bam = result_prefix ^ ".bam" in Picard.mark_duplicates ~settings ~run_with:machine ~input_bam output_bam | Bwa_mem (bwa_mem_config, fq_sample) -> perform_aligner_parallelization `Bwa_mem ~make_aligner:(fun pe -> Bwa_mem (bwa_mem_config, pe)) fq_sample ~make_workflow:(fun fastq -> Bwa.mem_align_to_sam ~reference_build ~configuration:bwa_mem_config ~fastq ~result_prefix ~run_with:machine () |> Samtools.sam_to_bam ~reference_build ~run_with:machine) | Bwa (bwa_config, what) -> perform_aligner_parallelization `Bwa ~make_aligner:(fun pe -> Bwa (bwa_config, pe)) what ~make_workflow:(fun fastq -> Bwa.align_to_sam ~reference_build ~configuration:bwa_config ~fastq ~result_prefix ~run_with:machine () |> Samtools.sam_to_bam ~reference_build ~run_with:machine) | Mosaik (what) -> perform_aligner_parallelization `Mosaik ~make_aligner:(fun pe -> Mosaik (pe)) what ~make_workflow:(fun fastq -> Mosaik.align ~reference_build ~fastq ~result_prefix ~run_with:machine ()) | Star (configuration, what) -> perform_aligner_parallelization `Star ~make_aligner:(fun pe -> Star (configuration, pe)) what ~make_workflow:(fun fastq -> Star.align ~configuration ~reference_build ~fastq ~result_prefix ~run_with:machine ()) | Hisat (configuration, what) -> perform_aligner_parallelization `Hisat ~make_aligner:(fun pe -> Hisat (configuration, pe)) what ~make_workflow:(fun fastq -> Hisat.align ~configuration ~reference_build ~fastq ~result_prefix ~run_with:machine () |> Samtools.sam_to_bam ~reference_build ~run_with:machine ) | With_metadata (metadata_spec, p) -> compile_aligner_step ~compiler p |> apply_with_metadata ~metadata_spec in compiler.wrap_bam_node pipeline bam_node let rec compile_bam_pair ~compiler (pipeline : bam_pair pipeline) : [ `Normal of KEDSL.bam_file KEDSL.workflow_node ] * [ `Tumor of KEDSL.bam_file KEDSL.workflow_node ] * [ `Pipeline of bam_pair pipeline ] = let {reference_build; work_dir; machine; _} = compiler in begin match pipeline with | Bam_pair ( (Gatk_bqsr (n_bqsr_config, Gatk_indel_realigner (n_gir_conf, n_bam))) , (Gatk_bqsr (t_bqsr_config, Gatk_indel_realigner (t_gir_conf, t_bam))) ) when has_option compiler (function `Multi_sample_indel_realignment _ -> true | _ -> false) && n_gir_conf = t_gir_conf -> let normal = compile_aligner_step ~compiler n_bam in let tumor = compile_aligner_step ~compiler t_bam in let bam_list_node = if has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner)) then ( Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false ~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor]) ) else Gatk.indel_realigner ~run_with:machine ~compress:false ~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor]) in begin match KEDSL.explode_bam_list_node bam_list_node with | [realigned_normal; realigned_tumor] -> let new_pipeline = Bam_pair ( Gatk_bqsr (n_bqsr_config, Bam_sample (Filename.chop_extension realigned_normal#product#path, realigned_normal)), Gatk_bqsr (t_bqsr_config, Bam_sample (Filename.chop_extension realigned_tumor#product#path, realigned_tumor))) in compile_bam_pair ~compiler new_pipeline | other -> failwithf "Gatk.indel_realigner did not return the correct list \ of length 2 (tumor, normal): it gave %d bams" (List.length other) end | Bam_pair ( Gatk_bqsr (_, Gatk_indel_realigner (_, _)), Gatk_bqsr (_, Gatk_indel_realigner (_, _))) as bam_pair when has_option compiler ((=) (`Multi_sample_indel_realignment `Fail_if_not_happening)) -> failwithf "Option (`Multi_sample_indel_realignment \ `Fail_if_not_happening) is set and this pipeline does not \ qualify:\n%s" (to_json bam_pair |> Yojson.Basic.pretty_to_string) | Bam_pair (normal_t, tumor_t) as final_pipeline -> let normal = compile_aligner_step ~compiler normal_t in let tumor = compile_aligner_step ~compiler tumor_t in (`Normal normal, `Tumor tumor, `Pipeline final_pipeline) | With_metadata (metadata_spec, p) -> let `Normal normal, `Tumor tumor, `Pipeline p = compile_bam_pair ~compiler p in (`Normal (apply_with_metadata ~metadata_spec normal), `Tumor (apply_with_metadata ~metadata_spec tumor), `Pipeline p) end let rec compile_variant_caller_step ~compiler (pipeline: vcf pipeline) = let {reference_build; work_dir; machine; _} = compiler in (* result prefix ignore optimizations *) let vcf_node = match pipeline with | Somatic_variant_caller (som_vc, bam_pair) -> let (`Normal normal, `Tumor tumor, `Pipeline new_bam_pair) = compile_bam_pair ~compiler bam_pair in let result_prefix = work_dir // to_file_prefix (Somatic_variant_caller (som_vc, new_bam_pair)) in dbg "Result_Prefix: %S" result_prefix; som_vc.Variant_caller.make_target ~run_with:machine ~input:(Variant_caller.Somatic {normal; tumor}) ~result_prefix () | Germline_variant_caller (gvc, bam) -> let result_prefix = work_dir // to_file_prefix pipeline in dbg "Result_Prefix: %S" result_prefix; let input_bam = compile_aligner_step ~compiler bam in gvc.Variant_caller.make_target ~run_with:machine ~input:(Variant_caller.Germline input_bam) ~result_prefix () | With_metadata (metadata_spec, p) -> compile_variant_caller_step ~compiler p |> apply_with_metadata ~metadata_spec in compiler.wrap_vcf_node pipeline vcf_node let rec compile_gtf_step ~compiler (pipeline: gtf pipeline) = let {reference_build; work_dir; machine; _} = compiler in let result_prefix = work_dir // to_file_prefix pipeline in dbg "Result_Prefix: %S" result_prefix; let gtf_node = match pipeline with | Stringtie (configuration, bam) -> let bam = compile_aligner_step ~compiler bam in Stringtie.run ~configuration ~bam ~result_prefix ~run_with:machine () | With_metadata (metadata_spec, p) -> compile_gtf_step ~compiler p |> apply_with_metadata ~metadata_spec in compiler.wrap_gtf_node pipeline gtf_node let rec seq2hla_hla_types_step ~compiler (pipeline : seq2hla_hla_types pipeline) = let { machine ; work_dir; _ } = compiler in match pipeline with | Seq2HLA sample -> let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in (* TODO: Seq2HLA can actually take the gzipped version too, so we'd need a unique type for that. *) let fastq_pair = fastq_sample_step ~compiler sample in let sample_name = fastq_pair#product#sample_name in let r1 = KEDSL.read_1_file_node fastq_pair in let r2 = match KEDSL.read_2_file_node fastq_pair with | Some r2 -> r2 | _ -> failwithf "Seq2HLA doesn't support Single_end_sample(s)." in Seq2HLA.hla_type ~work_dir ~run_with:machine ~run_name:sample_name ~r1 ~r2 | With_metadata (metadata_spec, p) -> seq2hla_hla_types_step ~compiler p |> apply_with_metadata ~metadata_spec let rec optitype_hla_types_step ~compiler (pipeline : optitype_hla_types pipeline) = let { machine ; work_dir; _ } = compiler in match pipeline with | Optitype (kind, sample) -> let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in let fastq = fastq_sample_step ~compiler sample in let sample_name = fastq#product#sample_name in Optitype.hla_type ~work_dir ~run_with:machine ~run_name:sample_name ~fastq kind | With_metadata (metadata_spec, p) -> optitype_hla_types_step ~compiler p |> apply_with_metadata ~metadata_spec end (* Compiler *) end module Pipeline_library : sig module Input : sig type t = | Fastq of fastq | Bam of bam and fastq_fragment = (string option * fastq_data) and fastq = { fastq_sample_name : string; files : fastq_fragment list; } and bam = { bam_sample_name: string; path: string; how: how; sorting: sorting option; reference_build: string; } and fastq_data = | PE of string * string | SE of string | Of_bam of how * sorting option * string * string and sorting = [ `Coordinate | `Read_name ] and how = [ `PE | `SE ] val pp : Format.formatter -> t -> unit val show : t -> string val pe : ?fragment_id:string -> string -> string -> fastq_fragment val se : ?fragment_id:string -> string -> fastq_fragment val fastq_of_bam : ?fragment_id: string -> ?sorted:[ `Coordinate | `Read_name ] -> reference_build:string -> [ `PE | `SE ] -> string -> fastq_fragment val fastq_sample : sample_name:string -> fastq_fragment list -> t val bam_sample : sample_name:string -> ?sorting:sorting -> reference_build:string -> how:how -> string -> t val to_yojson : t -> Yojson.Safe.json val of_yojson : Yojson.Safe.json -> (t, string) Pvem.Result.t module Derive : sig (** Make it easy to create Input.t from directories on hosts. *) val ensure_one_flowcell_max : string list -> (string list, [> `Multiple_flowcells of string list | `Re_group_error of string ]) Pvem_lwt_unix.Deferred_result.t (** This will error out immediately if more than one flowcell is in a directory. We want to make sure only one flowcell is in this directory, otherwise this'll give us bad ordering: for now, we'll just fail with an informative exception: if it turns out we need to handle this case, we can do it here. The proper ordering should be grouping and ordering with flowcell groups, and arbitrary order between groups. *) val fastqs : ?paired_end:bool -> host:Ketrew_pure.Host.t -> string -> (fastq_fragment list, [> `Host of _ Ketrew.Host_io.Error.execution Ketrew.Host_io.Error.non_zero_execution | `Multiple_flowcells of string list | `R2_expected_for_r1 of string | `Re_group_error of string]) Pvem_lwt_unix.Deferred_result.t (** This gives us lexicographically sorted FASTQ filenames in a given directory on the host. This assumes Illumina-style file-naming: Sample_name_barcode_lane#_read#_fragment#.(flowcell)?.fastq.gz N.B. If a given directory contains FASTQs with multiple flowcells, this will fail. *) end end module Make: functor (Bfx : Semantics.Bioinformatics_base) -> sig val fastq_of_files : sample_name:string -> ?fragment_id:string -> r1:string -> ?r2:string -> unit -> [ `Fastq ] Bfx.repr val fastq_of_input : Input.t -> [ `Fastq ] list Bfx.repr val bam_of_input_exn : Input.t -> [ `Bam ] Bfx.repr val bwa_mem_opt_inputs_exn: Input.t -> [ `Bam of [ `Bam ] Bfx.repr * [ `PE | `SE ] | `Fastq of [ `Fastq ] Bfx.repr | `Fastq_gz of [ `Gz of [ `Fastq ] ] Bfx.repr ] list end end = struct (** Reusable and useful pieces of pipeline. *) open Biokepi_run_environment.Common module Input = struct type t = | Fastq of fastq | Bam of bam and fastq = { fastq_sample_name : string; files : fastq_fragment list; } and bam = { bam_sample_name: string; path: string; how: how; sorting: sorting option; reference_build: string; } and fastq_fragment = (string option * fastq_data) and fastq_data = | PE of string * string | SE of string | Of_bam of how * sorting option * string * string and sorting = [ `Coordinate | `Read_name ] and how = [ `PE | `SE ] [@@deriving show,yojson] let pe ?fragment_id a b = (fragment_id, PE (a, b)) let se ?fragment_id a = (fragment_id, SE a) let fastq_of_bam ?fragment_id ?sorted ~reference_build how s = (fragment_id, Of_bam (how, sorted, reference_build, s)) let fastq_sample ~sample_name files = Fastq {fastq_sample_name = sample_name; files} let bam_sample ~sample_name ?sorting ~reference_build ~how path = Bam {bam_sample_name = sample_name; path; how; reference_build; sorting } let tag_v0 = "biokepi-input-v0" let current_version_tag = tag_v0 let to_yojson = let string s = `String s in let option f o : Yojson.Safe.json = Option.value_map o ~default:`Null ~f in let data_to_yojson = function | PE (r1, r2) -> `Assoc ["paired-end", `Assoc ["r1", string r1; "r2", string r2]] | SE f -> `Assoc ["single-end", string f] | Of_bam (how, sorto, refb, fil) -> `Assoc ["bam", `Assoc ["kind", string (match how with | `PE -> "paired-end" | `SE -> "single-end"); "sorting", (match sorto with | None -> `Null | Some `Read_name -> `String "read-name" | Some `Coordinate -> `String "coordinate"); "reference-genome", string refb; "path", string fil]] in let file_to_yojson (fragment_option, data) = `Assoc [ "fragment-id", option string fragment_option; "data", data_to_yojson data; ] in let files_to_yojson files = `List (List.map ~f:file_to_yojson files) in function | Fastq {fastq_sample_name; files} -> `Assoc [current_version_tag, `Assoc ["fastq", `Assoc [ "sample-name", string fastq_sample_name; "fragments", files_to_yojson files; ]]] | Bam bam -> `Assoc [current_version_tag, `Assoc ["bam", bam_to_yojson bam]] let of_yojson j = let open Pvem.Result in let error ?json fmt = ksprintf (fun s -> fail (sprintf "%s%s" s (Option.value_map ~default:"" json ~f:(fun j -> sprintf " but got %s" @@ Yojson.Safe.pretty_to_string ~std:true j))) ) fmt in let data_of_yojson = function | `Assoc ["paired-end", `Assoc ["r1", `String r1; "r2", `String r2]] -> return (PE (r1, r2)) | `Assoc ["single-end", `String file] -> SE file |> return | `Assoc ["bam", `Assoc ["kind", `String kind; "sorting", sorting; "reference-genome", `String refb; "path", `String path;]] -> begin match sorting with | `Null -> return None | `String "coordinate" -> Some `Coordinate |> return | `String "read-name" -> Some `Read_name |> return | other -> error ~json:other "Expecting %S, %S or null (in \"sorting\": ...)" "coordinate" "read-name" end >>= fun sorting -> begin match kind with | "single-end" -> return `SE | "paired-end" -> return `PE | other -> error "Kind in bam must be \"SE\" or \"PE\"" end >>= fun kind -> return (Of_bam (kind, sorting, refb, path)) | other -> error ~json:other "Expecting string or null (in \"fragment\": ...)" in let fragment_of_yojson = function | `Assoc ["fragment-id", frag; "data", data] -> begin match frag with | `String s -> return (Some s) | `Null -> return (None) | other -> error ~json:other "Expecting string or null (in \"fragment\": ...)" end >>= fun fragment_id -> data_of_yojson data >>= fun data_parsed -> return (fragment_id, data_parsed) | other -> error ~json:other "Expecting {\"fragment\": ... , \"data\": ...}" in match j with | `Assoc [vtag, more] when vtag = current_version_tag -> begin match more with | `Assoc ["fastq", `Assoc ["sample-name", `String sample; "fragments", `List frgs]] -> List.fold ~init:(return []) frgs ~f:(fun prev frag -> prev >>= fun p -> fragment_of_yojson frag >>= fun more -> return (more :: p)) >>= fun l -> return (Fastq { fastq_sample_name = sample; files = List.rev l }) | `Assoc ["bam", bam] -> begin match bam_of_yojson bam with | Result.Ok ok -> return ok | Result.Error err -> fail err end >>= fun bam -> return (Bam bam) | other -> error ~json:other "Expecting Fastq or Bam" end | other -> error ~json:other "Expecting Biokepi_input_v0" module Derive = struct let ensure_one_flowcell_max files = let open Pvem_lwt_unix.Deferred_result in let flowcell_re = Re.compile (Re_posix.re {re|.*(\..+)?\.fastq\.gz|re}) in let flowcells = Pvem_lwt_unix.Deferred_list.while_sequential files ~f:(fun f -> match Re.Group.all (Re.exec flowcell_re f) with | [|_; flowcell|] -> return flowcell | _ -> fail (`Re_group_error "Re.Group.all returned an unexpected number of groups.")) in flowcells >>| List.dedup >>= fun flowcells -> if List.length flowcells > 1 then fail (`Multiple_flowcells flowcells) else return files let fastqs ?(paired_end=true) ~host dir = let open Pvem_lwt_unix.Deferred_result in (* We only want to select fastq.gzs in the directory: *) let fastq_re = Re.compile (Re_posix.re {re|.*\.fastq\.gz|re}) in let fastq_p = Re.execp fastq_re in let r1_to_r2 s = let r1_re = Re.compile (Re_posix.re "_R1_") in Re.replace_string r1_re ~by:"_R2_" s in (* If we're expecting paired_end reads, make sure we have a R2 in the directory for each R1 fragment. Returns the list of R1 fragments. *) let ensure_matching_r2s files = if paired_end then (List.fold ~init:(return []) files ~f:(fun acc r1 -> acc >>= fun r1s -> match String.index_of_string r1 ~sub:"_R1_" with | None -> acc | Some _ -> if List.mem (r1_to_r2 r1) ~set:files then return (r1 :: r1s) else fail (`R2_expected_for_r1 r1))) else return files in let as_fragments read1s_filenames = List.mapi read1s_filenames ~f:(fun i r1 -> if paired_end then let r2 = dir // (r1_to_r2 r1) in let r1 = dir // r1 in let fragment_id = sprintf "fragment-%d" i in pe ~fragment_id r1 r2 else let r1 = dir // r1 in let fragment_id = sprintf "fragment-%d" i in se ~fragment_id r1) |> return in let cmd = (sprintf "ls '%s' | sort" dir) in Ketrew.Host_io.(get_shell_command_output (create ()) ~host:host cmd) >>= fun (out, err) -> String.split ~on:(`Character '\n') out |> List.filter ~f:fastq_p |> return >>= ensure_matching_r2s >>= ensure_one_flowcell_max >>= as_fragments end end module Make (Bfx : Semantics.Bioinformatics_base) = struct (** This functions guesses whether to use [fastq] or [fastq_gz] given the file name extension. If [r1] and [r2] dot match, it fails with an exception. *) let fastq_of_files ~sample_name ?fragment_id ~r1 ?r2 () = let is_gz r = Filename.check_suffix r ".gz" || Filename.check_suffix r ".fqz" in match is_gz r1, Option.map ~f:is_gz r2 with | true, None | true, Some true -> let r1 = Bfx.input_url r1 in let r2 = Option.map ~f:Bfx.input_url r2 in Bfx.(fastq_gz ~sample_name ?fragment_id ~r1 ?r2 () |> gunzip) | false, None | false, Some false -> let r1 = Bfx.input_url r1 in let r2 = Option.map ~f:Bfx.input_url r2 in Bfx.(fastq ~sample_name ?fragment_id ~r1 ?r2 ()) | _ -> failwithf "fastq_of_files: cannot handle mixed gzipped \ and non-gzipped fastq pairs (for a given same fragment)" (** Transform a value of type {!Input.t} into a list of BAMs. *) let bam_of_input_exn u = let open Input in match u with | Fastq _ -> failwith "Can't pass Input.t Fastq to bam_of_input_exn" | Bam {bam_sample_name; path; how; sorting; reference_build} -> let f = Bfx.input_url path in Bfx.bam ~sample_name:bam_sample_name ?sorting ~reference_build f (** Transform a value of type {!Input.t} into a pipeline returning FASTQ data, providing the necessary intermediary steps. *) let fastq_of_input u = let open Input in match u with | Bam {bam_sample_name; path; how; sorting; reference_build} -> let f = Bfx.input_url path in let bam = Bfx.bam ~sample_name:bam_sample_name ?sorting ~reference_build f in Bfx.list [Bfx.bam_to_fastq how bam] | Fastq {fastq_sample_name; files} -> let sample_name = fastq_sample_name in List.map files ~f:(fun (fragment_id, source) -> match source with | PE (r1, r2) -> fastq_of_files ~sample_name ?fragment_id ~r1 ~r2 () | SE r -> fastq_of_files ~sample_name ?fragment_id ~r1:r () | Of_bam (how, sorting, reference_build, path) -> let f = Bfx.input_url path in let bam = Bfx.bam ~sample_name ?sorting ~reference_build f in Bfx.bam_to_fastq ?fragment_id how bam ) |> Bfx.list let bwa_mem_opt_inputs_exn inp = let open Input in let is_gz r = Filename.check_suffix r ".gz" || Filename.check_suffix r ".fqz" in let inputs = match inp with | Bam _ -> failwith "Can't pass Input.t Bam to bwa_mem_opt_inputs" | Fastq {fastq_sample_name; files} -> let sample_name = fastq_sample_name in List.map files ~f:(fun (fragment_id, source) -> match source with | PE (r1, r2) when is_gz r1 && is_gz r2 -> `Fastq_gz Bfx.( let r1 = input_url r1 in let r2 = input_url r2 in Bfx.fastq_gz ~sample_name ?fragment_id ~r1 ~r2 () ) | PE (r1, r2) when not (is_gz r1 || is_gz r2) -> `Fastq Bfx.( let r1 = input_url r1 in let r2 = input_url r2 in Bfx.fastq ~sample_name ?fragment_id ~r1 ~r2 () ) | PE _ (* heterogeneous *) -> failwithf "Heterogeneous gzipped / non-gzipped input paired-end \ FASTQs not implemented" | SE r when is_gz r -> `Fastq_gz Bfx.( let r1 = input_url r in Bfx.fastq_gz ~sample_name ?fragment_id ~r1 () ) | SE r (* when not (is_gz r) *) -> `Fastq_gz Bfx.( let r1 = input_url r in Bfx.fastq_gz ~sample_name ?fragment_id ~r1 () ) | Of_bam (how, sorting, reference_build, path) -> let f = Bfx.input_url path in let bam = Bfx.bam ~sample_name ?sorting ~reference_build f in `Bam (bam, how) ) in inputs end end module Semantics = struct module type Lambda_calculus = sig type 'a repr (* representation type *) (* lambda abstract *) val lambda : ('a repr -> 'b repr) -> ('a -> 'b) repr (* application *) val apply : ('a -> 'b) repr -> 'a repr -> 'b repr type 'a observation val observe : (unit -> 'a repr) -> 'a observation end module type Lambda_with_list_operations = sig include Lambda_calculus val list: ('a repr) list -> 'a list repr val list_map: ('a list repr) -> f:('a -> 'b) repr -> ('b list repr) end module type Bioinformatics_base = sig include Lambda_with_list_operations open Biokepi_bfx_tools val pair: 'a repr -> 'b repr -> ('a * 'b) repr val pair_first: ('a * 'b) repr -> 'a repr val pair_second: ('a * 'b) repr -> 'b repr (** This is used to opacify the type of the expression. *) val to_unit: 'a repr -> unit repr val input_url: string -> [ `Raw_file ] repr (** Decrlare an URL as a input to a a pipeline. - If the URL has the scheme ["file://"] or no scheme it will be treated as a “local file” (i.e. local to the {!Biokepi.Machine.t}). - If the URL has the schemes ["http://"] or ["https://"] the file will be downloaded into the work-directory. - If the URL has the scheme ["gs://"] the file will be fetched thanks to ["gsutil"] (Google Cloud's storage buckets management tool). For both ["http(s)://"] and ["gs://"] schemes one can override the local filename, by adding a ["filename"] argument to the query part of the URL. E.g. ["https://data.example.com/my-sample.bam?filename=sample-from-example.bam"]. *) val save : name : string -> 'a repr -> [ `Saved of 'a ] repr (** Tag a product of the EDSL as “interesting to be saved” (the meaning is implementation dependent; when compiling to a Ketrew workflow this means copying the file(s) to another directory. *) val fastq : sample_name : string -> ?fragment_id : string -> r1: [ `Raw_file ] repr -> ?r2: [ `Raw_file ] repr -> unit -> [ `Fastq ] repr val fastq_gz: sample_name : string -> ?fragment_id : string -> r1: [ `Raw_file ] repr -> ?r2: [ `Raw_file ] repr -> unit -> [ `Gz of [ `Fastq ] ] repr val bam : sample_name : string -> ?sorting: [ `Coordinate | `Read_name ] -> reference_build: string -> [ `Raw_file ] repr -> [ `Bam ] repr val bed : [ `Raw_file ] repr -> [ `Bed ] repr (** Input a file containing HLA allelles for Topiary *) val mhc_alleles: [ `File of [ `Raw_file ] repr | `Names of string list] -> [ `MHC_alleles ] repr val index_bam: [ `Bam ] repr -> [ `Bai ] repr val kallisto: reference_build: string -> ?bootstrap_samples: int -> [ `Fastq ] repr -> [ `Kallisto_result ] repr val cufflinks: ?reference_build: string -> [ `Bam ] repr -> [ `Cufflinks_result ] repr val gunzip: [ `Gz of 'a] repr -> 'a repr val gunzip_concat: ([ `Gz of 'a] list) repr -> 'a repr val concat: ('a list) repr -> 'a repr val merge_bams: ?delete_input_on_success: bool -> ?attach_rg_tag: bool -> ?uncompressed_bam_output: bool -> ?compress_level_one: bool -> ?combine_rg_headers: bool -> ?combine_pg_headers: bool -> ([ `Bam ] list) repr -> [ `Bam ] repr val bam_to_fastq: ?fragment_id : string -> [ `SE | `PE ] -> [ `Bam ] repr -> [ `Fastq ] repr val bwa_aln: ?configuration: Biokepi_bfx_tools.Bwa.Configuration.Aln.t -> reference_build: Biokepi_run_environment.Reference_genome.name -> [ `Fastq ] repr -> [ `Bam ] repr val bwa_mem: ?configuration: Biokepi_bfx_tools.Bwa.Configuration.Mem.t -> reference_build: Biokepi_run_environment.Reference_genome.name -> [ `Fastq ] repr -> [ `Bam ] repr (** Optimized version of bwa-mem. *) val bwa_mem_opt: ?configuration: Biokepi_bfx_tools.Bwa.Configuration.Mem.t -> reference_build: Biokepi_run_environment.Reference_genome.name -> [ | `Fastq of [ `Fastq ] repr | `Fastq_gz of [ `Gz of [ `Fastq ] ] repr | `Bam of [ `Bam ] repr * [ `PE | `SE ] ] -> [ `Bam ] repr val star: ?configuration: Star.Configuration.Align.t -> reference_build: Biokepi_run_environment.Reference_genome.name -> [ `Fastq ] repr -> [ `Bam ] repr val hisat: ?configuration: Hisat.Configuration.t -> reference_build: Biokepi_run_environment.Reference_genome.name -> [ `Fastq ] repr -> [ `Bam ] repr val mosaik: reference_build: Biokepi_run_environment.Reference_genome.name -> [ `Fastq ] repr -> [ `Bam ] repr val stringtie: ?configuration: Stringtie.Configuration.t -> [ `Bam ] repr -> [ `Gtf ] repr val gatk_indel_realigner: ?configuration : Gatk.Configuration.indel_realigner -> [ `Bam ] repr -> [ `Bam ] repr val gatk_indel_realigner_joint: ?configuration : Gatk.Configuration.indel_realigner -> ([ `Bam ] * [ `Bam ]) repr -> ([ `Bam ] * [ `Bam ]) repr val picard_mark_duplicates: ?configuration : Picard.Mark_duplicates_settings.t -> [ `Bam ] repr -> [ `Bam ] repr val picard_reorder_sam: ?mem_param : string -> ?reference_build : string -> [ `Bam ] repr -> [ `Bam ] repr val picard_clean_bam: [ `Bam ] repr -> [ `Bam ] repr val gatk_bqsr: ?configuration : Gatk.Configuration.bqsr -> [ `Bam ] repr -> [ `Bam ] repr val seq2hla: [ `Fastq ] repr -> [ `Seq2hla_result ] repr val optitype: [`DNA | `RNA] -> [ `Fastq ] repr -> [ `Optitype_result ] repr val hlarp: [ `Optitype of [`Optitype_result] repr | `Seq2hla of [`Seq2hla_result] repr] -> [ `MHC_alleles ] repr val filter_to_region: [ `Vcf ] repr -> [ `Bed ] repr -> [ `Vcf ] repr (** FreeBayes' bamleftalign utility, which left-normalizes indels. That is, indels which could be aligned multiple ways (e.g. AAA_G_GAA is the same as AAAG_G_AA) are moved to the left, as in the first example. This is so that they can be treated uniformly in post-processing. *) val bam_left_align: reference_build: string -> [ `Bam ] repr -> [ `Bam ] repr (** Sambamba's view filter tool, used to filter down a BAM to one with reads matching some predicate (filter language at https://github.com/lomereiter/sambamba/wiki/%5Bsambamba-view%5D-Filter-expression-syntax). *) val sambamba_filter: filter: Sambamba.Filter.t -> [ `Bam ] repr -> [ `Bam ] repr val gatk_haplotype_caller: [ `Bam ] repr -> [ `Vcf ] repr val mutect: ?configuration: Biokepi_bfx_tools.Mutect.Configuration.t -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr val mutect2: ?configuration: Gatk.Configuration.Mutect2.t -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr val somaticsniper: ?configuration: Somaticsniper.Configuration.t -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr val delly2: ?configuration: Delly2.Configuration.t -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr (** Run delly2 on a tumor/normal sample. *) val varscan_somatic: ?adjust_mapq : int -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr val strelka: ?configuration: Strelka.Configuration.t -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr val virmid: ?configuration: Virmid.Configuration.t -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr val muse: ?configuration: Muse.Configuration.t -> normal: [ `Bam ] repr -> tumor: [ `Bam ] repr -> unit -> [ `Vcf ] repr val fastqc: [ `Fastq ] repr -> [ `Fastqc ] repr (** Call the FASTQC tool (the result is an output directory custom to the tool). *) val flagstat: [ `Bam ] repr -> [ `Flagstat ] repr val vcf_annotate_polyphen: [ `Vcf ] repr -> [ `Vcf ] repr val snpeff: [ `Vcf ] repr -> [ `Vcf ] repr val isovar: ?configuration: Isovar.Configuration.t -> [ `Vcf ] repr -> [ `Bam ] repr -> [ `Isovar ] repr val topiary: ?configuration: Topiary.Configuration.t -> [ `Vcf ] repr list -> Biokepi_run_environment.Hla_utilities.predictor_type -> [ `MHC_alleles ] repr -> [ `Topiary ] repr val vaxrank: ?configuration: Vaxrank.Configuration.t -> [ `Vcf ] repr list -> [ `Bam ] repr -> Biokepi_run_environment.Hla_utilities.predictor_type -> [ `MHC_alleles ] repr -> [ `Vaxrank ] repr val seqtk_shift_phred_scores: [ `Fastq ] repr -> [ `Fastq ] repr end end module To_display : sig (** Compiler from EDSL representations to {!SmartPrint.t} pretty printing. *) include Semantics.Bioinformatics_base with type 'a repr = var_count: int -> SmartPrint.t and type 'a observation = SmartPrint.t end = struct open Nonstd module Tools = Biokepi_bfx_tools module SP = SmartPrint module OCaml = SmartPrint.OCaml let entity sp = SP.(nest (parens (sp))) type 'a repr = var_count: int -> SP.t type 'a observation = SP.t let lambda f = fun ~var_count -> let var_name = sprintf "var%d" var_count in let var_repr = fun ~var_count -> SP.string var_name in let applied = f var_repr ~var_count:(var_count + 1) in entity SP.(string "λ" ^^ string var_name ^^ string "→" ^^ applied) let apply f v = fun ~var_count -> entity (SP.separate SP.space [f ~var_count; v ~var_count]) let observe f = f () ~var_count:0 let to_unit x = fun ~var_count -> entity SP.(x ~var_count ^^ string ":> unit") let list l = fun ~var_count -> SP.nest (OCaml.list (fun a -> a ~var_count) l) let list_map l ~f = let open SP in fun ~var_count -> entity ( string "List.map" ^^ nest (string "~f:" ^^ f ~var_count) ^^ l ~var_count ) include To_json.Make_serializer (struct type t = SP.t let input_value name kv = let open SP in fun ~var_count -> entity ( ksprintf string "input-%s" name ^^ (OCaml.list (fun (k, v) -> parens (string k ^-^ string ":" ^^ string v)) kv) ) let function_call name params = let open SP in entity ( ksprintf string "%s" name ^^ (OCaml.list (fun (k, v) -> parens (string k ^-^ string ":" ^^ v)) params) ) let string = SP.string end) end module To_dot = struct open Nonstd let failwithf fmt = ksprintf failwith fmt type parameters = { color_input: name: string -> attributes: (string * string) list -> string option; } let default_parameters = { color_input = (fun ~name ~attributes -> None); } module Tree = struct type box = { id: string; name : string; attributes: (string * string) list} type arrow = { label: string; points_to: t } and t = [ | `Variable of string * string | `Lambda of string * string * t | `Apply of string * t * t | `String of string | `Input_value of box | `Node of box * arrow list ] let node_count = ref 0 let id_style = `Structural module Index_anything = struct (** This an implementation of an almost bijection: 'a -> int It compares values structurally (with [(=)]), and assigns an integer, unique over the execution of the program. It replaces [Hashtbl.hash] for which we were hitting annoying collisions. *) type e = E: 'a -> e let count = ref 0 let nodes : (e * int) list ref = ref [] let get v = match List.find !nodes ~f:(fun (ee, _) -> ee = E v) with | Some (_, i) -> i | None -> incr count; nodes := (E v, !count) :: !nodes; !count end let make_id a = match a, id_style with | _, `Unique | `Unique, _ -> incr node_count; sprintf "node%03d" !node_count | `Of v, `Structural -> Index_anything.get v |> sprintf "id%d" let arrow label points_to = {label; points_to} (* [id] is just an argument used for hashing an identifier *) let variable id name = `Variable (make_id (`Of id), name) let lambda varname expr = `Lambda (make_id (`Of (varname, expr)), varname, expr) let apply f v = `Apply (make_id (`Of (f,v)), f, v) let string s = `String s let node ?id ?(a = []) name l : t = let id = match id with | Some i -> i | None -> make_id (`Of (name, a, l)) in `Node ({id; name; attributes = a}, l) let input_value ?(a = []) name : t = let id = make_id (`Of (name, a)) in `Input_value {id; name; attributes = a} let to_dot t ~parameters = let open SmartPrint in let semicolon = string ";" in let sentence sp = sp ^-^ semicolon ^-^ newline in let dot_attributes l = brakets ( List.map l ~f:(fun (k, v) -> string k ^^ string "=" ^^ v) |> separate (semicolon) ) in let in_quotes s = ksprintf string "\"%s\"" s in let label_attribute lab = ("label", in_quotes lab) in let font_name `Mono = ("fontname", in_quotes "monospace") in let font_size = function | `Small -> ("fontsize", in_quotes "12") | `Default -> ("fontsize", in_quotes "16") in let dot_arrow src dest = string src ^^ string "->" ^^ string dest in let id_of = function | `Lambda (id, _, _) -> id | `Apply (id, _, _) -> id | `Variable (id, _) -> id | `String s -> assert false | `Input_value {id; _} -> id | `Node ({id; _}, _) -> id in let label name attributes = match attributes with | [] -> name | _ -> sprintf "{%s | %s\\l }" name (List.map attributes ~f:(fun (k,v) -> sprintf "%s: %s" k v) |> String.concat "\\l") in let one o = [o] in let rec go = function | `Variable (_, s) as v -> let id = id_of v in sentence ( string id ^^ dot_attributes [ label_attribute (label s []); font_name `Mono; "shape", in_quotes "hexagon"; ] ) |> one | `Lambda (id, v, expr) -> [ (* To be displayed subgraphs need to be called “clusterSomething” *) string "subgraph" ^^ ksprintf string "cluster_%s" id ^^ braces ( ( sentence (string "color=grey") :: sentence (string "style=rounded") :: sentence (string "penwidth=4") :: go (node ~id (sprintf "Lambda %s" v) [arrow "Expr" expr]) ) |> List.dedup |> separate empty ) ^-^ newline ] | `Apply (id, f, v) -> go (node ~id "Apply F(X)" [arrow "F" f; arrow "X" v]) | `String s -> failwithf "`String %S -> should have been eliminated" s | `Input_value {id; name; attributes} -> let color = parameters.color_input ~name ~attributes |> Option.value ~default:"black" in sentence ( string id ^^ dot_attributes [ label_attribute (label name attributes); font_name `Mono; "shape", in_quotes "Mrecord"; "color", in_quotes color; ] ) |> one | `Node ({id; name; attributes}, trees) -> sentence ( string id ^^ dot_attributes [ label_attribute (label name attributes); font_name `Mono; "shape", in_quotes "Mrecord"; ] ) :: List.concat_map trees ~f:(fun {label; points_to} -> sentence ( dot_arrow (id_of points_to) id ^^ dot_attributes [ label_attribute label; font_size `Small; ] ) :: go points_to ) in let dot = string "digraph target_graph" ^^ braces (nest ( sentence ( string "graph" ^^ dot_attributes [ "rankdir", in_quotes "LR"; font_size `Default; ] ) ^-^ (go t |> List.dedup |> separate empty) )) in dot end type 'a repr = var_count: int -> Tree.t type 'a observation = parameters: parameters -> SmartPrint.t let lambda f = fun ~var_count -> let var_name = sprintf "Var_%d" var_count in (* Here is the hack that makes nodes containing variables “semi-unique.” [var_repr_fake] is a first version of the variable representation, that we feed to the the function [f] a first time. This resulting tree is used to create the real variable representation, the first argument [applied_once] will be hashed to create a unique-for-this-subtree identifier. Finally get the normal [applied] subtree and feed it to [Tree.lambda]. *) let var_repr_fake = fun ~var_count -> Tree.string var_name in let applied_once = f var_repr_fake ~var_count:(var_count + 1) in let var_repr = fun ~var_count -> Tree.variable applied_once var_name in let applied = f var_repr ~var_count:(var_count + 1) in Tree.lambda var_name applied let apply f v = fun ~var_count -> let func = f ~var_count in let arg = v ~var_count in Tree.apply func arg let observe f = f () ~var_count:0 |> Tree.to_dot let to_unit x = x let list l = fun ~var_count -> Tree.node "List.make" (List.mapi ~f:(fun i a -> Tree.arrow (sprintf "L%d" i) (a ~var_count)) l) let list_map l ~f = fun ~var_count -> Tree.node "List.map" [ Tree.arrow "list" (l ~var_count); Tree.arrow "function" (f ~var_count); ] include To_json.Make_serializer (struct type t = Tree.t let input_value name a ~var_count = Tree.input_value name ~a let function_call name params = let a, arrows = List.partition_map params ~f:(fun (k, v) -> match v with | `String s -> `Fst (k, s) | _ -> `Snd (k, v) ) in Tree.node ~a name (List.map ~f:(fun (k,v) -> Tree.arrow k v) arrows) let string s = Tree.string s end) end module To_json = struct open Nonstd module Tools = Biokepi_bfx_tools module Hla_utils = Biokepi_run_environment.Hla_utilities type json = Yojson.Basic.json type 'a repr = var_count : int -> json type 'a observation = json let lambda f = fun ~var_count -> let var_name = sprintf "var%d" var_count in let var_repr = fun ~var_count -> `String var_name in let applied = f var_repr ~var_count:(var_count + 1) in `Assoc [ "lambda", `String var_name; "body", applied; ] let apply f v = fun ~var_count -> let func = f ~var_count in let arg = v ~var_count in `Assoc [ "function", func; "argument", arg; ] let observe f = f () ~var_count:0 let to_unit j = j let list l = fun ~var_count -> `List (List.map ~f:(fun a -> a ~var_count) l) let list_map l ~f = fun ~var_count -> `Assoc [ "list-map", f ~var_count; "argument", l ~var_count; ] module Make_serializer (How : sig type t val input_value : string -> (string * string) list -> var_count:int -> t val function_call : string -> (string * t) list -> t val string : string -> t end) = struct open How let input_url u = input_value "Input" [ "URL", u; ] let save ~name thing = fun ~var_count -> function_call name (["save-as", string name; "element", thing ~var_count]) let fastq_or_gz name ~sample_name ?fragment_id ~r1 ?r2 () = fun ~var_count -> function_call name ( [ "sample_name", string sample_name; "fragment_id", string (Option.value ~default:"NONE" fragment_id); "R1", r1 ~var_count; ] @ Option.value_map ~default:[] ~f:(fun r2 -> ["R2", r2 ~var_count]) r2 ) let fastq ~sample_name ?fragment_id ~r1 ?r2 () = fastq_or_gz "fastq" ~sample_name ?fragment_id ~r1 ?r2 () let fastq_gz ~sample_name ?fragment_id ~r1 ?r2 () = fastq_or_gz "fastq-gz" ~sample_name ?fragment_id ~r1 ?r2 () let bam ~sample_name ?sorting ~reference_build input = fun ~var_count -> function_call "bam" [ "sample_name", string sample_name; "sorting", string ( Option.value_map ~default:"NONE" sorting ~f:(function `Coordinate -> "Coordinate" | `Read_name -> "Read-name") ); "reference_build", string reference_build; "file", input ~var_count; ] let bed file = fun ~var_count -> function_call "bed" ["path", file ~var_count] let mhc_alleles = function | `Names sl -> input_value "mhc_alleles" [ "inline", (String.concat ", " sl) ] | `File p -> fun ~var_count -> function_call "mhc_alleles" [ "file", p ~var_count; ] let pair a b ~(var_count : int) = function_call "make-pair" [ "first", a ~var_count; "second", b ~var_count; ] let pair_first p ~(var_count : int) = function_call "pair-first" [ "pair", p ~var_count; ] let pair_second p ~(var_count : int) = function_call "pair-second" [ "pair", p ~var_count; ] let aligner name conf_name ~reference_build fq : var_count: int -> How.t = fun ~var_count -> let fq_compiled = fq ~var_count in function_call name [ "configuration", string conf_name; "reference_build", string reference_build; "input", fq_compiled; ] let one_to_one name conf_name input_file = fun ~(var_count : int) -> let input_file_compiled = input_file ~var_count in function_call name [ "configuration", string conf_name; "input", input_file_compiled; ] let bwa_aln ?(configuration = Tools.Bwa.Configuration.Aln.default) = aligner "bwa-aln" (Tools.Bwa.Configuration.Aln.name configuration) let bwa_mem ?(configuration = Tools.Bwa.Configuration.Mem.default) = aligner "bwa-mem" (Tools.Bwa.Configuration.Mem.name configuration) let bwa_mem_opt ?(configuration = Tools.Bwa.Configuration.Mem.default) ~reference_build input = match input with | `Fastq f -> aligner "bwa-mem-opt-fq" (Tools.Bwa.Configuration.Mem.name configuration) ~reference_build f | `Fastq_gz f -> aligner "bwa-mem-opt-fqz" (Tools.Bwa.Configuration.Mem.name configuration) ~reference_build f | `Bam (f, _) -> aligner "bwa-mem-opt-bam" (Tools.Bwa.Configuration.Mem.name configuration) ~reference_build f let gunzip gz ~(var_count : int) = function_call "gunzip" ["input", gz ~var_count] let gunzip_concat gzl ~(var_count : int) = function_call "gunzip-concat" ["input-list", gzl ~var_count] let concat l ~(var_count : int) = function_call "concat" ["input-list", l ~var_count] let merge_bams ?delete_input_on_success ?attach_rg_tag ?uncompressed_bam_output ?compress_level_one ?combine_rg_headers ?combine_pg_headers bl ~(var_count : int) = function_call "merge-bams" ["input-list", bl ~var_count] let star ?(configuration = Tools.Star.Configuration.Align.default) = aligner "star" (Tools.Star.Configuration.Align.name configuration) let hisat ?(configuration = Tools.Hisat.Configuration.default_v1) = aligner "hisat" (configuration.Tools.Hisat.Configuration.name) let mosaik = aligner "mosaik" "default" let stringtie ?(configuration = Tools.Stringtie.Configuration.default) = one_to_one "stringtie" configuration.Tools.Stringtie.Configuration.name let bam_left_align ~reference_build = one_to_one "bam_left_align" reference_build let sambamba_filter ~filter = one_to_one "filter" (Tools.Sambamba.Filter.to_string filter) let indel_real_config (indel, target) = (sprintf "I%s-TC%s" indel.Tools.Gatk.Configuration.Indel_realigner.name target.Tools.Gatk.Configuration.Realigner_target_creator.name) let gatk_indel_realigner ?(configuration = Tools.Gatk.Configuration.default_indel_realigner) = one_to_one "gatk_indel_realigner" (indel_real_config configuration) let gatk_indel_realigner_joint ?(configuration = Tools.Gatk.Configuration.default_indel_realigner) = one_to_one "gatk_indel_realigner_joint" (indel_real_config configuration) let picard_mark_duplicates ?(configuration = Tools.Picard.Mark_duplicates_settings.default) = one_to_one "picard_mark_duplicates" configuration.Tools.Picard.Mark_duplicates_settings.name let picard_reorder_sam ?mem_param ?reference_build = one_to_one "picard_reorder_sam" (sprintf "ref-%s" (match reference_build with None -> "inputs" | Some r -> r)) let picard_clean_bam = one_to_one "picard_clean_bam" "default" let gatk_bqsr ?(configuration = Tools.Gatk.Configuration.default_bqsr) = let (bqsr, preads) = configuration in one_to_one "gatk_bqsr" (sprintf "B%s-PR%s" bqsr.Tools.Gatk.Configuration.Bqsr.name preads.Tools.Gatk.Configuration.Print_reads.name) let seq2hla = one_to_one "seq2hla" "default" let hlarp input = let name, v = match input with | `Seq2hla f -> "hlarp-seq2hla", f | `Optitype f -> "hlarp-optitype", f in one_to_one name "hlarp" v let filter_to_region vcf bed = fun ~(var_count: int) -> function_call "filter_to_region" ["bed", bed ~var_count; "vcf", vcf ~var_count] let index_bam = one_to_one "index_bam" "default" let kallisto ~reference_build ?bootstrap_samples = let samples = match bootstrap_samples with | None -> "default" | Some s -> sprintf "%d" s in one_to_one "kallisto" (sprintf "%s-samples:%s" reference_build samples) let cufflinks ?reference_build = let build = match reference_build with | None -> "bam's build" | Some s -> s in one_to_one "cufflinks" (sprintf "build:%s" build) let fastqc = one_to_one "fastqc" "default" let flagstat = one_to_one "flagstat" "default" let vcf_annotate_polyphen = one_to_one "vcf_annotate_polyphen" "default" let snpeff = one_to_one "snpeff" "default" let isovar ?(configuration = Tools.Isovar.Configuration.default) vcf bam = fun ~(var_count : int) -> let vcf_compiled = vcf ~var_count in let bam_compiled = bam ~var_count in function_call "isovar" [ "configuration", string Tools.Isovar.Configuration.(name configuration); "vcf", vcf_compiled; "bam", bam_compiled; ] let topiary ?(configuration = Tools.Topiary.Configuration.default) vcfs predictor alleles = fun ~(var_count : int) -> let vcfs_compiled = List.map vcfs ~f:(fun v -> v ~var_count) in function_call "topiary" ([ "configuration", string Tools.Topiary.Configuration.(name configuration); "alleles", alleles ~var_count; "predictor", string Hla_utils.(predictor_to_string predictor); ] @ (List.mapi ~f:(fun i v -> (sprintf "vcf%d" i, v)) vcfs_compiled)) let vaxrank ?(configuration = Tools.Vaxrank.Configuration.default) vcfs bam predictor alleles = fun ~(var_count : int) -> let vcfs_compiled = List.map vcfs ~f:(fun v -> v ~var_count) in let bam_compiled = bam ~var_count in function_call "vaxrank" ([ "configuration", string Tools.Vaxrank.Configuration.(name configuration); "alleles", alleles ~var_count; "predictor", string Hla_utils.(predictor_to_string predictor); "bam", bam_compiled; ] @ (List.mapi ~f:(fun i v -> (sprintf "vcf%d" i, v)) vcfs_compiled)) let optitype how = one_to_one "optitype" (match how with `DNA -> "DNA" | `RNA -> "RNA") let gatk_haplotype_caller = one_to_one "gatk_haplotype_caller" "default" let bam_to_fastq ?fragment_id se_or_pe bam = fun ~(var_count : int) -> let bamc = bam ~var_count in function_call "bam_to_fastq" [ "fragment_id", Option.value_map ~f:string ~default:(string "N/A") fragment_id; "endness", string ( match se_or_pe with | `SE -> "SE" | `PE -> "PE" ); "input", bamc; ] let variant_caller name conf_name ~normal ~tumor () ~(var_count : int) = function_call name [ "configuration", string conf_name; "normal", normal ~var_count; "tumor", tumor ~var_count; ] let mutect ?(configuration = Tools.Mutect.Configuration.default) = variant_caller "mutect" configuration.Tools.Mutect.Configuration.name let mutect2 ?(configuration = Tools.Gatk.Configuration.Mutect2.default) = variant_caller "mutect2" configuration.Tools.Gatk.Configuration.Mutect2.name let delly2 ?(configuration=Tools.Delly2.Configuration.default) = variant_caller "delly2" configuration.Tools.Delly2.Configuration.name let somaticsniper ?(configuration = Tools.Somaticsniper.Configuration.default) = variant_caller "somaticsniper" configuration.Tools.Somaticsniper.Configuration.name let strelka ?(configuration = Tools.Strelka.Configuration.default) = variant_caller "strelka" configuration.Tools.Strelka.Configuration.name let varscan_somatic ?adjust_mapq = variant_caller "varscan_somatic" (Option.value_map ~f:(sprintf "amap%d") ~default:"default" adjust_mapq) let muse ?(configuration = Tools.Muse.Configuration.default `WGS) = variant_caller "muse" configuration.Tools.Muse.Configuration.name let virmid ?(configuration = Tools.Virmid.Configuration.default) = variant_caller "virmid" configuration.Tools.Virmid.Configuration.name let seqtk_shift_phred_scores = one_to_one "seqtk_shift_phred_scores" "default" end include Make_serializer (struct type t = json let input_value name kv = fun ~var_count -> `Assoc ( ("input-value", `String name) :: List.map kv ~f:(fun (k, v) -> k, `String v) ) let function_call name params = `Assoc ( ("function-call", `String name) :: params) let string s = `String s end) end module To_workflow = struct open Nonstd module String = Sosa.Native_string let (//) = Filename.concat module Name_file = Biokepi_run_environment.Common.Name_file (** The link between {!Biokepi.KEDSL} values and EDSL expression types. *) module File_type_specification = struct open Biokepi_run_environment.Common.KEDSL type t = .. type t += | To_unit: t -> t | Fastq: fastq_reads workflow_node -> t | Bam: bam_file workflow_node -> t | Vcf: vcf_file workflow_node -> t | Bed: single_file workflow_node -> t | Gtf: single_file workflow_node -> t | Seq2hla_result: Biokepi_bfx_tools.Seq2HLA.product workflow_node -> t | Optitype_result: Biokepi_bfx_tools.Optitype.product workflow_node -> t | Fastqc_result: list_of_files workflow_node -> t | Flagstat_result: single_file workflow_node -> t | Isovar_result: single_file workflow_node -> t | Topiary_result: single_file workflow_node -> t | Vaxrank_result: Biokepi_bfx_tools.Vaxrank.product workflow_node -> t | MHC_alleles: single_file workflow_node -> t | Bai: single_file workflow_node -> t | Kallisto_result: single_file workflow_node -> t | Cufflinks_result: single_file workflow_node -> t | Raw_file: single_file workflow_node -> t | Gz: t -> t | List: t list -> t | Pair: t * t -> t | Lambda: (t -> t) -> t let to_string_functions : (t -> string option) list ref = ref [] let add_to_string f = to_string_functions := f :: !to_string_functions let rec to_string : type a. t -> string = function | To_unit a -> sprintf "(to_unit %s)" (to_string a) | Fastq _ -> "Fastq" | Bam _ -> "Bam" | Vcf _ -> "Vcf" | Bed _ -> "Bed" | Gtf _ -> "Gtf" | Seq2hla_result _ -> "Seq2hla_result" | Optitype_result _ -> "Optitype_result" | Fastqc_result _ -> "Fastqc_result" | Flagstat_result _ -> "Flagstat_result" | Isovar_result _ -> "Isovar_result" | Topiary_result _ -> "Topiary_result" | Vaxrank_result _ -> "Vaxrank_result" | MHC_alleles _ -> "MHC_alleles" | Bai _ -> "Bai" | Kallisto_result _ -> "Kallisto_result" | Cufflinks_result _ -> "Cufflinks_result" | Raw_file _ -> "Input_url" | Gz a -> sprintf "(gz %s)" (to_string a) | List l -> sprintf "[%s]" (List.map l ~f:to_string |> String.concat ~sep:"; ") | Pair (a, b) -> sprintf "(%s, %s)" (to_string a) (to_string b) | Lambda _ -> "--LAMBDA--" | other -> begin match List.find_map !to_string_functions ~f:(fun f -> f other) with | None -> "##UNKNOWN##" | Some s -> s end let fail_get other name = ksprintf failwith "Error while extracting File_type_specification.t \ (%s case, in %s), this usually means that the type \ has been wrongly extended" (to_string other) name let get_fastq : t -> fastq_reads workflow_node = function | Fastq b -> b | o -> fail_get o "Fastq" let get_bam : t -> bam_file workflow_node = function | Bam b -> b | o -> fail_get o "Bam" let get_vcf : t -> vcf_file workflow_node = function | Vcf v -> v | o -> fail_get o "Vcf" let get_bed : t -> single_file workflow_node = function | Bed v -> v | o -> fail_get o "Bed" let get_gtf : t -> single_file workflow_node = function | Gtf v -> v | o -> fail_get o "Gtf" let get_raw_file : t -> single_file workflow_node = function | Raw_file v -> v | o -> fail_get o "Raw_file" let get_seq2hla_result : t -> Biokepi_bfx_tools.Seq2HLA.product workflow_node = function | Seq2hla_result v -> v | o -> fail_get o "Seq2hla_result" let get_fastqc_result : t -> list_of_files workflow_node = function | Fastqc_result v -> v | o -> fail_get o "Fastqc_result" let get_flagstat_result : t -> single_file workflow_node = function | Flagstat_result v -> v | o -> fail_get o "Flagstat_result" let get_isovar_result : t -> single_file workflow_node = function | Isovar_result v -> v | o -> fail_get o "Isovar_result" let get_topiary_result : t -> single_file workflow_node = function | Topiary_result v -> v | o -> fail_get o "Topiary_result" let get_vaxrank_result : t -> Biokepi_bfx_tools.Vaxrank.product workflow_node = function | Vaxrank_result v -> v | o -> fail_get o "Vaxrank_result" let get_mhc_alleles : t -> single_file workflow_node = function | MHC_alleles v -> v | o -> fail_get o "Topiary_result" let get_bai : t -> single_file workflow_node = function | Bai v -> v | o -> fail_get o "Bai" let get_kallisto_result : t -> single_file workflow_node = function | Kallisto_result v -> v | o -> fail_get o "Kallisto_result" let get_cufflinks_result : t -> single_file workflow_node = function | Cufflinks_result v -> v | o -> fail_get o "Cufflinks_result" let get_optitype_result : t -> Biokepi_bfx_tools.Optitype.product workflow_node = function | Optitype_result v -> v | o -> fail_get o "Optitype_result" let pair_fst = function | Pair (a, _) -> a | other -> fail_get other "Pair-fst" let pair_snd = function | Pair (_, b) -> b | other -> fail_get other "Pair-snd" let get_gz : t -> t = function | Gz v -> v | o -> fail_get o "Gz" let get_list : t -> t list = function | List v -> v | o -> fail_get o "List" let to_deps_functions : (t -> workflow_edge list option) list ref = ref [] let add_to_dependencies_edges_function f = to_deps_functions := f :: !to_deps_functions let rec as_dependency_edges : type a. t -> workflow_edge list = let one_depends_on wf = [depends_on wf] in function | To_unit v -> as_dependency_edges v | Fastq wf -> one_depends_on wf | Bam wf -> one_depends_on wf | Vcf wf -> one_depends_on wf | Gtf wf -> one_depends_on wf | Seq2hla_result wf -> one_depends_on wf | Fastqc_result wf -> one_depends_on wf | Flagstat_result wf -> one_depends_on wf | Isovar_result wf -> one_depends_on wf | Optitype_result wf -> one_depends_on wf | Topiary_result wf -> one_depends_on wf | Vaxrank_result wf -> one_depends_on wf | Cufflinks_result wf -> one_depends_on wf | Bai wf -> one_depends_on wf | Kallisto_result wf -> one_depends_on wf | MHC_alleles wf -> one_depends_on wf | Raw_file w -> one_depends_on w | List l -> List.concat_map l ~f:as_dependency_edges | Pair (a, b) -> as_dependency_edges a @ as_dependency_edges b | other -> begin match List.find_map !to_deps_functions ~f:(fun f -> f other) with | None -> fail_get other "as_dependency_edges" | Some s -> s end let get_unit_workflow : name: string -> t -> unknown_product workflow_node = fun ~name f -> match f with | To_unit v -> workflow_node without_product ~name ~edges:(as_dependency_edges v) | other -> fail_get other "get_unit_workflow" end open Biokepi_run_environment module type Compiler_configuration = sig val work_dir : string (** Directory where all work product done by the TTFI end up. *) val results_dir : string option (** Directory where `save` files will end up. *) val machine : Machine.t (** Biokepi machine used by the TTFI workflow. *) val map_reduce_gatk_indel_realigner : bool (** Whether or not to scatter-gather the indel-realigner results. *) val input_files: [ `Copy | `Link | `Do_nothing ] (** What to do with input files: copy or link them to the work directory, or do nothing. Doing nothing means letting some tools like ["samtools sort"] write in the input-file's directory. *) end module Defaults = struct let map_reduce_gatk_indel_realigner = true let input_files = `Link let results_dir = None end module Provenance_description = struct type t = { name: string; sub_tree_arguments: (string * t) list; string_arguments: (string * string) list; json_arguments: (string * Yojson.Basic.json) list; } let rec to_yojson t : Yojson.Basic.json = let fields = List.concat [ List.map t.sub_tree_arguments ~f:(fun (k, v) -> k, to_yojson v); List.map t.string_arguments ~f:(fun (k, v) -> k, `String v); t.json_arguments; ] in `Assoc (("node-name", `String t.name) :: fields) end module Annotated_file = struct type t = { file: File_type_specification.t; provenance: Provenance_description.t; functional: (t -> Provenance_description.t) option; } let with_provenance ?functional ?(string_arguments = []) ?(json_arguments = []) name arguments file = { file; provenance = { Provenance_description. name; sub_tree_arguments = arguments; string_arguments; json_arguments}; functional; } let get_file t = t.file let get_provenance t = t.provenance let get_functional_provenance t = t.functional |> Option.value_exn ~msg:"get_functional_provenance" end module Saving_utilities = struct (** Compute the base-path used by the [EDSL.save] function. *) let base_path ?results_dir ~work_dir ~name () = let name = String.map name ~f:(function | '0' .. '9' | 'a' .. 'z' | 'A' .. 'Z' | '-' | '_' as c -> c | other -> '_') in match results_dir with | None -> work_dir // "results" // name | Some r -> r // name end open Biokepi_run_environment.Common.KEDSL let get_workflow : name: string -> Annotated_file.t -> unknown_product workflow_node = fun ~name f -> let v = Annotated_file.get_file f in workflow_node without_product ~name ~edges:(File_type_specification.as_dependency_edges v) module Make (Config : Compiler_configuration) : Semantics.Bioinformatics_base with type 'a repr = Annotated_file.t and type 'a observation = Annotated_file.t = struct open File_type_specification (* open Annotated_file *) module AF = Annotated_file module Tools = Biokepi_bfx_tools module KEDSL = Common.KEDSL let failf fmt = ksprintf failwith fmt type 'a repr = Annotated_file.t type 'a observation = Annotated_file.t let observe : (unit -> 'a repr) -> 'a observation = fun f -> f () let lambda : ('a repr -> 'b repr) -> ('a -> 'b) repr = fun f -> Lambda (fun x -> let annot = f (x |> AF.with_provenance "variable" []) in AF.get_file annot ) |> AF.with_provenance "lamda" [] ~functional:(fun x -> f x |> AF.get_provenance) let apply : ('a -> 'b) repr -> 'a repr -> 'b repr = fun f_repr x -> let annot_f = AF.get_functional_provenance f_repr in let annot_x = AF.get_provenance x in match AF.get_file f_repr with | Lambda f -> f (AF.get_file x) |> AF.with_provenance "apply" [ "function", annot_f x; "argument", annot_x; ] | _ -> assert false let list : 'a repr list -> 'a list repr = fun l -> let ann = List.mapi ~f:(fun i x -> sprintf "element_%d" i, AF.get_provenance x) l in List (List.map l ~f:AF.get_file) |> AF.with_provenance "list" ann let list_map : 'a list repr -> f:('a -> 'b) repr -> 'b list repr = fun l ~f -> let ann_l = AF.get_provenance l in (* let ann_f = AF.get_functional_provenance f in *) match AF.get_file l with | List l -> let applied_annotated = List.map ~f:(fun v -> apply f (v |> AF.with_provenance "X" [])) l in let prov = ("list", ann_l) :: List.map applied_annotated ~f:(fun x -> "applied", AF.get_provenance x) in List (List.map applied_annotated ~f:AF.get_file) |> AF.with_provenance "list-map" prov | _ -> assert false let pair a b = Pair (AF.get_file a, AF.get_file b) |> AF.with_provenance "pair" ["left", AF.get_provenance a; "right", AF.get_provenance b] let pair_first x = match AF.get_file x with | Pair (a, _) -> a |> AF.with_provenance "pair-first" ["pair", AF.get_provenance x] | other -> fail_get other "Pair" let pair_second x = match AF.get_file x with | Pair (_, b) -> b |> AF.with_provenance "pair-second" ["pair", AF.get_provenance x] | other -> fail_get other "Pair" let to_unit x = To_unit (AF.get_file x) |> AF.with_provenance "to-unit" ["argument", AF.get_provenance x] let host = Machine.as_host Config.machine let run_with = Config.machine let deal_with_input_file (ifile : _ KEDSL.workflow_node) ~make_product = let open KEDSL in let new_path = Name_file.in_directory Config.work_dir ~readable_suffix:(Filename.basename ifile#product#path) [ifile#product#path] in let make = Machine.quick_run_program Config.machine Program.( shf "mkdir -p %s" Config.work_dir && ( match Config.input_files with | `Link -> shf "cd %s" Config.work_dir && shf "ln -s %s %s" ifile#product#path new_path | `Copy -> shf "cp %s %s" ifile#product#path new_path | `Do_nothing -> shf "echo 'No input action on %s'" ifile#product#path ) ) in let host = ifile#product#host in let product = match Config.input_files with | `Do_nothing -> ifile#product | `Link | `Copy -> make_product new_path in let name = sprintf "Input file %s (%s)" (Filename.basename new_path) (match Config.input_files with | `Link -> "link" | `Copy -> "copy" | `Do_nothing -> "as-is") in let ensures = `Is_verified Condition.( chain_and [ volume_exists Volume.(create ~host ~root:Config.work_dir (dir "." [])); product#is_done |> Option.value_exn ~msg:"File without is_done?"; ] ) in workflow_node product ~ensures ~name ~make ~edges:[depends_on ifile] let input_url url = let open KEDSL in let uri = Uri.of_string url in let path_of_uri uri = let basename = match Uri.get_query_param uri "filename" with | Some f -> f | None -> Digest.(string url |> to_hex) ^ (Uri.path uri |> Filename.basename) in Config.work_dir // basename in begin match Uri.scheme uri with | None | Some "file" -> let raw_file = workflow_node (Uri.path uri |> single_file ~host) ~name:(sprintf "Input file: %s" url) in Raw_file (deal_with_input_file raw_file (single_file ~host)) | Some "http" | Some "https" -> let path = path_of_uri uri in Raw_file Biokepi_run_environment.( Workflow_utilities.Download.wget ~host ~run_program:(Machine.run_download_program Config.machine) url path ) | Some "gs" -> let path = path_of_uri uri in Raw_file Biokepi_run_environment.( Workflow_utilities.Download.gsutil_cp ~host ~run_program:(Machine.run_download_program Config.machine) ~url ~local_path:path ) | Some other -> ksprintf failwith "URI scheme %S (in %s) NOT SUPPORTED" other url end |> AF.with_provenance "input-url" ~string_arguments:["url", url] [] let save ~name thing = let open KEDSL in let basename = Filename.basename in let canonicalize path = (* Remove the ending '/' from the path. This is so rsync syncs the directory itself + *) let suffix = "/" in if Filename.check_suffix path suffix then String.chop_suffix_exn ~suffix path else path in let base_path = Saving_utilities.base_path ?results_dir:Config.results_dir ~work_dir:Config.work_dir ~name () in let move ?(and_gzip = false) ~from_path ~wf product = let json = `Assoc [ "base-path", `String base_path; "saved-from", `String from_path; "provenance", AF.get_provenance thing |> Provenance_description.to_yojson; ] |> Yojson.Basic.pretty_to_string in let make = Machine.quick_run_program Config.machine Program.( shf "mkdir -p %s" base_path && shf "rsync -a %s %s" from_path base_path && ( match and_gzip with | true -> shf "for f in $(find %s -type f) ; do \ gzip --force --keep $f ; \ done" base_path | false -> sh "echo 'No GZipping Requested'" ) && shf "echo %s > %s.json" (Filename.quote json) base_path ) in let name = sprintf "Saving \"%s\"" name in workflow_node product ~name ~make ~edges:[depends_on wf] in let tf path = transform_single_file ~path in begin match AF.get_file thing with | Bam wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Bam (move ~from_path ~wf (transform_bam ~path:to_path wf#product)) | Vcf wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Vcf (move ~and_gzip:true ~from_path ~wf (transform_vcf ~path:to_path wf#product)) | Gtf wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Gtf (move ~from_path ~wf (tf to_path wf#product)) | Flagstat_result wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Flagstat_result (move ~from_path ~wf (tf to_path wf#product)) | Isovar_result wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Isovar_result (move ~from_path ~wf (tf to_path wf#product)) | Topiary_result wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Topiary_result (move ~from_path ~wf (tf to_path wf#product)) | Vaxrank_result wf -> let from_path = canonicalize wf#product#output_folder_path in let to_path = base_path // basename from_path in let vp = Tools.Vaxrank.move_vaxrank_product ~output_folder_path:to_path wf#product in Vaxrank_result (move ~from_path ~wf vp) | Optitype_result wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in let o = Tools.Optitype.move_optitype_product ~path:to_path wf#product in let from_path = wf#product#path in Optitype_result (move ~from_path ~wf o) | Seq2hla_result wf -> let from_path = canonicalize wf#product#work_dir_path in let to_path = base_path // basename from_path in let s = Tools.Seq2HLA.move_seq2hla_product ~path:to_path wf#product in Seq2hla_result (move ~from_path ~wf s) | Fastqc_result wf -> let from_path = wf#product#paths |> List.hd_exn |> Filename.dirname |> canonicalize in let fqc = let paths = List.map wf#product#paths ~f:(fun p -> base_path // (basename from_path) // (basename p)) in list_of_files paths in Fastqc_result (move ~from_path ~wf fqc) | Cufflinks_result wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Cufflinks_result (move ~from_path ~wf (tf to_path wf#product)) | Bai wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Bai (move ~from_path ~wf (tf to_path wf#product)) | Kallisto_result wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in Kallisto_result (move ~from_path ~wf (tf to_path wf#product)) | MHC_alleles wf -> let from_path = wf#product#path in let to_path = base_path // basename from_path in MHC_alleles (move ~from_path ~wf (tf to_path wf#product)) | Raw_file wf -> let from_path = canonicalize wf#product#path in let to_path = base_path // basename from_path in Raw_file (move ~from_path ~wf (tf to_path wf#product)) | Gz _ -> failwith "Cannot `save` Gz." | List _ -> failwith "Cannot `save` List." | Pair _ -> failwith "Cannot `save` Pair." | Lambda _ -> failwith "Cannot `save` Lambda." | _ -> failwith "Shouldn't get here: pattern match for `save` must be exhaustive." end |> AF.with_provenance "save" ["product", AF.get_provenance thing] ~string_arguments:["name", name] let fastq ~sample_name ?fragment_id ~r1 ?r2 () = Fastq ( let open KEDSL in let read1 = get_raw_file (AF.get_file r1) in let read2 = Option.map r2 ~f:(fun r -> AF.get_file r |> get_raw_file) in fastq_node_of_single_file_nodes ?fragment_id ~host ~name:sample_name read1 read2 ) |> AF.with_provenance "fastq" (("r1", AF.get_provenance r1) :: Option.value_map ~default:[] r2 ~f:(fun r -> ["r2", AF.get_provenance r])) ~string_arguments:( ("sample-name", sample_name) :: Option.value_map fragment_id ~default:[] ~f:(fun id -> ["fragment-id", id])) let fastq_gz ~sample_name ?fragment_id ~r1 ?r2 () = let fq = fastq ~sample_name ?fragment_id ~r1 ?r2 () in Gz (AF.get_file fq) |> AF.with_provenance "gz" ["fastq", AF.get_provenance fq] let bam ~sample_name ?sorting ~reference_build input = Bam ( let open KEDSL in let host = Machine.as_host Config.machine in let f = get_raw_file (AF.get_file input) in let bam = bam_file ~host ~name:sample_name ?sorting ~reference_build f#product#path in workflow_node bam ~equivalence:`None ~name:(sprintf "Input-bam: %s" sample_name) ~edges:[depends_on f] ) |> AF.with_provenance "bam" ["file", AF.get_provenance input] ~string_arguments:[ "sample-name", sample_name; "sorting", (match sorting with | Some `Coordinate -> "coordinate" | None -> "none" | Some `Read_name -> "read-name"); "reference-build", reference_build; ] let bed file = Bed (get_raw_file (AF.get_file file)) |> AF.with_provenance "bed" ["file", AF.get_provenance file] let mhc_alleles how = match how with | `File raw -> MHC_alleles (get_raw_file (AF.get_file raw)) |> AF.with_provenance "mhc-alleles" ["file", AF.get_provenance raw] | `Names strlist -> let path = Name_file.in_directory ~readable_suffix:"MHC_allleles.txt" Config.work_dir strlist in let open KEDSL in let host = Machine.as_host Config.machine in let product = single_file ~host path in let node = workflow_node product ~name:(sprintf "Inline-MHC-alleles: %s" (String.concat ~sep:", " strlist)) ~make:( Machine.quick_run_program Config.machine Program.( let line s = shf "echo %s >> %s" (Filename.quote s) (Filename.quote path) in shf "mkdir -p %s" (Filename.dirname path) && shf "rm -f %s" (Filename.quote path) && chain (List.map ~f:line strlist) ) ) in MHC_alleles node |> AF.with_provenance "mhc-allelles" [] ~string_arguments:(List.mapi strlist ~f:(fun i s -> sprintf "allele-%d" i, s)) let index_bam bam = let input_bam = get_bam (AF.get_file bam) in let sorted_bam = Tools.Samtools.sort_bam_if_necessary ~run_with ~by:`Coordinate input_bam in Bai ( Tools.Samtools.index_to_bai ~run_with ~check_sorted:true sorted_bam ) |> AF.with_provenance "index-bam" ["bam", AF.get_provenance bam] let kallisto ~reference_build ?(bootstrap_samples=100) fastq = let fq = get_fastq (AF.get_file fastq) in let result_prefix = Name_file.in_directory ~readable_suffix:"kallisto" Config.work_dir [ fq#product#escaped_sample_name; sprintf "%d" bootstrap_samples; reference_build; ] in Kallisto_result ( Tools.Kallisto.run ~reference_build ~fastq:fq ~run_with ~result_prefix ~bootstrap_samples ) |> AF.with_provenance "kallisto" ["fastq", AF.get_provenance fastq] ~string_arguments:[ "reference-build", reference_build; "bootstrap-samples", Int.to_string bootstrap_samples; ] let delly2 ?(configuration=Tools.Delly2.Configuration.default) ~normal ~tumor () = let normal_bam = get_bam (AF.get_file normal) in let tumor_bam = get_bam (AF.get_file tumor) in let output_path = Name_file.in_directory ~readable_suffix:"-delly2.vcf" Config.work_dir [ normal_bam#product#path; tumor_bam#product#path; Tools.Delly2.Configuration.name configuration; ] in let bcf = Tools.Delly2.run_somatic ~configuration ~run_with ~normal:normal_bam ~tumor:tumor_bam ~output_path:(output_path ^ ".bcf") in Vcf (Tools.Bcftools.bcf_to_vcf ~run_with ~reference_build:normal_bam#product#reference_build ~bcf output_path) |> AF.with_provenance "delly2" ["normal", AF.get_provenance normal; "tumor", AF.get_provenance tumor] ~string_arguments:[ "configuration-name", Tools.Delly2.Configuration.name configuration; ] ~json_arguments:[ "configuration", Tools.Delly2.Configuration.to_json configuration; ] let cufflinks ?reference_build bamf = let bam = get_bam (AF.get_file bamf) in let reference_build = match reference_build with | None -> bam#product#reference_build | Some r -> r in let result_prefix = Name_file.in_directory ~readable_suffix:"cufflinks" Config.work_dir [ bam#product#escaped_sample_name; reference_build ] in Cufflinks_result ( Tools.Cufflinks.run ~reference_build ~bam ~run_with ~result_prefix ) |> AF.with_provenance "cufflinks" ["bam", AF.get_provenance bamf] ~string_arguments:["reference-build", reference_build] let make_aligner name ~make_workflow ~config_name ~config_to_json ~configuration ~reference_build fastq = let freads = get_fastq (AF.get_file fastq) in let result_prefix = Name_file.in_directory ~readable_suffix:name Config.work_dir [ config_name configuration; freads#product#escaped_sample_name; freads#product#fragment_id_forced; name; ] in Bam ( make_workflow ~reference_build ~configuration ~result_prefix ~run_with freads ) |> AF.with_provenance name ["fastq", AF.get_provenance fastq] ~string_arguments:[ "configuration-name", config_name configuration; "reference-build", reference_build; ] ~json_arguments:["configuration", config_to_json configuration] let bwa_aln ?(configuration = Tools.Bwa.Configuration.Aln.default) = make_aligner "bwaaln" ~configuration ~config_name:Tools.Bwa.Configuration.Aln.name ~config_to_json:Tools.Bwa.Configuration.Aln.to_json ~make_workflow:( fun ~reference_build ~configuration ~result_prefix ~run_with freads -> Tools.Bwa.align_to_sam ~reference_build ~configuration ~fastq:freads ~result_prefix ~run_with () |> Tools.Samtools.sam_to_bam ~reference_build ~run_with ) let bwa_mem ?(configuration = Tools.Bwa.Configuration.Mem.default) = make_aligner "bwamem" ~configuration ~config_name:Tools.Bwa.Configuration.Mem.name ~config_to_json:Tools.Bwa.Configuration.Mem.to_json ~make_workflow:( fun ~reference_build ~configuration ~result_prefix ~run_with freads -> Tools.Bwa.mem_align_to_sam ~reference_build ~configuration ~fastq:freads ~result_prefix ~run_with () |> Tools.Samtools.sam_to_bam ~reference_build ~run_with ) let bwa_mem_opt ?(configuration = Tools.Bwa.Configuration.Mem.default) ~reference_build input = let bwa_mem_opt input annot = let result_prefix = Name_file.in_directory ~readable_suffix:"bwamemopt" Config.work_dir [ Tools.Bwa.Configuration.Mem.name configuration; Tools.Bwa.Input_reads.sample_name input; Tools.Bwa.Input_reads.read_group_id input; ] in Bam ( Tools.Bwa.mem_align_to_bam ~configuration ~reference_build ~run_with ~result_prefix input ) |> AF.with_provenance "bwa-mem-opt" ["input", annot] ~string_arguments:[ "reference-build", reference_build; "configuration-name", Tools.Bwa.Configuration.Mem.name configuration; ] ~json_arguments:[ "configuration", Tools.Bwa.Configuration.Mem.to_json configuration; ] in let of_input = function | `Fastq fastq -> let fq = get_fastq (AF.get_file fastq) in bwa_mem_opt (`Fastq fq) (AF.get_provenance fastq) | `Fastq_gz fqz -> let fq = get_gz (AF.get_file fqz) |> get_fastq in bwa_mem_opt (`Fastq fq) (AF.get_provenance fqz) | `Bam (b, p) -> let bam = get_bam (AF.get_file b) in bwa_mem_opt (`Bam (bam, `PE)) (AF.get_provenance b) in of_input input let star ?(configuration = Tools.Star.Configuration.Align.default) = make_aligner "star" ~configuration ~config_name:Tools.Star.Configuration.Align.name ~config_to_json:Tools.Star.Configuration.Align.to_json ~make_workflow:( fun ~reference_build ~configuration ~result_prefix ~run_with fastq -> Tools.Star.align ~configuration ~reference_build ~fastq ~result_prefix ~run_with () ) let hisat ?(configuration = Tools.Hisat.Configuration.default_v1) = make_aligner "hisat" ~configuration ~config_name:Tools.Hisat.Configuration.name ~config_to_json:Tools.Hisat.Configuration.to_json ~make_workflow:( fun ~reference_build ~configuration ~result_prefix ~run_with fastq -> Tools.Hisat.align ~configuration ~reference_build ~fastq ~result_prefix ~run_with () |> Tools.Samtools.sam_to_bam ~reference_build ~run_with ) let mosaik = make_aligner "mosaik" ~configuration:() ~config_name:(fun _ -> "default") ~config_to_json:(fun _ -> `Assoc ["name", `String "default"]) ~make_workflow:( fun ~reference_build ~configuration ~result_prefix ~run_with fastq -> Tools.Mosaik.align ~reference_build ~fastq ~result_prefix ~run_with ()) let gunzip: Annotated_file.t -> Annotated_file.t = fun gz -> let inside = get_gz (AF.get_file gz) in begin match inside with | Fastq f -> let make_result_path read = let base = Filename.basename read in Config.work_dir // (match base with | fastqgz when Filename.check_suffix base ".fastq.gz" -> Filename.chop_suffix base ".gz" | fqz when Filename.check_suffix base ".fqz" -> Filename.chop_suffix base ".fqz" ^ ".fastq" | other -> ksprintf failwith "To_workflow.gunzip: cannot recognize Gz-Fastq \ extension: %S" other) in let gunzip read = let result_path = make_result_path read#product#path in Workflow_utilities.Gunzip.concat ~run_with [read] ~result_path in let fastq_r1 = gunzip (KEDSL.read_1_file_node f) in let fastq_r2 = Option.map (KEDSL.read_2_file_node f) ~f:gunzip in Fastq ( KEDSL.fastq_node_of_single_file_nodes ~host ~name:f#product#sample_name ?fragment_id:f#product#fragment_id fastq_r1 fastq_r2 ) |> AF.with_provenance "gunzip" ["fastq-gz", AF.get_provenance gz] | other -> ksprintf failwith "To_workflow.gunzip: non-FASTQ input not implemented" end let gunzip_concat gzl = ksprintf failwith "To_workflow.gunzip_concat: not implemented" let concat : Annotated_file.t -> Annotated_file.t = fun l -> begin match get_list (AF.get_file l) with | Fastq one :: [] -> Fastq one |> AF.with_provenance "concat" ["single-element", AF.get_provenance l] | Fastq first_fastq :: _ as lfq -> let fqs = List.map lfq ~f:get_fastq in let r1s = List.map fqs ~f:(KEDSL.read_1_file_node) in let r2s = List.filter_map fqs ~f:KEDSL.read_2_file_node in (* TODO add some verifications that they have the same number of files? i.e. that we are not mixing SE and PE fastqs *) let concat_files ~read l = let result_path = Name_file.in_directory Config.work_dir ~readable_suffix:( sprintf "%s-Read%d-Concat.fastq" first_fastq#product#escaped_sample_name read) ( first_fastq#product#escaped_sample_name :: first_fastq#product#fragment_id_forced :: List.map l ~f:(fun wf -> wf#product#path) ) in Workflow_utilities.Cat.concat ~run_with l ~result_path in let read_1 = concat_files r1s ~read:1 in let read_2 = match r2s with [] -> None | more -> Some (concat_files more ~read:2) in Fastq ( KEDSL.fastq_node_of_single_file_nodes ~host ~name:first_fastq#product#sample_name ~fragment_id:"edsl-concat" read_1 read_2 ) |> AF.with_provenance "concat" ["fastq-list", AF.get_provenance l] | other -> ksprintf failwith "To_workflow.concat: not implemented" end let merge_bams ?(delete_input_on_success = true) ?(attach_rg_tag = false) ?(uncompressed_bam_output = false) ?(compress_level_one = false) ?(combine_rg_headers = false) ?(combine_pg_headers = false) bam_list = match AF.get_file bam_list with | List [ one_bam ] -> one_bam |> AF.with_provenance "merge-bams" ["pass-through", AF.get_provenance bam_list] | List bam_files -> let bams = List.map bam_files ~f:get_bam in let output_path = Name_file.in_directory ~readable_suffix:"samtoolsmerge.bam" Config.work_dir (List.map bams ~f:(fun bam -> bam#product#path)) in Bam (Tools.Samtools.merge_bams ~delete_input_on_success ~attach_rg_tag ~uncompressed_bam_output ~compress_level_one ~combine_rg_headers ~combine_pg_headers ~run_with bams output_path) |> AF.with_provenance "merge-bams" ["bam-list", AF.get_provenance bam_list] ~string_arguments:[ "delete-input-on-success", string_of_bool delete_input_on_success; "attach-rg-tag", string_of_bool attach_rg_tag; "uncompressed-bam-output", string_of_bool uncompressed_bam_output; "compress-level-one", string_of_bool compress_level_one; "combine-rg-headers", string_of_bool combine_rg_headers; "combine-pg-headers", string_of_bool combine_pg_headers; ] | other -> fail_get other "To_workflow.merge_bams: not a list of bams?" let stringtie ?(configuration = Tools.Stringtie.Configuration.default) bamt = let bam = get_bam (AF.get_file bamt) in let result_prefix = Name_file.from_path bam#product#path ~readable_suffix:"stringtie" [ configuration.Tools.Stringtie.Configuration.name; ] in Gtf (Tools.Stringtie.run ~configuration ~bam ~result_prefix ~run_with ()) |> AF.with_provenance "stringtie" ["bam", AF.get_provenance bamt] ~string_arguments:["configuration-name", configuration.Tools.Stringtie.Configuration.name] ~json_arguments:["configuration", Tools.Stringtie.Configuration.to_json configuration] let indel_realigner_function: type a. configuration: _ -> a KEDSL.bam_or_bams -> a = fun ~configuration on_what -> match Config.map_reduce_gatk_indel_realigner with | true -> Tools.Gatk.indel_realigner_map_reduce ~run_with ~compress:false ~configuration on_what | false -> Tools.Gatk.indel_realigner ~run_with ~compress:false ~configuration on_what let indel_realigner_provenance ~configuration t_arguments product = let config_indel, config_target = configuration in AF.with_provenance "gatk-indel-realigner" t_arguments product ~string_arguments:[ "indel-realigner-configuration-name", config_indel.Tools.Gatk.Configuration.Indel_realigner.name; "target-creator-configuration-name", config_target.Tools.Gatk.Configuration.Realigner_target_creator.name; ] ~json_arguments:[ "indel-realigner-configuration", Tools.Gatk.Configuration.Indel_realigner.to_json config_indel; "target-creator-configuration", Tools.Gatk.Configuration.Realigner_target_creator.to_json config_target; ] let gatk_indel_realigner ?(configuration = Tools.Gatk.Configuration.default_indel_realigner) bam = let input_bam = get_bam (AF.get_file bam) in Bam (indel_realigner_function ~configuration (KEDSL.Single_bam input_bam)) |> indel_realigner_provenance ~configuration ["bam", AF.get_provenance bam] let gatk_indel_realigner_joint ?(configuration = Tools.Gatk.Configuration.default_indel_realigner) bam_pair = let bam1 = (AF.get_file bam_pair) |> pair_fst |> get_bam in let bam2 = (AF.get_file bam_pair) |> pair_snd |> get_bam in let bam_list_node = indel_realigner_function (KEDSL.Bam_workflow_list [bam1; bam2]) ~configuration in begin match KEDSL.explode_bam_list_node bam_list_node with | [realigned_normal; realigned_tumor] -> Pair (Bam realigned_normal, Bam realigned_tumor) |> indel_realigner_provenance ~configuration ["bam-pair", AF.get_provenance bam_pair] | other -> failf "Gatk.indel_realigner did not return the correct list \ of length 2 (tumor, normal): it gave %d bams" (List.length other) end let picard_mark_duplicates ?(configuration = Tools.Picard.Mark_duplicates_settings.default) bam = let input_bam = get_bam (AF.get_file bam) in let output_bam = (* We assume that the settings do not impact the actual result. *) Name_file.from_path input_bam#product#path ~readable_suffix:"mark_dups.bam" [] in Bam ( Tools.Picard.mark_duplicates ~settings:configuration ~run_with ~input_bam output_bam )|> AF.with_provenance "picard-mark-duplicates" ["bam", AF.get_provenance bam] let picard_reorder_sam ?mem_param ?reference_build bam = let input_bam = get_bam (AF.get_file bam) in let reference_build_param = match reference_build with | None -> input_bam#product#reference_build | Some r -> r in let output_bam_path = (* We assume that the settings do not impact the actual result. *) Name_file.from_path input_bam#product#path ~readable_suffix:"reorder_sam.bam" [reference_build_param] in Bam ( Tools.Picard.reorder_sam ?mem_param ?reference_build ~run_with ~input_bam output_bam_path ) |> AF.with_provenance "picard-reorder-sam" ["bam", AF.get_provenance bam] ~string_arguments:["reference-build", reference_build_param] let picard_clean_bam bam = let input_bam = get_bam (AF.get_file bam) in let output_bam_path = Name_file.from_path input_bam#product#path ~readable_suffix:"cleaned.bam" [] in Bam ( Tools.Picard.clean_bam ~run_with input_bam output_bam_path ) |> AF.with_provenance "picard-clean-bam" ["bam", AF.get_provenance bam] let gatk_bqsr ?(configuration = Tools.Gatk.Configuration.default_bqsr) bam = let input_bam = get_bam (AF.get_file bam) in let output_bam = let (bqsr, preads) = configuration in Name_file.from_path input_bam#product#path ~readable_suffix:"gatk_bqsr.bam" [ bqsr.Tools.Gatk.Configuration.Bqsr.name; preads.Tools.Gatk.Configuration.Print_reads.name; ] in let config_bqsr, config_print_reads = configuration in Bam ( Tools.Gatk.base_quality_score_recalibrator ~configuration ~run_with ~input_bam ~output_bam ) |> AF.with_provenance "gatk-bqsr" ["bam", AF.get_provenance bam] ~string_arguments:[ "bqsr-configuration-name", config_bqsr.Tools.Gatk.Configuration.Bqsr.name; "print-reads-configuration-name", config_print_reads.Tools.Gatk.Configuration.Print_reads.name; ] ~json_arguments:[ "bqsr-configuration", Tools.Gatk.Configuration.Bqsr.to_json config_bqsr; "print-reads-configuration", Tools.Gatk.Configuration.Print_reads.to_json config_print_reads; ] let seqtk_shift_phred_scores fastq = let input_fastq = get_fastq (AF.get_file fastq) in let output_folder = Name_file.from_path input_fastq#product#sample_name ~readable_suffix:"phred33-fastq" [ "seqtk"; ] in Fastq ( Tools.Seqtk.shift_phred_scores ~run_with ~output_postfix:".fastq" ~input_fastq ~output_folder:(Config.work_dir // output_folder) ) |> AF.with_provenance "seqtk-shift-phred-scores" ["fastq", AF.get_provenance fastq] let seq2hla fq = let fastq = get_fastq (AF.get_file fq) in let r1 = KEDSL.read_1_file_node fastq in let r2 = match KEDSL.read_2_file_node fastq with | Some r -> r | None -> failf "Seq2HLA doesn't support Single_end_sample(s)." in let work_dir = Name_file.in_directory Config.work_dir ~readable_suffix:"seq2hla-workdir" [ fastq#product#escaped_sample_name; fastq#product#fragment_id_forced; ] in Seq2hla_result ( Tools.Seq2HLA.hla_type ~work_dir ~run_with ~run_name:fastq#product#escaped_sample_name ~r1 ~r2 ) |> AF.with_provenance "seq2hla" ["fastq", AF.get_provenance fq] let hlarp input = let out o = Name_file.from_path o ~readable_suffix:"hlarp.csv" [] in let hlarp = Tools.Hlarp.run ~run_with in let hla_result, output_path, prov_argument = match input with | `Seq2hla v -> let r = get_seq2hla_result (AF.get_file v) in `Seq2hla r, out r#product#work_dir_path, ("seq2hla", AF.get_provenance v) | `Optitype v -> let r = get_optitype_result (AF.get_file v) in `Optitype r, out r#product#path, ("optitype", AF.get_provenance v) in let res = hlarp ~hla_result ~output_path in MHC_alleles res |> AF.with_provenance "hlarp" [prov_argument] let filter_to_region vcf bed = let vcff = get_vcf (AF.get_file vcf) in let bedf = get_bed (AF.get_file bed) in let output = Name_file.from_path vcff#product#path ~readable_suffix:"_intersect.vcf" [Filename.basename bedf#product#path |> Filename.chop_extension] in Vcf (Tools.Bedtools.intersect ~primary:vcff ~intersect_with:[bedf] ~run_with output) |> AF.with_provenance "filter-to-region" ["vcf", AF.get_provenance vcf; "bed", AF.get_provenance bed] let bam_left_align ~reference_build bamf = let bam = get_bam (AF.get_file bamf) in let output = Name_file.from_path bam#product#path ~readable_suffix:"_left-aligned.bam" [Filename.basename bam#product#path] in Bam (Tools.Freebayes.bam_left_align ~reference_build ~bam ~run_with output) |> AF.with_provenance "bam-left-align" ["bam", AF.get_provenance bamf] ~string_arguments:["reference-build", reference_build] let sambamba_filter ~filter bami = let bam = get_bam (AF.get_file bami) in let output = Name_file.from_path bam#product#path ~readable_suffix:"_filtered.bam" [Filename.basename bam#product#path; Tools.Sambamba.Filter.to_string filter] in Bam (Tools.Sambamba.view ~bam ~run_with ~filter output) |> AF.with_provenance "sambamba-filter" ["bam", AF.get_provenance bami] (** The filter's [to_json] function is just for now the identity: *) ~json_arguments:["filter", (filter :> Yojson.Basic.json)] let fastqc fq = let fastq = get_fastq (AF.get_file fq) in let output_folder = Name_file.in_directory ~readable_suffix:"fastqc_result" Config.work_dir [ fastq#product#escaped_sample_name; fastq#product#fragment_id_forced; ] in Fastqc_result (Tools.Fastqc.run ~run_with ~fastq ~output_folder) |> AF.with_provenance "fastqc" ["fastq", AF.get_provenance fq] let flagstat bami = let bam = get_bam (AF.get_file bami) in Flagstat_result (Tools.Samtools.flagstat ~run_with bam) |> AF.with_provenance "flagstat" ["bam", AF.get_provenance bami] let vcf_annotate_polyphen vcf = let v = get_vcf (AF.get_file vcf) in let output_vcf = (Filename.chop_extension v#product#path) ^ "_polyphen.vcf" in let reference_build = v#product#reference_build in Vcf ( Tools.Vcfannotatepolyphen.run ~run_with ~reference_build ~vcf:v ~output_vcf ) |> AF.with_provenance "vcf-annotate-polyphen" ["vcf", AF.get_provenance vcf] let snpeff vcf = let open KEDSL in let v = get_vcf (AF.get_file vcf) in let reference_build = v#product#reference_build in let out_folder = Name_file.in_directory ~readable_suffix:"snpeff" Config.work_dir [ Filename.basename v#product#path; reference_build; ] in let snpeff_run = Tools.Snpeff.annotate ~run_with ~reference_build ~input_vcf:v ~out_folder in Vcf ( workflow_node (transform_vcf v#product ~path:(snpeff_run#product#path)) ~name:(sprintf "Fetch annotated VCF: %s" v#render#name) ~edges:[ depends_on snpeff_run; ] ) |> AF.with_provenance "snpeff" ["vcf", AF.get_provenance vcf] let isovar ?(configuration=Tools.Isovar.Configuration.default) vcf bam = let v = get_vcf (AF.get_file vcf) in let b = get_bam (AF.get_file bam) in let reference_build = if v#product#reference_build = b#product#reference_build then v#product#reference_build else ksprintf failwith "VCF and Bam do not agree on their reference build: \ bam: %s Vs vcf: %s" b#product#reference_build v#product#reference_build in let output_file = Name_file.in_directory ~readable_suffix:"isovar.csv" Config.work_dir [ Tools.Isovar.Configuration.name configuration; reference_build; (Filename.chop_extension (Filename.basename v#product#path)); (Filename.chop_extension (Filename.basename b#product#path)); ] in Isovar_result ( Tools.Isovar.run ~configuration ~run_with ~reference_build ~vcf:v ~bam:b ~output_file ) |> AF.with_provenance "isovar" ["vcf", AF.get_provenance vcf; "bam", AF.get_provenance bam] ~string_arguments:[ "reference-build", reference_build; "configuration-name", Tools.Isovar.Configuration.name configuration; ] ~json_arguments:[ "configuration", Tools.Isovar.Configuration.to_json configuration; ] let topiary ?(configuration=Tools.Topiary.Configuration.default) vcfs predictor alleles = let vs = List.map ~f:(fun x -> AF.get_file x |> get_vcf) vcfs in let refs = vs |> List.map ~f:(fun v -> v#product#reference_build) |> List.dedup in let reference_build = if List.length refs > 1 then ksprintf failwith "VCFs do not agree on their reference build: %s" (String.concat ~sep:", " refs) else List.nth_exn refs 0 in let mhc = get_mhc_alleles (AF.get_file alleles) in let output_file = Name_file.in_directory ~readable_suffix:"topiary.tsv" Config.work_dir ([ Hla_utilities.predictor_to_string predictor; Tools.Topiary.Configuration.name configuration; Filename.chop_extension (Filename.basename mhc#product#path); ] @ (List.map vs ~f:(fun v -> v#product#path))) in Topiary_result ( Tools.Topiary.run ~configuration ~run_with ~reference_build ~vcfs:vs ~predictor ~alleles_file:mhc ~output:(`CSV output_file) ) |> AF.with_provenance "topiary" (("alleles", AF.get_provenance alleles) :: List.mapi vcfs ~f:(fun i v -> sprintf "vcf_%d" i, AF.get_provenance v)) ~string_arguments:[ "predictor", Hla_utilities.predictor_to_string predictor; "configuration-name", Tools.Topiary.Configuration.name configuration; ] ~json_arguments:[ "configuration", Tools.Topiary.Configuration.to_json configuration; ] let vaxrank ?(configuration=Tools.Vaxrank.Configuration.default) vcfs bam predictor alleles = let vs = List.map ~f:(fun x -> AF.get_file x |> get_vcf) vcfs in let b = get_bam (AF.get_file bam) in let reference_build = if List.exists vs ~f:(fun v -> v#product#reference_build <> b#product#reference_build) then ksprintf failwith "VCFs and Bam do not agree on their reference build: \ bam: %s Vs vcfs: %s" b#product#reference_build (List.map vs ~f:(fun v -> v#product#reference_build) |> String.concat ~sep:", ") else b#product#reference_build in let mhc = get_mhc_alleles (AF.get_file alleles) in let outdir = Name_file.in_directory ~readable_suffix:"vaxrank" Config.work_dir ([ Tools.Vaxrank.Configuration.name configuration; Hla_utilities.predictor_to_string predictor; (Filename.chop_extension (Filename.basename mhc#product#path)); ] @ (List.map vs ~f:(fun v -> (Filename.chop_extension (Filename.basename v#product#path)))) ) in Vaxrank_result ( Tools.Vaxrank.run ~configuration ~run_with ~reference_build ~vcfs:vs ~bam:b ~predictor ~alleles_file:mhc ~output_folder:outdir ) |> AF.with_provenance "vaxrank" (("alleles", AF.get_provenance alleles) :: ("bam", AF.get_provenance bam) :: List.mapi vcfs ~f:(fun i v -> sprintf "vcf_%d" i, AF.get_provenance v)) ~string_arguments:[ "predictor", Hla_utilities.predictor_to_string predictor; "configuration-name", Tools.Vaxrank.Configuration.name configuration; ] ~json_arguments:[ "configuration", Tools.Vaxrank.Configuration.to_json configuration; ] let optitype how fq = let fastq = get_fastq (AF.get_file fq) in let intput_type = match how with `RNA -> "RNA" | `DNA -> "DNA" in let work_dir = Name_file.in_directory Config.work_dir ~readable_suffix:"optitype.d" [ fastq#product#escaped_sample_name; intput_type; fastq#product#fragment_id_forced; ] in let run_name = fastq#product#escaped_sample_name in Optitype_result ( match how with | `DNA -> Tools.Optitype.dna_hla_type_with_bwamem ~configuration:Tools.Bwa.Configuration.Mem.default ~work_dir ~run_with ~fastq ~run_name | `RNA -> Tools.Optitype.hla_type ~work_dir ~run_with ~fastq ~run_name `RNA ) |> AF.with_provenance "optitype" ["fastq", AF.get_provenance fq] ~string_arguments:["input-type", intput_type] let gatk_haplotype_caller bam = let input_bam = get_bam (AF.get_file bam) in let result_prefix = Filename.chop_extension input_bam#product#path ^ sprintf "_gatkhaplo" in Vcf ( Tools.Gatk.haplotype_caller ~run_with ~input_bam ~result_prefix `Map_reduce ) |> AF.with_provenance "gatk-haplotype-caller" ["bam", AF.get_provenance bam] let bam_to_fastq ?fragment_id how bam = let input_bam = get_bam (AF.get_file bam) in let sample_type = match how with `SE -> `Single_end | `PE -> `Paired_end in let sample_type_str = match how with `PE -> "PE" | `SE -> "SE" in let output_prefix = Config.work_dir // sprintf "%s-b2fq-%s" Filename.(chop_extension (basename input_bam#product#path)) sample_type_str in Fastq ( Tools.Picard.bam_to_fastq ~run_with ~sample_type ~output_prefix input_bam ) |> AF.with_provenance "bam-to-fastq" ["bam", AF.get_provenance bam] ~string_arguments:(("sample-type", sample_type_str) :: Option.value_map ~default:[] fragment_id ~f:(fun fi -> ["fragment-id", fi])) let somatic_vc name confname confjson default_conf runfun ?configuration ~normal ~tumor () = let normal_bam = get_bam (AF.get_file normal) in let tumor_bam = get_bam (AF.get_file tumor) in let configuration = Option.value configuration ~default:default_conf in let result_prefix = Name_file.from_path tumor_bam#product#path ~readable_suffix:(sprintf "_%s-%s" name (confname configuration)) [ normal_bam#product#path |> Filename.basename |> Filename.chop_extension; ] in Vcf ( runfun ~configuration ~run_with ~normal:normal_bam ~tumor:tumor_bam ~result_prefix ) |> AF.with_provenance name ["normal", AF.get_provenance normal; "tumor", AF.get_provenance tumor] ~string_arguments:[ "configuration-name", confname configuration; ] ~json_arguments:[ "configuration", confjson configuration; ] let mutect = somatic_vc "mutect" Tools.Mutect.Configuration.name Tools.Mutect.Configuration.to_json Tools.Mutect.Configuration.default (fun ~configuration ~run_with ~normal ~tumor ~result_prefix -> Tools.Mutect.run ~configuration ~run_with ~normal ~tumor ~result_prefix `Map_reduce) let mutect2 = somatic_vc "mutect2" Tools.Gatk.Configuration.Mutect2.name Tools.Gatk.Configuration.Mutect2.to_json Tools.Gatk.Configuration.Mutect2.default (fun ~configuration ~run_with ~normal ~tumor ~result_prefix -> Tools.Gatk.mutect2 ~configuration ~run_with ~input_normal_bam:normal ~input_tumor_bam:tumor ~result_prefix `Map_reduce) let somaticsniper = somatic_vc "somaticsniper" Tools.Somaticsniper.Configuration.name Tools.Somaticsniper.Configuration.to_json Tools.Somaticsniper.Configuration.default (fun ~configuration ~run_with ~normal ~tumor ~result_prefix -> Tools.Somaticsniper.run ~configuration ~run_with ~normal ~tumor ~result_prefix ()) let strelka = somatic_vc "strelka" Tools.Strelka.Configuration.name Tools.Strelka.Configuration.to_json Tools.Strelka.Configuration.default (fun ~configuration ~run_with ~normal ~tumor ~result_prefix -> Tools.Strelka.run ~configuration ~normal ~tumor ~run_with ~result_prefix ()) let varscan_somatic ?adjust_mapq = somatic_vc "varscan_somatic" (fun () -> sprintf "Amq%s" (Option.value_map adjust_mapq ~default:"N" ~f:Int.to_string)) (fun () -> `Assoc [ "adjust-mapq", Option.value_map adjust_mapq ~default:(`String "None") ~f:(fun i -> `Int i) ]) () (fun ~configuration ~run_with ~normal ~tumor ~result_prefix -> Tools.Varscan.somatic_map_reduce ?adjust_mapq ~run_with ~normal ~tumor ~result_prefix ()) ~configuration:() let muse = somatic_vc "muse" Tools.Muse.Configuration.name Tools.Muse.Configuration.to_json (Tools.Muse.Configuration.default `WGS) (fun ~configuration ~run_with ~normal ~tumor ~result_prefix -> Tools.Muse.run ~configuration ~run_with ~normal ~tumor ~result_prefix `Map_reduce) let virmid = somatic_vc "virmid" Tools.Virmid.Configuration.name Tools.Virmid.Configuration.to_json Tools.Virmid.Configuration.default (fun ~configuration ~run_with ~normal ~tumor ~result_prefix -> Tools.Virmid.run ~configuration ~normal ~tumor ~run_with ~result_prefix ()) end end module Transform_applications = struct (** We use here the {!Optimization_framework} module to apply a bit more than {{:https://en.wikipedia.org/wiki/Lambda_calculus}Β-reduction}: we also try to apply the [List_repr.map], and build [List_repr.make] values. This optimization-pass is a good example of use of the {!Optimization_framework}. *) open Nonstd module Apply_optimization_framework (Input : Semantics.Bioinformatics_base) = struct (** The “core” of the transformation is implemented here. {!Input} is the input language. {!Transformation_types} defines a GADT that encodes the subset of the EDSL that is of interest to the transformation. ['a Transformation_types.term] is designed to allow us to pattern match (in the function {!Transformation_types.bwd}). All the terms of the input language that are not relevant to the transformation are wrapped in {!Transformation_types.Unknown}. *) module Transformation_types = struct type 'a from = 'a Input.repr type 'a term = | Unknown : 'a from -> 'a term | Apply : ('a -> 'b) term * 'a term -> 'b term | Lambda : ('a term -> 'b term) -> ('a -> 'b) term | List_make: ('a term) list -> 'a list term | List_map: ('a list term * ('a -> 'b) term) -> 'b list term | Pair: 'a term * 'b term -> ('a * 'b) term | Fst: ('a * 'b) term -> 'a term | Snd: ('a * 'b) term -> 'b term let fwd x = Unknown x let rec bwd : type a. a term -> a from = function | Apply (Lambda f, v) -> bwd (f v) | Apply (other, v) -> Input.apply (bwd other) (bwd v) | List_map (List_make l, Lambda f) -> Input.list (List.map ~f:(fun x -> bwd (f x)) l) | List_map (x, f) -> Input.list_map ~f:(bwd f) (bwd x) | Lambda f -> Input.lambda (fun x -> (bwd (f (fwd x)))) | List_make l -> Input.list (List.map ~f:bwd l) | Fst (Pair (a, b)) -> bwd a | Snd (Pair (a, b)) -> bwd b | Pair (a, b) -> Input.pair (bwd a) (bwd b) | Fst b -> Input.pair_first (bwd b) | Snd b -> Input.pair_second (bwd b) | Unknown x -> x end (** Applying this functor just adds functions that we don't use yet; so this could be simplified in the future. *) module Transformation = Optimization_framework.Define_transformation(Transformation_types) open Transformation (** {!Language_delta} is where we “intercept” the terms of the language that are interesting. For all the other ones {!Transformation_types.fwd} will be used by {!Optimization_framework.Generic_optimizer}. *) module Language_delta = struct open Transformation_types let apply f x = Apply (f, x) let lambda f = Lambda f let list l = List_make l let list_map l ~f = List_map (l, f) let pair a b = Pair (a, b) let pair_first p = Fst p let pair_second p = Snd p end end (** {!Apply} is the entry point that transforms EDSL terms. *) module Apply (Input : Semantics.Bioinformatics_base) = struct module The_pass = Apply_optimization_framework(Input) (** We populate the module with the default implementations (which do nothing). *) include Optimization_framework.Generic_optimizer(The_pass.Transformation)(Input) (** We override the few functions we're interested in. *) include The_pass.Language_delta end end