struct
(** [_long_running_action_error t ~error ~bookkeeping ~previous_attempts]
    turns an error reported for a long-running action into the triple
    [(severity, message, bookkeeping)].

    - A [`Fatal] error always gives severity [`Fatal].
    - A [`Recoverable] error gives [`Fatal] when
      [Configuration.is_unix_ssh_failure_fatal] holds for the configuration
      of [t], or when [previous_attempts] has reached
      [Ketrew_configuration.maximum_successive_attempts].
    - Any other [`Recoverable] error gives [`Try_again].

    The message and [bookkeeping] are returned unchanged. *)
let _long_running_action_error t ~error ~bookkeeping ~previous_attempts =
  let ssh_failure_is_fatal =
    Configuration.is_unix_ssh_failure_fatal t.configuration in
  let severity, message =
    match error with
    | `Fatal message -> `Fatal, message
    | `Recoverable message ->
      if ssh_failure_is_fatal then `Fatal, message
      else if
        previous_attempts >=
        Ketrew_configuration.maximum_successive_attempts t.configuration
      then `Fatal, message
      else `Try_again, message
  in
  severity, message, bookkeeping
(** [_start_running_target t ~target ~bookkeeping] looks up the plugin named
    in [bookkeeping], deserializes the stored run-parameters, and asks the
    plugin to start them.

    On success the result is a fresh bookkeeping record holding the
    re-serialized run-parameters returned by the plugin.

    Every failure goes through [_long_running_action_error]:
    - an unknown plugin is reported as a [`Recoverable] error,
    - an exception raised while deserializing is reported as [`Fatal],
    - an error returned by the plugin [start] function is passed as is. *)
let _start_running_target t ~target ~bookkeeping =
  let {Target.Automaton. plugin_name; run_parameters = serialized} =
    bookkeeping in
  let previous_attempts =
    Target.(state target |> State.Count.consecutive_recent_attempts) in
  (* Single exit point for all the failure cases below. *)
  let give_up error =
    fail (_long_running_action_error t ~error ~bookkeeping ~previous_attempts)
  in
  match Ketrew_plugin.find_plugin plugin_name with
  | None ->
    give_up (`Recoverable (fmt "Missing plugin %S" plugin_name))
  | Some plugin ->
    let module Long_running = (val plugin : LONG_RUNNING) in
    let deserialized =
      try return (Long_running.deserialize_exn serialized)
      with exn ->
        give_up
          (`Fatal (fmt "Deserialize-long-running: %s"
                     (Printexc.to_string exn)))
    in
    deserialized
    >>= fun to_start ->
    Long_running.start to_start
    >>< begin function
    | `Ok started ->
      return { Target.Automaton. plugin_name;
               run_parameters = Long_running.serialize started }
    | `Error e ->
      give_up e
    end
(** [_check_and_activate_dependencies t ~dependency_of ~ids] fetches every
    target listed in [ids] concurrently and computes a status for each one:

    - an [`Activable] dependency is activated (with reason
      [`Dependency dependency_of]) and counted as [`In_progress],
    - [`In_progress], [`Successful] and [`Failed] are kept as they are,
    - a [`Database] or [`Missing_data] error while fetching is logged and
      the dependency is counted as [`Failed],
    - a [`Target] error is collected in the list of errors.

    The overall answer is:
    - [`At_least_one_failed []] when some errors were collected,
    - [`All_succeeded] when every status is [`Successful] (this includes
      the case of an empty [ids]),
    - [`At_least_one_failed failed_ids] when some status is [`Failed],
    - [`Still_processing] otherwise. *)
