struct
type t =
| Fastq of fastq
| Bam of bam
and fastq = {
fastq_sample_name : string;
files : fastq_fragment list;
}
and bam = {
bam_sample_name: string;
path: string;
how: how;
sorting: sorting option;
reference_build: string;
}
and fastq_fragment = (string option * fastq_data)
and fastq_data =
| PE of string * string
| SE of string
| Of_bam of how * sorting option * string * string
and sorting = [ `Coordinate | `Read_name ]
and how = [ `PE | `SE ]
[@@deriving show,yojson]
let pe ?fragment_id a b = (fragment_id, PE (a, b))
let se ?fragment_id a = (fragment_id, SE a)
let fastq_of_bam ?fragment_id ?sorted ~reference_build how s =
(fragment_id, Of_bam (how, sorted, reference_build, s))
let fastq_sample ~sample_name files = Fastq {fastq_sample_name = sample_name; files}
let bam_sample ~sample_name ?sorting ~reference_build ~how path =
Bam {bam_sample_name = sample_name; path; how; reference_build; sorting }
let tag_v0 = "biokepi-input-v0"
let current_version_tag = tag_v0
let to_yojson =
let string s = `String s in
let option f o : Yojson.Safe.json = Option.value_map o ~default:`Null ~f in
let data_to_yojson =
function
| PE (r1, r2) ->
`Assoc ["paired-end", `Assoc ["r1", string r1; "r2", string r2]]
| SE f -> `Assoc ["single-end", string f]
| Of_bam (how, sorto, refb, fil) ->
`Assoc ["bam",
`Assoc ["kind",
string (match how with
| `PE -> "paired-end"
| `SE -> "single-end");
"sorting",
(match sorto with
| None -> `Null
| Some `Read_name -> `String "read-name"
| Some `Coordinate -> `String "coordinate");
"reference-genome", string refb;
"path", string fil]]
in
let file_to_yojson (fragment_option, data) =
`Assoc [
"fragment-id", option string fragment_option;
"data", data_to_yojson data;
] in
let files_to_yojson files = `List (List.map ~f:file_to_yojson files) in
function
| Fastq {fastq_sample_name; files} ->
`Assoc [current_version_tag,
`Assoc ["fastq", `Assoc [
"sample-name", string fastq_sample_name;
"fragments", files_to_yojson files;
]]]
| Bam bam ->
`Assoc [current_version_tag,
`Assoc ["bam", bam_to_yojson bam]]
let of_yojson j =
let open Pvem.Result in
let error ?json fmt =
ksprintf (fun s ->
fail (sprintf "%s%s" s
(Option.value_map ~default:"" json
~f:(fun j ->
sprintf " but got %s" @@
Yojson.Safe.pretty_to_string ~std:true j)))
) fmt in
let data_of_yojson =
function
| `Assoc ["paired-end", `Assoc ["r1", `String r1; "r2", `String r2]] ->
return (PE (r1, r2))
| `Assoc ["single-end", `String file] -> SE file |> return
| `Assoc ["bam", `Assoc ["kind", `String kind;
"sorting", sorting;
"reference-genome", `String refb;
"path", `String path;]] ->
begin match sorting with
| `Null -> return None
| `String "coordinate" -> Some `Coordinate |> return
| `String "read-name" -> Some `Read_name |> return
| other ->
error ~json:other "Expecting %S, %S or null (in \"sorting\": ...)"
"coordinate" "read-name"
end
>>= fun sorting ->
begin match kind with
| "single-end" -> return `SE
| "paired-end" -> return `PE
| other -> error "Kind in bam must be \"SE\" or \"PE\""
end
>>= fun kind ->
return (Of_bam (kind, sorting, refb, path))
| other ->
error ~json:other "Expecting string or null (in \"fragment\": ...)"
in
let fragment_of_yojson =
function
| `Assoc ["fragment-id", frag; "data", data] ->
begin match frag with
| `String s -> return (Some s)
| `Null -> return (None)
| other ->
error ~json:other "Expecting string or null (in \"fragment\": ...)"
end
>>= fun fragment_id ->
data_of_yojson data
>>= fun data_parsed ->
return (fragment_id, data_parsed)
| other -> error ~json:other "Expecting {\"fragment\": ... , \"data\": ...}"
in
match j with
| `Assoc [vtag, more] when vtag = current_version_tag ->
begin match more with
| `Assoc ["fastq",
`Assoc ["sample-name", `String sample; "fragments", `List frgs]] ->
List.fold ~init:(return []) frgs ~f:(fun prev frag ->
prev >>= fun p ->
fragment_of_yojson frag
>>= fun more ->
return (more :: p))
>>= fun l ->
return (Fastq { fastq_sample_name = sample; files = List.rev l })
| `Assoc ["bam", bam] ->
begin match bam_of_yojson bam with
| Result.Ok ok -> return ok
| Result.Error err -> fail err
end
>>= fun bam ->
return (Bam bam)
| other ->
error ~json:other "Expecting Fastq or Bam"
end
| other ->
error ~json:other "Expecting Biokepi_input_v0"
module Derive = struct
let ensure_one_flowcell_max files =
let open Pvem_lwt_unix.Deferred_result in
let flowcell_re = Re.compile (Re_posix.re {re|.*(\..+)?\.fastq\.gz|re}) in
let flowcells =
Pvem_lwt_unix.Deferred_list.while_sequential files
~f:(fun f -> match Re.Group.all (Re.exec flowcell_re f) with
| [|_; flowcell|] -> return flowcell
| _ -> fail (`Re_group_error
"Re.Group.all returned an unexpected number of groups."))
in
flowcells >>| List.dedup >>= fun flowcells ->
if List.length flowcells > 1
then fail (`Multiple_flowcells flowcells)
else return files
let fastqs ?(paired_end=true) ~host dir =
let open Pvem_lwt_unix.Deferred_result in
(* We only want to select fastq.gzs in the directory: *)
let fastq_re = Re.compile (Re_posix.re {re|.*\.fastq\.gz|re}) in
let fastq_p = Re.execp fastq_re in
let r1_to_r2 s =
let r1_re = Re.compile (Re_posix.re "_R1_") in
Re.replace_string r1_re ~by:"_R2_" s
in
(* If we're expecting paired_end reads, make sure we have a R2 in the
directory for each R1 fragment. Returns the list of R1 fragments. *)
let ensure_matching_r2s files =
if paired_end then
(List.fold ~init:(return []) files ~f:(fun acc r1 ->
acc >>= fun r1s ->
match String.index_of_string r1 ~sub:"_R1_" with
| None -> acc
| Some _ ->
if List.mem (r1_to_r2 r1) ~set:files
then return (r1 :: r1s)
else fail (`R2_expected_for_r1 r1)))
else return files
in
let as_fragments read1s_filenames =
List.mapi read1s_filenames
~f:(fun i r1 ->
if paired_end then
let r2 = dir // (r1_to_r2 r1) in
let r1 = dir // r1 in
let fragment_id = sprintf "fragment-%d" i in
pe ~fragment_id r1 r2
else
let r1 = dir // r1 in
let fragment_id = sprintf "fragment-%d" i in
se ~fragment_id r1)
|> return
in
let cmd = (sprintf "ls '%s' | sort" dir) in
Ketrew.Host_io.(get_shell_command_output (create ()) ~host:host cmd)
>>= fun (out, err) ->
String.split ~on:(`Character '\n') out |> List.filter ~f:fastq_p |> return
>>= ensure_matching_r2s
>>= ensure_one_flowcell_max
>>= as_fragments
end
end