struct
(** [while_sequential l ~f] applies [f] to the elements of [l] through
    [Lwt_list.map_s] and gathers the [`Ok] payloads into a list.

    The first [`Error] met is carried out of the traversal inside a
    locally defined exception and turned back into a plain error value by
    the [~on_exn] handler given to [wrap_deferred].
    NOTE(review): this presumes [wrap_deferred] catches exceptions from
    the thunk and reports [on_exn exn] as the error -- confirm against
    its definition.

    @raise Failure from within the [~on_exn] handler when the exception
    it receives is not the local one. *)
let while_sequential:
  'a list -> f:('a -> ('c, 'b) t) -> ('c list, 'b) t
  = fun (type b) (l: 'a list) ~(f: 'a -> ('c, b) t) ->
    (* The error type must be locally abstract so that an exception
       carrying it can be declared inside the function body. *)
    let module Short_circuit = struct
      exception Local_exception of b
    end in
    (* Run [f] on one element; an error becomes a failed Lwt promise. *)
    let apply_one item =
      Lwt.bind (f item) (function
        | `Ok value -> Lwt.return value
        | `Error err -> Lwt.fail (Short_circuit.Local_exception err))
    in
    (* Recover the error value that [apply_one] wrapped. *)
    let unwrap_error = function
      | Short_circuit.Local_exception err -> err
      | unexpected ->
        (* Assuming the standard [ksprintf], [failwith] raises as soon as
           the format is completed, so the trailing unit is never used. *)
        ksprintf failwith "Expecting only Local_exception, but got: %s"
          (Printexc.to_string unexpected) ()
    in
    wrap_deferred (fun () -> Lwt_list.map_s apply_one l) ~on_exn:unwrap_error
(** [for_sequential l ~f] calls [f] on every element of [l], chaining each
    call after the previous step through [>>=], and never stops early:
    each outcome is recorded, whether it is an [`Ok] or an [`Error].

    Returns the pair [(oks, errors)]; each list is in the order in which
    the outcomes were recorded. *)
let for_sequential:
  'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
  = fun l ~f ->
    let oks = ref [] in
    let errors = ref [] in
    (* Push one outcome onto the matching accumulator (newest first). *)
    let record = function
      | `Ok o -> oks := o :: !oks
      | `Error e -> errors := e :: !errors
    in
    (* One link of the chain: wait for the previous step, run [f], and
       store its outcome so the chain itself always carries on. *)
    let step previous elt =
      previous >>= fun () ->
      f elt >>< fun outcome ->
      record outcome;
      return ()
    in
    List.fold_left l ~init:(return ()) ~f:step
    >>= fun () ->
    (* The accumulators were built newest-first; restore recording order. *)
    return (List.rev !oks, List.rev !errors)
(** [for_concurrent l ~f] starts [f] on the elements of [l] through
    [Lwt_list.map_p] and waits for all of them; no [`Error] interrupts
    the others.

    Returns the pair [(oks, errors)].  Outcomes are stored when each call
    to [f] resolves, so both lists follow completion order, which need
    not be the order of [l]. *)
let for_concurrent:
  'a list -> f:('a -> ('c, 'b) t) -> ('c list * 'b list, 'd) t
  = fun l ~f ->
    let oks = ref [] in
    let errors = ref [] in
    (* Raw Lwt level: run [f] and file its outcome under the right
       accumulator as soon as it is available. *)
    let collect elt =
      Lwt.bind (f elt) (function
        | `Ok o -> oks := o :: !oks; Lwt.return ()
        | `Error e -> errors := e :: !errors; Lwt.return ())
    in
    (* The unit list produced by [map_p] is dropped; wrapping [`Ok ()]
       brings the promise back into the result monad for [>>=]. *)
    Lwt.bind (Lwt_list.map_p collect l) (fun _ -> Lwt.return (`Ok ()))
    >>= fun () ->
    return (List.rev !oks, List.rev !errors)
(** [for_concurrent_with_index l ~f] behaves like [for_concurrent], except
    that [f] also receives the zero-based position of the element in [l]
    as its first argument. *)
let for_concurrent_with_index l ~f =
  (* Pair every element with its position ... *)
  let attach_index position element = (position, element) in
  (* ... and unpack the pair again when calling the user function. *)
  let call_with_index (position, element) = f position element in
  for_concurrent (List.mapi l ~f:attach_index) ~f:call_with_index
(** [pick_and_cancel candidates] delegates directly to [Lwt.pick].  Per
    the Lwt documentation, that yields the outcome of the first promise
    to complete and cancels the ones still pending. *)
let pick_and_cancel: ('a, 'error) t list -> ('a, 'error) t =
  fun candidates -> Lwt.pick candidates
end