let _check_and_activate_dependencies t ~dependency_of ~ids =
  let has_status wanted (_, status) = status = wanted in
  (* Compute the [(id, status)] pair of one dependency. *)
  let check_one dep =
    let consider_failed errlog =
      Log.(s "Error while activating dependencies: " % errlog @ error);
      Log.(s "return (dep, `Failed)" @ verbose);
      return (dep, `Failed)
    in
    get_target t dep >>< function
    | `Ok dependency ->
      begin match Target.state dependency |> Target.State.simplify with
      | `In_progress
      | `Successful
      | `Failed as current ->
        return (dep, current)
      | `Activable ->
        activate_target t ~target:dependency
          ~reason:(`Dependency dependency_of)
        >>= fun () ->
        return (dep, `In_progress)
      end
    | `Error (`Target _ as e) ->
      fail e
    | `Error (`Database db_error) ->
      consider_failed (Log.s (Database_error.to_string db_error))
    | `Error (`Missing_data id) ->
      consider_failed Log.(s "Missing target: " % quote id)
  in
  (* Fold the individual statuses and errors into one answer. *)
  let summarize (oks, errors) =
    match errors with
    | _ :: _ ->
      Log.(s "Some errors while activating dependencies: " % n
           % separate n
             (List.map ~f:(fun x -> s (Ketrew_error.to_string x)) errors)
           @ error);
      return (`At_least_one_failed [])
    | [] ->
      if List.for_all oks ~f:(has_status `Successful) then
        return `All_succeeded
      else if List.exists oks ~f:(has_status `Failed) then begin
        let failed_ones =
          List.filter oks ~f:(has_status `Failed) |> List.map ~f:fst in
        Log.(s "Targets " % OCaml.list s failed_ones % s " considered failed"
             @ verbose);
        Log.(s "return (`At_least_one_failed failed_ones)" @ verbose);
        return (`At_least_one_failed failed_ones)
      end else
        return `Still_processing
  in
  Deferred_list.for_concurrent ids ~f:check_one
  >>= summarize
(** [_attempt_to_kill t ~target ~bookkeeping] looks up the plugin named in
    [bookkeeping], deserializes the stored run-parameters, and asks the
    plugin to kill the corresponding job.

    On success ([`Killed]) the result is a fresh bookkeeping record holding
    the re-serialized run-parameters returned by the plugin.

    Every failure goes through [_long_running_action_error]:
    - an unknown plugin is reported as a [`Recoverable] error,
    - an exception raised while deserializing is reported as [`Fatal]
      (same treatment as in [_start_running_target]) instead of escaping
      as a raw exception,
    - an error returned by the plugin [kill] function is passed as is. *)
let _attempt_to_kill t ~target ~bookkeeping =
  let {Target.Automaton. plugin_name; run_parameters} = bookkeeping in
  let previous_attempts =
    Target.(state target |> State.Count.consecutive_recent_attempts) in
  begin match Ketrew_plugin.find_plugin plugin_name with
  | Some m ->
    let module Long_running = (val m : LONG_RUNNING) in
    begin
      begin
        (* [deserialize_exn] may raise: convert that into a regular error
           of the result monad so that callers using [>><] can see it. *)
        try return (Long_running.deserialize_exn run_parameters)
        with e ->
          fail (_long_running_action_error t
                  ~error:(`Fatal (fmt "Deserialize-long-running: %s"
                                    (Printexc.to_string e)))
                  ~bookkeeping ~previous_attempts)
      end
      >>= fun run_parameters ->
      Long_running.kill run_parameters
      >>< function
      | `Ok (`Killed rp) ->
        let run_parameters = Long_running.serialize rp in
        return { Target.Automaton. plugin_name; run_parameters}
      | `Error e ->
        fail (_long_running_action_error t
                ~error:e ~bookkeeping ~previous_attempts)
    end
  | None ->
    let error = `Recoverable (fmt "Missing plugin %S" plugin_name) in
    fail (_long_running_action_error t
            ~error ~bookkeeping ~previous_attempts)
  end
(** [_check_process t ~target ~bookkeeping] looks up the plugin named in
    [bookkeeping], deserializes the stored run-parameters, and asks the
    plugin for an update on the job.

    Successful answers carry [bookkeeping] updated with the re-serialized
    run-parameters returned by the plugin:
    - [`Still_running bookkeeping] when the plugin says [`Still_running],
    - [`Successful bookkeeping] when the plugin says [`Succeeded].

    Failures are triples [(severity, message, bookkeeping)]:
    - the plugin answer [`Failed] is logged and reported as [`Fatal] with
      the updated bookkeeping,
    - an unknown plugin is reported as a [`Recoverable] error through
      [_long_running_action_error],
    - an exception raised while deserializing is reported as [`Fatal]
      through [_long_running_action_error] (same treatment as in
      [_start_running_target]) instead of escaping as a raw exception,
    - an error returned by the plugin [update] function is passed to
      [_long_running_action_error] as is. *)
let _check_process t ~target ~bookkeeping =
  let {Target.Automaton. plugin_name; run_parameters} = bookkeeping in
  (* Needed by every call to [_long_running_action_error] below. *)
  let previous_attempts =
    Target.(state target |> State.Count.consecutive_recent_attempts) in
  begin match Ketrew_plugin.find_plugin plugin_name with
  | Some m ->
    let module Long_running = (val m : LONG_RUNNING) in
    begin
      begin
        (* [deserialize_exn] may raise: convert that into a regular error
           of the result monad so that callers using [>><] can see it. *)
        try return (Long_running.deserialize_exn run_parameters)
        with e ->
          fail (_long_running_action_error t
                  ~error:(`Fatal (fmt "Deserialize-long-running: %s"
                                    (Printexc.to_string e)))
                  ~bookkeeping ~previous_attempts)
      end
      >>= fun run_parameters ->
      Long_running.update run_parameters
      >>< function
      | `Ok (`Still_running run_parameters) ->
        let run_parameters = Long_running.serialize run_parameters in
        return (`Still_running
                  { bookkeeping with
                    Target.Automaton.run_parameters = run_parameters })
      | `Ok (`Succeeded run_parameters) ->
        let run_parameters = Long_running.serialize run_parameters in
        return (`Successful
                  { bookkeeping with
                    Target.Automaton.run_parameters = run_parameters })
      | `Ok (`Failed (run_parameters, msg)) ->
        let run_parameters = Long_running.serialize run_parameters in
        Log.(s (Target.id target) % s " failed: " % s msg @ very_verbose);
        fail (`Fatal, msg,
              { bookkeeping with
                Target.Automaton.run_parameters = run_parameters })
      | `Error e ->
        fail (_long_running_action_error t
                ~error:e ~bookkeeping ~previous_attempts)
    end
  | None ->
    let error = `Recoverable (fmt "Missing plugin %S" plugin_name) in
    fail (_long_running_action_error t ~error ~bookkeeping ~previous_attempts)
  end
(** [_process_automaton_transition t target] asks
    [Target.Automaton.transition] which action [target] requires, performs
    that action, and feeds the outcome to the continuation provided by the
    automaton ([make_new_target]); the value built by that continuation is
    the result.

    Outcomes of killing, starting and checking a process are captured with
    [>><], so both successes and failures are handed to the continuation.
    The outcome of [`Activate] is ignored. For [`Eval_condition], a
    [`Volume] error is [`Fatal]; a [`Host] error is [`Fatal] once the number
    of consecutive recent attempts has reached the configured maximum and
    [`Try_again] before that. *)
let _process_automaton_transition t target =
  match Target.Automaton.transition target with
  | `Do_nothing make_new_target ->
    return (make_new_target ())
  | `Kill (bookkeeping, make_new_target) ->
    _attempt_to_kill t ~target ~bookkeeping
    >>< fun kill_outcome ->
    return (make_new_target ~log:"Attempted to kill" kill_outcome)
  | `Check_and_activate_dependencies make_new_target ->
    let ids = Target.depends_on target in
    let log =
      fmt "Check-and-Activation of [%s]" (String.concat ~sep:", " ids) in
    _check_and_activate_dependencies t
      ~dependency_of:(Target.id target) ~ids
    >>| make_new_target ~log
  | `Start_running (bookkeeping, make_new_target) ->
    _start_running_target t ~target ~bookkeeping
    >>< fun start_outcome ->
    return (make_new_target ~log:"Attempt to start" start_outcome)
  | `Eval_condition (condition, make_new_target) ->
    Ketrew_eval_condition.bool condition
    >>< begin function
    | `Ok answer ->
      return (make_new_target ?log:None (`Ok answer))
    | `Error e ->
      let attempts =
        Target.(state target |> State.Count.consecutive_recent_attempts) in
      let log = Ketrew_error.to_string e in
      let severity =
        match e with
        | `Volume _ -> `Fatal
        | `Host _ when
            attempts >=
            Ketrew_configuration.maximum_successive_attempts t.configuration ->
          `Fatal
        | `Host _ -> `Try_again
      in
      return (make_new_target ?log:None (`Error (severity, log)))
    end
  | `Activate (ids, make_new_target) ->
    _check_and_activate_dependencies t
      ~dependency_of:(Target.id target) ~ids
    >>< fun (_ : (_, [`Empty]) Result.t) ->
    (* The activation outcome is deliberately not inspected here. *)
    return (make_new_target ())
  | `Check_process (bookkeeping, make_new_target) ->
    _check_process t ~target ~bookkeeping
    >>< fun check_outcome ->
    return (make_new_target check_outcome)
(** [step t] runs one pass of the engine and returns [true] when something
    happened during that pass.

    The pass folds over the active targets, accumulating what the automaton
    reported for each of them:
    - a target whose state is finished is moved to the finished collection
      and adds nothing to the accumulator,
    - any other target goes through [_process_automaton_transition]; the
      new version of the target is saved and the reported progress is
      added to the accumulator.

    Then [Killing_targets.proceed_to_mass_killing] and
    [Adding_targets.check_and_really_add_targets] are run. The result is
    [true] when at least one transition reported [`Changed_state], or when
    either of these two operations returned [true]. *)
