struct
(* [ensure_one_flowcell_max files] extracts, from every name in [files], the
   optional dot-prefixed component captured right before the .fastq.gz
   suffix (called the flowcell here), removes duplicates, and:
   - returns [files] unchanged when at most one distinct value was found;
   - fails with [`Multiple_flowcells flowcells] otherwise;
   - fails with [`Re_group_error msg] when a name cannot be matched at all
     or when the match yields an unexpected number of groups.

   NOTE(review): the capture group is optional and is preceded by a greedy
   wildcard, so under leftmost-longest matching it may well come back empty
   for every name, in which case this check can never fail. Confirm against
   the actual file-naming scheme. *)
let ensure_one_flowcell_max files =
  let open Pvem_lwt_unix.Deferred_result in
  let flowcell_re = Re.compile (Re_posix.re {re|.*(\..+)?\.fastq\.gz|re}) in
  let flowcell_of_filename f =
    match Re.Group.all (Re.exec flowcell_re f) with
    (* [Re.exec] raises [Not_found] when there is no match; report that
       through the result monad instead of letting the exception escape. *)
    | exception Not_found ->
      fail (`Re_group_error
              (Printf.sprintf "%S does not look like a .fastq.gz file name." f))
    (* Index 0 is the whole match, index 1 the flowcell group. *)
    | [|_; flowcell|] -> return flowcell
    | _ ->
      fail (`Re_group_error
              "Re.Group.all returned an unexpected number of groups.")
  in
  Pvem_lwt_unix.Deferred_list.while_sequential files ~f:flowcell_of_filename
  >>| List.dedup
  >>= function
  | [] | [_] -> return files
  | flowcells -> fail (`Multiple_flowcells flowcells)
(* [fastqs ?paired_end ~host dir] lists [dir] on [host] with ls piped
   through sort, keeps the entries that match the fastq.gz pattern, and
   turns them into fragments named fragment-0, fragment-1, ...

   - [paired_end] (default [true]): only names containing _R1_ are kept, and
     each must have a sibling in the listing with _R1_ replaced by _R2_;
     otherwise the result is an error [`R2_expected_for_r1 r1]. Each pair is
     built with [pe]. When [false], every selected file becomes a fragment
     built with [se].
   - The selected names are also passed through [ensure_one_flowcell_max],
     whose errors are propagated, as are those of the shell command.

   All paths handed to [pe] / [se] are prefixed with [dir]. *)
let fastqs ?(paired_end=true) ~host dir =
  let open Pvem_lwt_unix.Deferred_result in
  (* We only want to select fastq.gzs in the directory: *)
  let fastq_re = Re.compile (Re_posix.re {re|.*\.fastq\.gz|re}) in
  let fastq_p = Re.execp fastq_re in
  (* Compiled once here instead of on every call to [r1_to_r2]. *)
  let r1_re = Re.compile (Re_posix.re "_R1_") in
  let r1_to_r2 s = Re.replace_string r1_re ~by:"_R2_" s in
  (* If we're expecting paired_end reads, make sure we have a R2 in the
     directory for each R1 fragment. Returns the list of R1 fragments.
     The R1 names are accumulated by consing, so they come out in the
     reverse of the order in which the fold visits them; this is kept as is
     because the fragment ids below are derived from that order. *)
  let ensure_matching_r2s files =
    if paired_end then
      (List.fold ~init:(return []) files ~f:(fun acc r1 ->
           acc >>= fun r1s ->
           match String.index_of_string r1 ~sub:"_R1_" with
           | None -> acc (* not a read-1 file: skip it *)
           | Some _ ->
             if List.mem (r1_to_r2 r1) ~set:files
             then return (r1 :: r1s)
             else fail (`R2_expected_for_r1 r1)))
    else return files
  in
  let as_fragments read1s_filenames =
    List.mapi read1s_filenames ~f:(fun i r1_name ->
        let fragment_id = sprintf "fragment-%d" i in
        let r1 = dir // r1_name in
        if paired_end
        then pe ~fragment_id r1 (dir // (r1_to_r2 r1_name))
        else se ~fragment_id r1)
    |> return
  in
  (* [Filename.quote] yields the same single-quoted form as before for
     ordinary paths, and additionally escapes single quotes inside [dir],
     which used to break (or inject into) the shell command. *)
  let cmd = sprintf "ls %s | sort" (Filename.quote dir) in
  Ketrew.Host_io.(get_shell_command_output (create ()) ~host cmd)
  >>= fun (out, _err) ->
  (* Only the first component of the command result is used. *)
  String.split ~on:(`Character '\n') out |> List.filter ~f:fastq_p |> return
  >>= ensure_matching_r2s
  >>= ensure_one_flowcell_max
  >>= as_fragments
end