struct
type 'a pipeline = 'a t
type workflow_option_failure_mode = [ `Silent | `Fail_if_not_happening ]
type workflow_option = [
| `Multi_sample_indel_realignment of workflow_option_failure_mode
| `Parallel_alignment_over_fastq_fragments of
[ `Bwa_mem | `Bwa | `Mosaik | `Star | `Hisat ] list
* workflow_option_failure_mode
| `Map_reduce of [ `Gatk_indel_realigner ]
]
type t = {
reference_build: Reference_genome.name;
work_dir: string;
machine : Machine.t;
options: workflow_option list;
wrap_bam_node:
bam pipeline ->
KEDSL.bam_file KEDSL.workflow_node ->
KEDSL.bam_file KEDSL.workflow_node;
wrap_vcf_node:
vcf pipeline ->
KEDSL.vcf_file KEDSL.workflow_node ->
KEDSL.vcf_file KEDSL.workflow_node;
wrap_gtf_node:
gtf pipeline ->
KEDSL.single_file KEDSL.workflow_node ->
KEDSL.single_file KEDSL.workflow_node;
}
let create
?(wrap_bam_node = fun _ x -> x)
?(wrap_vcf_node = fun _ x -> x)
?(wrap_gtf_node = fun _ x -> x)
?(options=[])
~reference_build ~work_dir ~machine () =
{reference_build; work_dir; machine; options;
wrap_bam_node; wrap_vcf_node; wrap_gtf_node}
let has_option {options; _} f =
List.exists options ~f
let apply_with_metadata ~metadata_spec wf =
begin match metadata_spec with
| `Add_tags tgs -> KEDSL.add_tags ~recursive:false wf tgs
| `Add_tags_rec tgs -> KEDSL.add_tags ~recursive:true wf tgs
end;
wf
let rec fastq_step ~read ~compiler (pipeline: fastq pipeline) =
let {work_dir; machine ; _ } = compiler in
match pipeline with
| Fastq f -> f
| Concat_text (l: fastq pipeline list) ->
failwith "Compilation of Biokepi.Pipeline.Concat_text: not implemented"
| Gunzip_concat (l: fastq_gz pipeline list) ->
let fastqs =
let rec f =
function
| Fastq_gz t -> t
| With_metadata (metadata_spec, p) ->
apply_with_metadata ~metadata_spec (f p)
in
List.map l ~f in
let result_path = work_dir // to_file_prefix ~read pipeline ^ ".fastq" in
dbg "Result_Path: %S" result_path;
Workflow_utilities.Gunzip.concat ~run_with:machine fastqs ~result_path
| With_metadata (metadata_spec, pipeline) ->
fastq_step ~read ~compiler pipeline |> apply_with_metadata ~metadata_spec
let rec fastq_sample_step ~compiler (fs : fastq_sample pipeline) =
let {work_dir; machine ; _ } = compiler in
match fs with
| Bam_to_fastq (how, what) ->
let bam = compile_aligner_step ~compiler what in
let sample_type =
match how with `Single -> `Single_end | `Paired -> `Paired_end in
let fastq_pair =
let output_prefix = work_dir // to_file_prefix ?read:None what in
Picard.bam_to_fastq ~run_with:machine ~sample_type
~output_prefix bam
in
fastq_pair
| Paired_end_sample (info, l1, l2) ->
let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) l1 in
let r2 = fastq_step ~compiler ~read:(`R2 info.fragment_id) l2 in
let open KEDSL in
workflow_node (fastq_reads ~host:Machine.(as_host machine)
~name:info.sample_name
r1#product#path (Some r2#product#path))
~name:(sprintf "Pairing sample %s (%s and %s)"
info.sample_name
(Filename.basename r1#product#path)
(Filename.basename r2#product#path))
~edges:[ depends_on r1; depends_on r2 ]
| Single_end_sample (info, single) ->
let r1 = fastq_step ~compiler ~read:(`R1 info.fragment_id) single in
let open KEDSL in
workflow_node (fastq_reads ~host:Machine.(as_host machine)
~name:info.sample_name
r1#product#path None)
~equivalence:`None
~edges:[ depends_on r1 ]
~name:(sprintf "single-end %s (%s)"
info.sample_name
(Filename.basename r1#product#path))
| With_metadata (metadata_spec, p) ->
fastq_sample_step ~compiler p |> apply_with_metadata ~metadata_spec
and compile_aligner_step ~compiler (pipeline : bam pipeline) =
let {reference_build; work_dir; machine; _} = compiler in
let result_prefix = work_dir // to_file_prefix pipeline in
dbg "Result_Prefix: %S" result_prefix;
let rec parallelize_alignment ~make_aligner sample =
match sample with
| Paired_end_sample (info,
Gunzip_concat r1_list,
Gunzip_concat r2_list) ->
let exploded =
let count = ref 0 in
List.map2 r1_list r2_list
~f:(fun r1 r2 ->
match r1, r2 with
| (Fastq_gz wf1, Fastq_gz wf2) ->
let new_info =
incr count;
{info with
fragment_id =
sprintf "%s-fragment-%d" info.fragment_id !count} in
make_aligner (
Paired_end_sample (new_info,
Gunzip_concat [r1],
Gunzip_concat [r2]))
| other -> failwith "compile_aligner_step: not implemented"
) in
let bams = List.map exploded ~f:(compile_aligner_step ~compiler) in
Samtools.merge_bams ~run_with:machine bams
(result_prefix ^ "-merged.bam")
| With_metadata (metadata_spec, p) ->
parallelize_alignment ~make_aligner p
|> apply_with_metadata ~metadata_spec
| other ->
failwith "parallelize_alignment: Not fully implemented"
in
let catch_parallelize_aligner aligner sample =
let option =
List.find_map compiler.options
(function
| `Parallel_alignment_over_fastq_fragments (al, todo)
when List.mem ~set:al aligner -> Some todo
| _ -> None)
in
match sample with
| Paired_end_sample (name,
Gunzip_concat r1_list,
Gunzip_concat r2_list)
when List.length r1_list > 1 ->
begin match option with
| Some _ -> `Caught
| None -> `Not_caught
end
| Paired_end_sample (name,
Gunzip_concat r1_list,
Gunzip_concat r2_list)
when List.length r1_list = 1 -> `Not_caught
| other ->
begin match option with
| Some `Silent
| None -> `Not_caught
| Some `Fail_if_not_happening ->
failwithf "Option Parallel_alignment_over_fastq_fragments is set to Fail_if_not_happening and it didn't happen:\n%s"
(to_json other |> Yojson.Basic.pretty_to_string)
end
in
let perform_aligner_parallelization aligner_tag ~make_aligner
~make_workflow sample =
begin match catch_parallelize_aligner aligner_tag sample with
| `Caught -> parallelize_alignment ~make_aligner sample
| `Not_caught ->
let fastq = fastq_sample_step ~compiler sample in
make_workflow fastq
end
in
let bam_node =
match pipeline with
| Bam_sample (name, bam_target) -> bam_target
| Gatk_indel_realigner (configuration, bam)
when has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner)) ->
let input_bam = compile_aligner_step ~compiler bam in
Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false
~configuration (KEDSL.Single_bam input_bam)
| Gatk_indel_realigner (configuration, bam) ->
let input_bam = compile_aligner_step ~compiler bam in
Gatk.indel_realigner ~run_with:machine ~compress:false
~configuration (KEDSL.Single_bam input_bam)
| Gatk_bqsr (configuration, bam) ->
let input_bam = compile_aligner_step ~compiler bam in
let output_bam = result_prefix ^ ".bam" in
Gatk.base_quality_score_recalibrator
~configuration ~run_with:machine ~input_bam ~output_bam
| Picard_mark_duplicates (settings, bam) ->
let input_bam = compile_aligner_step ~compiler bam in
let output_bam = result_prefix ^ ".bam" in
Picard.mark_duplicates ~settings
~run_with:machine ~input_bam output_bam
| Bwa_mem (bwa_mem_config, fq_sample) ->
perform_aligner_parallelization
`Bwa_mem ~make_aligner:(fun pe -> Bwa_mem (bwa_mem_config, pe))
fq_sample
~make_workflow:(fun fastq ->
Bwa.mem_align_to_sam ~reference_build
~configuration:bwa_mem_config
~fastq ~result_prefix ~run_with:machine ()
|> Samtools.sam_to_bam ~reference_build ~run_with:machine)
| Bwa (bwa_config, what) ->
perform_aligner_parallelization
`Bwa ~make_aligner:(fun pe -> Bwa (bwa_config, pe)) what
~make_workflow:(fun fastq ->
Bwa.align_to_sam ~reference_build
~configuration:bwa_config
~fastq ~result_prefix ~run_with:machine ()
|> Samtools.sam_to_bam ~reference_build ~run_with:machine)
| Mosaik (what) ->
perform_aligner_parallelization
`Mosaik ~make_aligner:(fun pe -> Mosaik (pe)) what
~make_workflow:(fun fastq -> Mosaik.align ~reference_build
~fastq ~result_prefix ~run_with:machine ())
| Star (configuration, what) ->
perform_aligner_parallelization
`Star ~make_aligner:(fun pe -> Star (configuration, pe)) what
~make_workflow:(fun fastq ->
Star.align ~configuration ~reference_build
~fastq ~result_prefix ~run_with:machine ())
| Hisat (configuration, what) ->
perform_aligner_parallelization
`Hisat ~make_aligner:(fun pe -> Hisat (configuration, pe)) what
~make_workflow:(fun fastq ->
Hisat.align ~configuration ~reference_build
~fastq ~result_prefix ~run_with:machine ()
|> Samtools.sam_to_bam ~reference_build ~run_with:machine
)
| With_metadata (metadata_spec, p) ->
compile_aligner_step ~compiler p
|> apply_with_metadata ~metadata_spec
in
compiler.wrap_bam_node pipeline bam_node
let rec compile_bam_pair ~compiler (pipeline : bam_pair pipeline) :
[ `Normal of KEDSL.bam_file KEDSL.workflow_node ] *
[ `Tumor of KEDSL.bam_file KEDSL.workflow_node ] *
[ `Pipeline of bam_pair pipeline ]
=
let {reference_build; work_dir; machine; _} = compiler in
begin match pipeline with
| Bam_pair (
(Gatk_bqsr (n_bqsr_config, Gatk_indel_realigner (n_gir_conf, n_bam)))
,
(Gatk_bqsr (t_bqsr_config, Gatk_indel_realigner (t_gir_conf, t_bam)))
)
when
has_option compiler
(function `Multi_sample_indel_realignment _ -> true | _ -> false)
&& n_gir_conf = t_gir_conf ->
let normal = compile_aligner_step ~compiler n_bam in
let tumor = compile_aligner_step ~compiler t_bam in
let bam_list_node =
if has_option compiler ((=) (`Map_reduce `Gatk_indel_realigner))
then (
Gatk.indel_realigner_map_reduce ~run_with:machine ~compress:false
~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor])
) else
Gatk.indel_realigner ~run_with:machine ~compress:false
~configuration:n_gir_conf (KEDSL.Bam_workflow_list [normal; tumor])
in
begin match KEDSL.explode_bam_list_node bam_list_node with
| [realigned_normal; realigned_tumor] ->
let new_pipeline =
Bam_pair (
Gatk_bqsr (n_bqsr_config,
Bam_sample (Filename.chop_extension realigned_normal#product#path,
realigned_normal)),
Gatk_bqsr (t_bqsr_config,
Bam_sample (Filename.chop_extension realigned_tumor#product#path,
realigned_tumor)))
in
compile_bam_pair ~compiler new_pipeline
| other ->
failwithf "Gatk.indel_realigner did not return the correct list of length 2 (tumor, normal): it gave %d bams"
(List.length other)
end
| Bam_pair ( Gatk_bqsr (_, Gatk_indel_realigner (_, _)),
Gatk_bqsr (_, Gatk_indel_realigner (_, _))) as bam_pair
when
has_option compiler
((=) (`Multi_sample_indel_realignment `Fail_if_not_happening)) ->
failwithf "Option (`Multi_sample_indel_realignment `Fail_if_not_happening) is set and this pipeline does not qualify:\n%s"
(to_json bam_pair |> Yojson.Basic.pretty_to_string)
| Bam_pair (normal_t, tumor_t) as final_pipeline ->
let normal = compile_aligner_step ~compiler normal_t in
let tumor = compile_aligner_step ~compiler tumor_t in
(`Normal normal, `Tumor tumor, `Pipeline final_pipeline)
| With_metadata (metadata_spec, p) ->
let `Normal normal, `Tumor tumor, `Pipeline p =
compile_bam_pair ~compiler p in
(`Normal (apply_with_metadata ~metadata_spec normal),
`Tumor (apply_with_metadata ~metadata_spec tumor),
`Pipeline p)
end
let rec compile_variant_caller_step ~compiler (pipeline: vcf pipeline) =
let {reference_build; work_dir; machine; _} = compiler in
let vcf_node =
match pipeline with
| Somatic_variant_caller (som_vc, bam_pair) ->
let (`Normal normal, `Tumor tumor, `Pipeline new_bam_pair) =
compile_bam_pair ~compiler bam_pair in
let result_prefix =
work_dir
// to_file_prefix (Somatic_variant_caller (som_vc, new_bam_pair)) in
dbg "Result_Prefix: %S" result_prefix;
som_vc.Variant_caller.make_target
~run_with:machine ~input:(Variant_caller.Somatic {normal; tumor})
~result_prefix ()
| Germline_variant_caller (gvc, bam) ->
let result_prefix = work_dir // to_file_prefix pipeline in
dbg "Result_Prefix: %S" result_prefix;
let input_bam = compile_aligner_step ~compiler bam in
gvc.Variant_caller.make_target
~run_with:machine ~input:(Variant_caller.Germline input_bam)
~result_prefix ()
| With_metadata (metadata_spec, p) ->
compile_variant_caller_step ~compiler p
|> apply_with_metadata ~metadata_spec
in
compiler.wrap_vcf_node pipeline vcf_node
let rec compile_gtf_step ~compiler (pipeline: gtf pipeline) =
let {reference_build; work_dir; machine; _} = compiler in
let result_prefix = work_dir // to_file_prefix pipeline in
dbg "Result_Prefix: %S" result_prefix;
let gtf_node =
match pipeline with
| Stringtie (configuration, bam) ->
let bam = compile_aligner_step ~compiler bam in
Stringtie.run ~configuration
~bam ~result_prefix ~run_with:machine ()
| With_metadata (metadata_spec, p) ->
compile_gtf_step ~compiler p
|> apply_with_metadata ~metadata_spec
in
compiler.wrap_gtf_node pipeline gtf_node
let rec seq2hla_hla_types_step ~compiler (pipeline : seq2hla_hla_types pipeline) =
let { machine ; work_dir; _ } = compiler in
match pipeline with
| Seq2HLA sample ->
let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in
let fastq_pair = fastq_sample_step ~compiler sample in
let sample_name = fastq_pair#product#sample_name in
let r1 = KEDSL.read_1_file_node fastq_pair in
let r2 =
match KEDSL.read_2_file_node fastq_pair with
| Some r2 -> r2
| _ -> failwithf "Seq2HLA doesn't support Single_end_sample(s)."
in
Seq2HLA.hla_type
~work_dir ~run_with:machine ~run_name:sample_name ~r1 ~r2
| With_metadata (metadata_spec, p) ->
seq2hla_hla_types_step ~compiler p
|> apply_with_metadata ~metadata_spec
let rec optitype_hla_types_step ~compiler (pipeline : optitype_hla_types pipeline) =
let { machine ; work_dir; _ } = compiler in
match pipeline with
| Optitype (kind, sample) ->
let work_dir = work_dir // (to_file_prefix pipeline) ^ "_work_dir" in
let fastq = fastq_sample_step ~compiler sample in
let sample_name = fastq#product#sample_name in
Optitype.hla_type
~work_dir ~run_with:machine ~run_name:sample_name ~fastq kind
| With_metadata (metadata_spec, p) ->
optitype_hla_types_step ~compiler p
|> apply_with_metadata ~metadata_spec
end