Tidy up forking API

There was some code for recording the fibre result duplicated in each
backend. Also, it didn't do much because we handle errors before that.
And for `fork_promise` we want to report the result against the promise.

Also, optimise Cancel.cancel slightly. No need to make a new context for
the cancel functions if there aren't any.
This commit is contained in:
Thomas Leonard 2022-02-17 09:08:39 +00:00
parent 631e9d150d
commit 4031d6eadd
8 changed files with 54 additions and 52 deletions

View File

@ -55,13 +55,7 @@ module Eio_null = struct
(* Arrange for the forking fibre to run immediately after the new one. *)
t.run_q <- Deep.continue k :: t.run_q;
(* Create and run the new fibre (using fibre context [new_fibre]). *)
fork ~new_fibre (fun () ->
try f ()
with _ ->
(* Fibre.fork handles exceptions for us.
This is just to allow for tracing. *)
()
)
fork ~new_fibre f
)
| Eio.Private.Effects.Get_context -> Some (fun k ->
Deep.continue k fibre

View File

@ -158,9 +158,11 @@ let cancel t ex =
| () -> aux fns
| exception ex2 -> ex2 :: aux fns
in
match protect (fun () -> aux fns) with
| [] -> ()
| exns -> raise (Cancel_hook_failed exns)
if fns <> [] then (
match protect (fun () -> aux fns) with
| [] -> ()
| exns -> raise (Cancel_hook_failed exns)
)
let sub fn =
let ctx = perform Get_context in

View File

@ -1309,7 +1309,8 @@ module Private : sig
[fn] should arrange for [enqueue] to be called once the thread is ready to run again. *)
| Fork : Fibre_context.t * (unit -> unit) -> unit eff
(** See {!Fibre.fork} *)
(** [perform (Fork new_context f)] creates a new fibre and runs [f] in it, with context [new_context].
[f] must not raise an exception. See {!Fibre.fork}. *)
| Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff
(** [perform Trace fmt] writes trace logging to the configured trace output.
@ -1318,6 +1319,7 @@ module Private : sig
the whole domain must block until it is. *)
| Get_context : Fibre_context.t eff
(** [perform Get_context] immediately returns the current fibre's context (without switching fibres). *)
end
(** Temporary hack for compatibility with ocaml.4.12+domains *)

View File

@ -6,27 +6,33 @@ let yield () =
let fibre = Suspend.enter (fun fibre enqueue -> enqueue (Ok fibre)) in
Cancel.check fibre.cancel_context
let fork_raw cc f =
let new_fibre = Cancel.Fibre_context.make ~cc in
(* Note: [f] must not raise an exception, as that would terminate the whole scheduler. *)
let fork_raw new_fibre f =
perform (Fork (new_fibre, f))
let fork ~sw f =
Switch.check_our_domain sw;
fork_raw sw.Switch.cancel @@ fun () ->
Switch.with_op sw @@ fun () ->
try f ()
with ex -> Switch.fail sw ex
if Cancel.is_on sw.cancel then (
let new_fibre = Cancel.Fibre_context.make ~cc:sw.cancel in
fork_raw new_fibre @@ fun () ->
Switch.with_op sw @@ fun () ->
match f () with
| () ->
Ctf.note_resolved (Cancel.Fibre_context.tid new_fibre) ~ex:None
| exception ex ->
Switch.fail sw ex; (* The [with_op] ensures this will succeed *)
Ctf.note_resolved (Cancel.Fibre_context.tid new_fibre) ~ex:(Some ex)
) (* else the fibre should report the error to [sw], but [sw] is failed anyway *)
let fork_promise ~sw f =
Switch.check_our_domain sw;
let new_fibre = Cancel.Fibre_context.make ~cc:sw.Switch.cancel in
let p, r = Promise.create_with_id (Cancel.Fibre_context.tid new_fibre) in
let f () =
match Switch.with_op sw f with
| x -> Promise.resolve_ok r x
| exception ex -> Promise.resolve_error r ex
in
perform (Fork (new_fibre, f));
fork_raw new_fibre (fun () ->
match Switch.with_op sw f with
| x -> Promise.resolve_ok r x
| exception ex -> Promise.resolve_error r ex (* Can't fail; only we have [r] *)
);
p
let fork_on_accept ~on_handler_error ~sw:adopting_sw accept handle =
@ -66,17 +72,22 @@ let fork_on_accept ~on_handler_error ~sw:adopting_sw accept handle =
| x ->
(* Accept succeeded. Fork a new fibre into [adopting_sw] and
run it with [child_switch] as its context. *)
fork_raw child_switch.cancel @@ fun () ->
try run_child (fun () -> Switch.check child_switch; handle child_switch x)
with ex ->
let new_fibre = Cancel.Fibre_context.make ~cc:child_switch.cancel in
fork_raw new_fibre @@ fun () ->
match run_child (fun () -> Switch.check child_switch; handle child_switch x) with
| () ->
Ctf.note_resolved (Cancel.Fibre_context.tid new_fibre) ~ex:None
| exception ex ->
(* No point reporting an error if we're being cancelled. Also, nowhere to run it. *)
if Cancel.is_on adopting_sw.cancel then (
Switch.run_in adopting_sw @@ fun () ->
try on_handler_error ex
with ex2 ->
(* The [run_in] ensures [adopting_sw] isn't finished here *)
Switch.fail adopting_sw ex;
Switch.fail adopting_sw ex2
)
);
Ctf.note_resolved (Cancel.Fibre_context.tid new_fibre) ~ex:(Some ex)
let all xs =
Switch.run @@ fun sw ->
@ -101,6 +112,7 @@ let fork_sub ~sw ~on_error f =
Switch.run_in sw @@ fun () ->
try on_error ex
with ex2 ->
(* The [run_in] ensures [adopting_sw] isn't finished here *)
Switch.fail sw ex;
Switch.fail sw ex2
)
@ -141,13 +153,12 @@ let any fs =
| [f] -> wrap f; []
| f :: fs ->
let new_fibre = Cancel.Fibre_context.make ~cc in
let p, r = Promise.create () in
let f () =
match wrap f with
| x -> Promise.resolve_ok r x
| exception ex -> Promise.resolve_error r ex
in
perform (Fork (new_fibre, f));
let p, r = Promise.create_with_id (Cancel.Fibre_context.tid new_fibre) in
fork_raw new_fibre (fun () ->
match wrap f with
| x -> Promise.resolve_ok r x
| exception ex -> Promise.resolve_error r ex
);
p :: aux fs
in
let ps = aux fs in

View File

@ -27,9 +27,12 @@ let dump f t =
let is_finished t = Cancel.is_finished t.cancel
(* Check switch belongs to this domain (and isn't finished). It's OK if it's cancelling. *)
let check_our_domain t =
if is_finished t then invalid_arg "Switch finished!";
if Domain.self () <> t.cancel.domain then invalid_arg "Switch accessed from wrong domain!"
(* Check isn't cancelled (or finished). *)
let check t =
if is_finished t then invalid_arg "Switch finished!";
Cancel.check t.cancel
@ -41,6 +44,7 @@ let combine_exn ex = function
| None -> ex
| Some ex1 -> Exn.combine ex1 ex
(* Note: raises if [t] is finished or called from wrong domain. *)
let fail ?(bt=Printexc.get_raw_backtrace ()) t ex =
check_our_domain t;
if t.exs = None then

View File

@ -1097,13 +1097,7 @@ let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main =
| Eio.Private.Effects.Fork (new_fibre, f) -> Some (fun k ->
let k = { Suspended.k; fibre } in
enqueue_at_head st k ();
fork ~new_fibre (fun () ->
match f () with
| () ->
Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:None
| exception ex ->
Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:(Some ex)
)
fork ~new_fibre f
)
| Eio.Private.Effects.Trace -> Some (fun k -> continue k Eio_utils.Trace.default_traceln)
| Eio_unix.Private.Await_readable fd -> Some (fun k ->

View File

@ -717,15 +717,10 @@ let rec run main =
Some (fun k -> continue k Eio_utils.Trace.default_traceln)
| Eio.Private.Effects.Fork (new_fibre, f) ->
Some (fun k ->
let k = { Suspended.k; fibre } in
enqueue_at_head st k ();
fork ~new_fibre (fun () ->
match f () with
| () ->
Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:None
| exception ex ->
Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:(Some ex)
))
let k = { Suspended.k; fibre } in
enqueue_at_head st k ();
fork ~new_fibre f
)
| Eio.Private.Effects.Get_context -> Some (fun k -> continue k fibre)
| Enter_unchecked fn -> Some (fun k ->
fn st { Suspended.k; fibre }

View File

@ -76,11 +76,11 @@ Exception: Failure "Failed".
Exception: Failure "Failed".
```
`Fibre.both`, first fails but the other doesn't stop:
`Fibre.both`, first fails immediately and the other doesn't start:
```ocaml
# run (fun sw ->
Fibre.both (fun () -> failwith "Failed") ignore;
Fibre.both (fun () -> failwith "Failed") (fun () -> traceln "Second OK");
traceln "Not reached"
);;
Exception: Failure "Failed".