: sig module Input : sig type t = | Fastq of fastq | Bam of bam and fastq_fragment = (string option * fastq_data) and fastq = { fastq_sample_name : string; files : fastq_fragment list; } and bam = { bam_sample_name: string; path: string; how: how; sorting: sorting option; reference_build: string; } and fastq_data = | PE of string * string | SE of string | Of_bam of how * sorting option * string * string and sorting = [ `Coordinate | `Read_name ] and how = [ `PE | `SE ] val pp : Format.formatter -> t -> unit val show : t -> string val pe : ?fragment_id:string -> string -> string -> fastq_fragment val se : ?fragment_id:string -> string -> fastq_fragment val fastq_of_bam : ?fragment_id: string -> ?sorted:[ `Coordinate | `Read_name ] -> reference_build:string -> [ `PE | `SE ] -> string -> fastq_fragment val fastq_sample : sample_name:string -> fastq_fragment list -> t val bam_sample : sample_name:string -> ?sorting:sorting -> reference_build:string -> how:how -> string -> t val to_yojson : t -> Yojson.Safe.json val of_yojson : Yojson.Safe.json -> (t, string) Pvem.Result.t module Derive : sig (** Make it easy to create Input.t from directories on hosts. *) val ensure_one_flowcell_max : string list -> (string list, [> `Multiple_flowcells of string list | `Re_group_error of string ]) Pvem_lwt_unix.Deferred_result.t (** This will error out immediately if more than one flowcell is in a directory. We want to make sure only one flowcell is in this directory, otherwise this'll give us bad ordering: for now, we'll just fail with an informative exception: if it turns out we need to handle this case, we can do it here. The proper ordering should be grouping and ordering with flowcell groups, and arbitrary order between groups. *) val fastqs : ?paired_end:bool -> host:Ketrew_pure.Host.t -> string -> (fastq_fragment list, [> `Host of _ Ketrew.Host_io.Error.execution Ketrew.Host_io.Error.non_zero_execution | `Multiple_flowcells of string list | `R2_expected_for_r1 of string | `Re_group_error of string]) Pvem_lwt_unix.Deferred_result.t (** This gives us lexicographically sorted FASTQ filenames in a given directory on the host. This assumes Illumina-style file-naming: Sample_name_barcode_lane#_read#_fragment#.(flowcell)?.fastq.gz N.B. If a given directory contains FASTQs with multiple flowcells, this will fail. *) end end module Make: functor (Bfx : Semantics.Bioinformatics_base) -> sig val fastq_of_files : sample_name:string -> ?fragment_id:string -> r1:string -> ?r2:string -> unit -> [ `Fastq ] Bfx.repr val fastq_of_input : Input.t -> [ `Fastq ] list Bfx.repr val bam_of_input_exn : Input.t -> [ `Bam ] Bfx.repr val bwa_mem_opt_inputs_exn: Input.t -> [ `Bam of [ `Bam ] Bfx.repr * [ `PE | `SE ] | `Fastq of [ `Fastq ] Bfx.repr | `Fastq_gz of [ `Gz of [ `Fastq ] ] Bfx.repr ] list end end = struct (** Reusable and useful pieces of pipeline. *) open Biokepi_run_environment.Common module Input = struct type t = | Fastq of fastq | Bam of bam and fastq = { fastq_sample_name : string; files : fastq_fragment list; } and bam = { bam_sample_name: string; path: string; how: how; sorting: sorting option; reference_build: string; } and fastq_fragment = (string option * fastq_data) and fastq_data = | PE of string * string | SE of string | Of_bam of how * sorting option * string * string and sorting = [ `Coordinate | `Read_name ] and how = [ `PE | `SE ] [@@deriving show,yojson] let pe ?fragment_id a b = (fragment_id, PE (a, b)) let se ?fragment_id a = (fragment_id, SE a) let fastq_of_bam ?fragment_id ?sorted ~reference_build how s = (fragment_id, Of_bam (how, sorted, reference_build, s)) let fastq_sample ~sample_name files = Fastq {fastq_sample_name = sample_name; files} let bam_sample ~sample_name ?sorting ~reference_build ~how path = Bam {bam_sample_name = sample_name; path; how; reference_build; sorting } let tag_v0 = "biokepi-input-v0" let current_version_tag = tag_v0 let to_yojson = let string s = `String s in let option f o : Yojson.Safe.json = Option.value_map o ~default:`Null ~f in let data_to_yojson = function | PE (r1, r2) -> `Assoc ["paired-end", `Assoc ["r1", string r1; "r2", string r2]] | SE f -> `Assoc ["single-end", string f] | Of_bam (how, sorto, refb, fil) -> `Assoc ["bam", `Assoc ["kind", string (match how with | `PE -> "paired-end" | `SE -> "single-end"); "sorting", (match sorto with | None -> `Null | Some `Read_name -> `String "read-name" | Some `Coordinate -> `String "coordinate"); "reference-genome", string refb; "path", string fil]] in let file_to_yojson (fragment_option, data) = `Assoc [ "fragment-id", option string fragment_option; "data", data_to_yojson data; ] in let files_to_yojson files = `List (List.map ~f:file_to_yojson files) in function | Fastq {fastq_sample_name; files} -> `Assoc [current_version_tag, `Assoc ["fastq", `Assoc [ "sample-name", string fastq_sample_name; "fragments", files_to_yojson files; ]]] | Bam bam -> `Assoc [current_version_tag, `Assoc ["bam", bam_to_yojson bam]] let of_yojson j = let open Pvem.Result in let error ?json fmt = ksprintf (fun s -> fail (sprintf "%s%s" s (Option.value_map ~default:"" json ~f:(fun j -> sprintf " but got %s" @@ Yojson.Safe.pretty_to_string ~std:true j))) ) fmt in let data_of_yojson = function | `Assoc ["paired-end", `Assoc ["r1", `String r1; "r2", `String r2]] -> return (PE (r1, r2)) | `Assoc ["single-end", `String file] -> SE file |> return | `Assoc ["bam", `Assoc ["kind", `String kind; "sorting", sorting; "reference-genome", `String refb; "path", `String path;]] -> begin match sorting with | `Null -> return None | `String "coordinate" -> Some `Coordinate |> return | `String "read-name" -> Some `Read_name |> return | other -> error ~json:other "Expecting %S, %S or null (in \"sorting\": ...)" "coordinate" "read-name" end >>= fun sorting -> begin match kind with | "single-end" -> return `SE | "paired-end" -> return `PE | other -> error "Kind in bam must be \"SE\" or \"PE\"" end >>= fun kind -> return (Of_bam (kind, sorting, refb, path)) | other -> error ~json:other "Expecting string or null (in \"fragment\": ...)" in let fragment_of_yojson = function | `Assoc ["fragment-id", frag; "data", data] -> begin match frag with | `String s -> return (Some s) | `Null -> return (None) | other -> error ~json:other "Expecting string or null (in \"fragment\": ...)" end >>= fun fragment_id -> data_of_yojson data >>= fun data_parsed -> return (fragment_id, data_parsed) | other -> error ~json:other "Expecting {\"fragment\": ... , \"data\": ...}" in match j with | `Assoc [vtag, more] when vtag = current_version_tag -> begin match more with | `Assoc ["fastq", `Assoc ["sample-name", `String sample; "fragments", `List frgs]] -> List.fold ~init:(return []) frgs ~f:(fun prev frag -> prev >>= fun p -> fragment_of_yojson frag >>= fun more -> return (more :: p)) >>= fun l -> return (Fastq { fastq_sample_name = sample; files = List.rev l }) | `Assoc ["bam", bam] -> begin match bam_of_yojson bam with | Result.Ok ok -> return ok | Result.Error err -> fail err end >>= fun bam -> return (Bam bam) | other -> error ~json:other "Expecting Fastq or Bam" end | other -> error ~json:other "Expecting Biokepi_input_v0" module Derive = struct let ensure_one_flowcell_max files = let open Pvem_lwt_unix.Deferred_result in let flowcell_re = Re.compile (Re_posix.re {re|.*(\..+)?\.fastq\.gz|re}) in let flowcells = Pvem_lwt_unix.Deferred_list.while_sequential files ~f:(fun f -> match Re.Group.all (Re.exec flowcell_re f) with | [|_; flowcell|] -> return flowcell | _ -> fail (`Re_group_error "Re.Group.all returned an unexpected number of groups.")) in flowcells >>| List.dedup >>= fun flowcells -> if List.length flowcells > 1 then fail (`Multiple_flowcells flowcells) else return files let fastqs ?(paired_end=true) ~host dir = let open Pvem_lwt_unix.Deferred_result in (* We only want to select fastq.gzs in the directory: *) let fastq_re = Re.compile (Re_posix.re {re|.*\.fastq\.gz|re}) in let fastq_p = Re.execp fastq_re in let r1_to_r2 s = let r1_re = Re.compile (Re_posix.re "_R1_") in Re.replace_string r1_re ~by:"_R2_" s in (* If we're expecting paired_end reads, make sure we have a R2 in the directory for each R1 fragment. Returns the list of R1 fragments. *) let ensure_matching_r2s files = if paired_end then (List.fold ~init:(return []) files ~f:(fun acc r1 -> acc >>= fun r1s -> match String.index_of_string r1 ~sub:"_R1_" with | None -> acc | Some _ -> if List.mem (r1_to_r2 r1) ~set:files then return (r1 :: r1s) else fail (`R2_expected_for_r1 r1))) else return files in let as_fragments read1s_filenames = List.mapi read1s_filenames ~f:(fun i r1 -> if paired_end then let r2 = dir // (r1_to_r2 r1) in let r1 = dir // r1 in let fragment_id = sprintf "fragment-%d" i in pe ~fragment_id r1 r2 else let r1 = dir // r1 in let fragment_id = sprintf "fragment-%d" i in se ~fragment_id r1) |> return in let cmd = (sprintf "ls '%s' | sort" dir) in Ketrew.Host_io.(get_shell_command_output (create ()) ~host:host cmd) >>= fun (out, err) -> String.split ~on:(`Character '\n') out |> List.filter ~f:fastq_p |> return >>= ensure_matching_r2s >>= ensure_one_flowcell_max >>= as_fragments end end module Make (Bfx : Semantics.Bioinformatics_base) = struct (** This functions guesses whether to use [fastq] or [fastq_gz] given the file name extension. If [r1] and [r2] dot match, it fails with an exception. *) let fastq_of_files ~sample_name ?fragment_id ~r1 ?r2 () = let is_gz r = Filename.check_suffix r ".gz" || Filename.check_suffix r ".fqz" in match is_gz r1, Option.map ~f:is_gz r2 with | true, None | true, Some true -> let r1 = Bfx.input_url r1 in let r2 = Option.map ~f:Bfx.input_url r2 in Bfx.(fastq_gz ~sample_name ?fragment_id ~r1 ?r2 () |> gunzip) | false, None | false, Some false -> let r1 = Bfx.input_url r1 in let r2 = Option.map ~f:Bfx.input_url r2 in Bfx.(fastq ~sample_name ?fragment_id ~r1 ?r2 ()) | _ -> failwithf "fastq_of_files: cannot handle mixed gzipped \ and non-gzipped fastq pairs (for a given same fragment)" (** Transform a value of type {!Input.t} into a list of BAMs. *) let bam_of_input_exn u = let open Input in match u with | Fastq _ -> failwith "Can't pass Input.t Fastq to bam_of_input_exn" | Bam {bam_sample_name; path; how; sorting; reference_build} -> let f = Bfx.input_url path in Bfx.bam ~sample_name:bam_sample_name ?sorting ~reference_build f (** Transform a value of type {!Input.t} into a pipeline returning FASTQ data, providing the necessary intermediary steps. *) let fastq_of_input u = let open Input in match u with | Bam {bam_sample_name; path; how; sorting; reference_build} -> let f = Bfx.input_url path in let bam = Bfx.bam ~sample_name:bam_sample_name ?sorting ~reference_build f in Bfx.list [Bfx.bam_to_fastq how bam] | Fastq {fastq_sample_name; files} -> let sample_name = fastq_sample_name in List.map files ~f:(fun (fragment_id, source) -> match source with | PE (r1, r2) -> fastq_of_files ~sample_name ?fragment_id ~r1 ~r2 () | SE r -> fastq_of_files ~sample_name ?fragment_id ~r1:r () | Of_bam (how, sorting, reference_build, path) -> let f = Bfx.input_url path in let bam = Bfx.bam ~sample_name ?sorting ~reference_build f in Bfx.bam_to_fastq ?fragment_id how bam ) |> Bfx.list let bwa_mem_opt_inputs_exn inp = let open Input in let is_gz r = Filename.check_suffix r ".gz" || Filename.check_suffix r ".fqz" in let inputs = match inp with | Bam _ -> failwith "Can't pass Input.t Bam to bwa_mem_opt_inputs" | Fastq {fastq_sample_name; files} -> let sample_name = fastq_sample_name in List.map files ~f:(fun (fragment_id, source) -> match source with | PE (r1, r2) when is_gz r1 && is_gz r2 -> `Fastq_gz Bfx.( let r1 = input_url r1 in let r2 = input_url r2 in Bfx.fastq_gz ~sample_name ?fragment_id ~r1 ~r2 () ) | PE (r1, r2) when not (is_gz r1 || is_gz r2) -> `Fastq Bfx.( let r1 = input_url r1 in let r2 = input_url r2 in Bfx.fastq ~sample_name ?fragment_id ~r1 ~r2 () ) | PE _ (* heterogeneous *) -> failwithf "Heterogeneous gzipped / non-gzipped input paired-end \ FASTQs not implemented" | SE r when is_gz r -> `Fastq_gz Bfx.( let r1 = input_url r in Bfx.fastq_gz ~sample_name ?fragment_id ~r1 () ) | SE r (* when not (is_gz r) *) -> `Fastq_gz Bfx.( let r1 = input_url r in Bfx.fastq_gz ~sample_name ?fragment_id ~r1 () ) | Of_bam (how, sorting, reference_build, path) -> let f = Bfx.input_url path in let bam = Bfx.bam ~sample_name ?sorting ~reference_build f in `Bam (bam, how) ) in inputs end end