let step t: (bool, _) Deferred_result.t =
  fold_active_targets t ~init:[] ~f:begin fun previous_happenings ~target ->
    begin match Target.state target with
    | s when Target.State.Is.finished s ->
      move_target_to_finished_collection t ~target
      >>= fun () ->
      (* Keep what was accumulated for the previous targets: returning an
         empty list here would forget their [`Changed_state] entries. *)
      return previous_happenings
    | _ ->
      _process_automaton_transition t target
      >>< function
      | `Ok (new_target, progress) ->
        add_or_update_targets t [new_target]
        >>= fun () ->
        Log.(s "Transition for target: "
             % Target.log target
             % s "Done: " % n
             % Target.(State.log ~depth:2 (state new_target))
             @ very_verbose);
        return (progress :: previous_happenings)
      | `Error `Empty_should_not_exist ->
        (* Nothing new to record, but the accumulator must be preserved. *)
        return previous_happenings
    end
  end
  >>| List.exists ~f:((=) `Changed_state)
  >>= fun has_progressed ->
  Killing_targets.proceed_to_mass_killing t
  >>= fun killing_did_something ->
  Adding_targets.check_and_really_add_targets t
  >>= fun adding_did_something ->
  return (has_progressed || adding_did_something || killing_did_something)
(** [fix_point state] calls [step state] repeatedly until a call returns
    [false], and returns [`Steps n] where [n] is the number of calls made
    (the last call, which returned [false], is included in the count). *)
let fix_point state =
  let rec iterate steps_so_far =
    step state
    >>= fun progressed ->
    let steps_done = steps_so_far + 1 in
    if progressed
    then iterate steps_done
    else return steps_done
  in
  iterate 0
  >>= fun total ->
  return (`Steps total)
end