diff --git a/lib_eio/cancel.ml b/lib_eio/cancel.ml index 0dac95f..28f0f3d 100644 --- a/lib_eio/cancel.ml +++ b/lib_eio/cancel.ml @@ -26,7 +26,6 @@ type state = If cancelled, this is done by calling the cancellation function. *) type t = { mutable state : state; - parent : t; children : t Lwt_dllist.t; fibres : fibre_context Lwt_dllist.t; protected : bool; @@ -38,15 +37,6 @@ and fibre_context = { cancel_fn : (exn -> unit) option Atomic.t; } -(* A dummy value for bootstrapping *) -let rec boot = { - state = Finished; - parent = boot; - children = Lwt_dllist.create (); - fibres = Lwt_dllist.create (); - protected = false; -} - type _ eff += Get_context : fibre_context eff let cancelled t = @@ -78,11 +68,15 @@ let move_fibre_to t fibre = Option.iter Lwt_dllist.remove fibre.cancel_node; (* Remove from old context *) fibre.cancel_node <- Some new_node -(* Runs [fn] with a fresh cancellation context. *) -let with_cc ~ctx:fibre ~parent ~protected fn = +(* Note: the new value is not linked into the cancellation tree. *) +let create ~protected = let children = Lwt_dllist.create () in let fibres = Lwt_dllist.create () in - let t = { state = On; parent; children; protected; fibres } in + { state = On; children; protected; fibres } + +(* Runs [fn] with a fresh cancellation context. *) +let with_cc ~ctx:fibre ~parent ~protected fn = + let t = create ~protected in let node = Lwt_dllist.add_r t parent.children in move_fibre_to t fibre; let cleanup () = @@ -131,14 +125,15 @@ and cancel_child ex t acc = let sub fn = let ctx = perform Get_context in - with_cc ~ctx ~parent:ctx.cancel_context ~protected:false @@ fun t -> + let parent = ctx.cancel_context in + with_cc ~ctx ~parent ~protected:false @@ fun t -> let x = match fn t with | x -> - check t.parent; + check parent; x | exception ex -> - check t.parent; + check parent; raise ex in match t.state with @@ -150,9 +145,10 @@ let sub fn = (instead, return the parent context on exit so the caller can check that) *) let sub_unchecked fn = let ctx = perform Get_context in - with_cc ~ctx ~parent:ctx.cancel_context ~protected:false @@ fun t -> + let parent = ctx.cancel_context in + with_cc ~ctx ~parent ~protected:false @@ fun t -> fn t; - t.parent + parent module Fibre_context = struct type t = fibre_context @@ -169,11 +165,15 @@ module Fibre_context = struct let clear_cancel_fn t = Atomic.exchange t.cancel_fn None <> None - let make ~tid ~cc = + let make ~cc = + let tid = Ctf.mint_id () in + Ctf.note_created tid Ctf.Task; let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = Atomic.make None } in t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibres); t + let make_root () = make ~cc:(create ~protected:false) + let destroy t = Option.iter Lwt_dllist.remove t.cancel_node end diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml index ce8cbb6..bfb466d 100644 --- a/lib_eio/eio.ml +++ b/lib_eio/eio.ml @@ -298,5 +298,4 @@ module Private = struct | Get_context = Cancel.Get_context | Trace = Std.Trace end - let boot_cancel = Cancel.boot end diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index 72fb92f..ab33266 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -608,8 +608,11 @@ module Private : sig module Fibre_context : sig type t - val make : tid:Ctf.id -> cc:Cancel.t -> t - (** [make ~tid ~cc] is a new context with the given thread ID and cancellation context. *) + val make_root : unit -> t + (** Make a new root context for a new domain. *) + + val make : cc:Cancel.t -> t + (** [make ~cc] is a new fibre context, initially attached to the given cancellation context. *) val destroy : t -> unit (** [destroy t] removes [t] from its cancellation context. *) @@ -652,10 +655,10 @@ module Private : sig passing it the suspended fibre's context and a function to resume it. [fn] should arrange for [enqueue] to be called once the thread is ready to run again. *) - | Fork : (Fibre_context.t -> 'a) -> 'a Promise.t eff + | Fork : Fibre_context.t * (unit -> 'a) -> 'a Promise.t eff (** See {!Fibre.fork} *) - | Fork_ignore : (Fibre_context.t -> unit) -> unit eff + | Fork_ignore : Fibre_context.t * (unit -> unit) -> unit eff (** See {!Fibre.fork_ignore} *) | Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff @@ -666,8 +669,4 @@ module Private : sig | Get_context : Fibre_context.t eff end - - val boot_cancel : Cancel.t - (** A dummy context which is useful briefly during start up before the backend calls {!Cancel.protect} - to install a proper context. *) end diff --git a/lib_eio/fibre.ml b/lib_eio/fibre.ml index bf5ccc4..12c72d1 100644 --- a/lib_eio/fibre.ml +++ b/lib_eio/fibre.ml @@ -1,27 +1,22 @@ open EffectHandlers -type _ eff += Fork : (Cancel.fibre_context -> 'a) -> 'a Promise.t eff +type _ eff += Fork : Cancel.fibre_context * (unit -> 'a) -> 'a Promise.t eff let fork ~sw f = - let f child = - Switch.with_op sw @@ fun () -> - Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t -> - f () - in - perform (Fork f) + let f () = Switch.with_op sw f in + let new_fibre = Cancel.Fibre_context.make ~cc:sw.cancel in + perform (Fork (new_fibre, f)) -type _ eff += Fork_ignore : (Cancel.fibre_context -> unit) -> unit eff +type _ eff += Fork_ignore : Cancel.fibre_context * (unit -> unit) -> unit eff let fork_ignore ~sw f = - let f child = + let f () = Switch.with_op sw @@ fun () -> - try - Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t -> - f () - with ex -> - Switch.turn_off sw ex + try f () + with ex -> Switch.turn_off sw ex in - perform (Fork_ignore f) + let new_fibre = Cancel.Fibre_context.make ~cc:sw.cancel in + perform (Fork_ignore (new_fibre, f)) let yield () = let fibre = Suspend.enter (fun fibre enqueue -> enqueue (Ok fibre)) in @@ -39,7 +34,10 @@ let pair f g = try f () with ex -> Cancel.cancel cancel ex; raise ex in - let x = perform (Fork f) in + let x = + let new_fibre = Cancel.Fibre_context.make ~cc:cancel in + perform (Fork (new_fibre, f)) + in match g () with | gr -> Promise.await x, gr (* [g] succeeds - just report [f]'s result *) | exception gex -> @@ -80,18 +78,18 @@ let await_cancel () = let any fs = let r = ref `None in let parent_c = - Cancel.sub_unchecked (fun c -> - let wrap h _fibre = + Cancel.sub_unchecked (fun cc -> + let wrap h () = match h () with | x -> begin match !r with - | `None -> r := `Ok x; Cancel.cancel c Not_first + | `None -> r := `Ok x; Cancel.cancel cc Not_first | `Ex _ | `Ok _ -> () end - | exception Cancel.Cancelled _ when Cancel.cancelled c -> () + | exception Cancel.Cancelled _ when Cancel.cancelled cc -> () | exception ex -> begin match !r with - | `None -> r := `Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel c ex + | `None -> r := `Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel cc ex | `Ok _ -> r := `Ex (ex, Printexc.get_raw_backtrace ()) | `Ex (e1, bt) -> r := `Ex (Multiple_exn.T [e1; ex], bt) end @@ -100,7 +98,8 @@ let any fs = | [] -> await_cancel () | [f] -> wrap f (); [] | f :: fs -> - let p = perform (Fork (wrap f)) in + let new_fibre = Cancel.Fibre_context.make ~cc in + let p = perform (Fork (new_fibre, wrap f)) in p :: aux fs in let ps = aux fs in diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index a570c1a..115cd89 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -994,10 +994,9 @@ let rec run ?(queue_depth=64) ?(block_size=4096) main = let eventfd = FD.placeholder ~seekable:false in let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; io_jobs = 0 } in Log.debug (fun l -> l "starting main thread"); - let rec fork ~tid ~cancel:initial_cancel fn = - Ctf.note_switch tid; - let fibre = Fibre_context.make ~tid ~cc:initial_cancel in - match_with fn fibre + let rec fork ~new_fibre:fibre fn = + Ctf.note_switch (Fibre_context.tid fibre); + match_with fn () { retc = (fun () -> Fibre_context.destroy fibre; schedule st); exnc = (fun ex -> Fibre_context.destroy fibre; @@ -1053,34 +1052,29 @@ let rec run ?(queue_depth=64) ?(block_size=4096) main = ); schedule st ) - | Eio.Private.Effects.Fork f -> Some (fun k -> + | Eio.Private.Effects.Fork (new_fibre, f) -> Some (fun k -> let k = { Suspended.k; fibre } in - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Task; - let promise, resolver = Promise.create_with_id id in + let promise, resolver = Promise.create_with_id (Fibre_context.tid new_fibre) in enqueue_thread st k promise; fork - ~tid:id - ~cancel:(Fibre_context.cancellation_context fibre) - (fun new_fibre -> - match f new_fibre with + ~new_fibre + (fun () -> + match f () with | x -> Promise.fulfill resolver x | exception ex -> Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex); Promise.break resolver ex ) ) - | Eio.Private.Effects.Fork_ignore f -> Some (fun k -> + | Eio.Private.Effects.Fork_ignore (new_fibre, f) -> Some (fun k -> let k = { Suspended.k; fibre } in enqueue_thread st k (); - let child = Ctf.note_fork () in - Ctf.note_switch child; - fork ~tid:child ~cancel:(Fibre_context.cancellation_context fibre) (fun new_fibre -> - match f new_fibre with + fork ~new_fibre (fun () -> + match f () with | () -> - Ctf.note_resolved child ~ex:None + Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:None | exception ex -> - Ctf.note_resolved child ~ex:(Some ex) + Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:(Some ex) ) ) | Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln) @@ -1096,7 +1090,8 @@ let rec run ?(queue_depth=64) ?(block_size=4096) main = } in let `Exit_scheduler = - fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun _fibre -> + let new_fibre = Fibre_context.make_root () in + fork ~new_fibre (fun () -> Switch.run_protected (fun sw -> let fd = eio_eventfd 0 in st.eventfd.fd <- `Open fd; diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index 7ca0233..5723a0b 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -612,10 +612,9 @@ let rec run main = let async = Luv.Async.init ~loop (fun _async -> wakeup run_q) |> or_raise in let st = { loop; async; run_q } in let stdenv = Objects.stdenv ~run_event_loop:run in - let rec fork ~tid ~cancel:initial_cancel fn = - Ctf.note_switch tid; - let fibre = Fibre_context.make ~tid ~cc:initial_cancel in - match_with fn fibre + let rec fork ~new_fibre:fibre fn = + Ctf.note_switch (Fibre_context.tid fibre); + match_with fn () { retc = (fun () -> Fibre_context.destroy fibre); exnc = (fun e -> Fibre_context.destroy fibre; raise e); effc = fun (type a) (e : a eff) -> @@ -626,35 +625,30 @@ let rec run main = fn loop fibre (enqueue_thread st k)) | Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln) - | Eio.Private.Effects.Fork f -> + | Eio.Private.Effects.Fork (new_fibre, f) -> Some (fun k -> let k = { Suspended.k; fibre } in - let id = Ctf.mint_id () in - Ctf.note_created id Ctf.Task; - let promise, resolver = Promise.create_with_id id in + let promise, resolver = Promise.create_with_id (Fibre_context.tid new_fibre) in enqueue_thread st k promise; fork - ~tid:id - ~cancel:(Fibre_context.cancellation_context fibre) - (fun new_fibre -> - match f new_fibre with + ~new_fibre + (fun () -> + match f () with | x -> Promise.fulfill resolver x | exception ex -> Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex); Promise.break resolver ex )) - | Eio.Private.Effects.Fork_ignore f -> + | Eio.Private.Effects.Fork_ignore (new_fibre, f) -> Some (fun k -> let k = { Suspended.k; fibre } in enqueue_thread st k (); - let child = Ctf.note_fork () in - Ctf.note_switch child; - fork ~tid:child ~cancel:(Fibre_context.cancellation_context fibre) (fun new_fibre -> - match f new_fibre with + fork ~new_fibre (fun () -> + match f () with | () -> - Ctf.note_resolved child ~ex:None + Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:None | exception ex -> - Ctf.note_resolved child ~ex:(Some ex) + Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:(Some ex) )) | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fibre) | Enter_unchecked fn -> Some (fun k -> @@ -674,8 +668,9 @@ let rec run main = } in let main_status = ref `Running in - fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun _new_fibre -> - begin match Eio.Cancel.protect (fun () -> main stdenv) with + let new_fibre = Fibre_context.make_root () in + fork ~new_fibre (fun () -> + begin match main stdenv with | () -> main_status := `Done | exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ()) end;