Merge pull request #115 from talex5/context-first

Create fibre context before forking
This commit is contained in:
Thomas Leonard 2021-12-13 15:27:15 +00:00 committed by GitHub
commit 84d66fa050
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 78 additions and 91 deletions

View File

@ -26,7 +26,6 @@ type state =
If cancelled, this is done by calling the cancellation function. *) If cancelled, this is done by calling the cancellation function. *)
type t = { type t = {
mutable state : state; mutable state : state;
parent : t;
children : t Lwt_dllist.t; children : t Lwt_dllist.t;
fibres : fibre_context Lwt_dllist.t; fibres : fibre_context Lwt_dllist.t;
protected : bool; protected : bool;
@ -38,15 +37,6 @@ and fibre_context = {
cancel_fn : (exn -> unit) option Atomic.t; cancel_fn : (exn -> unit) option Atomic.t;
} }
(* A dummy value for bootstrapping *)
let rec boot = {
state = Finished;
parent = boot;
children = Lwt_dllist.create ();
fibres = Lwt_dllist.create ();
protected = false;
}
type _ eff += Get_context : fibre_context eff type _ eff += Get_context : fibre_context eff
let cancelled t = let cancelled t =
@ -78,11 +68,15 @@ let move_fibre_to t fibre =
Option.iter Lwt_dllist.remove fibre.cancel_node; (* Remove from old context *) Option.iter Lwt_dllist.remove fibre.cancel_node; (* Remove from old context *)
fibre.cancel_node <- Some new_node fibre.cancel_node <- Some new_node
(* Runs [fn] with a fresh cancellation context. *) (* Note: the new value is not linked into the cancellation tree. *)
let with_cc ~ctx:fibre ~parent ~protected fn = let create ~protected =
let children = Lwt_dllist.create () in let children = Lwt_dllist.create () in
let fibres = Lwt_dllist.create () in let fibres = Lwt_dllist.create () in
let t = { state = On; parent; children; protected; fibres } in { state = On; children; protected; fibres }
(* Runs [fn] with a fresh cancellation context. *)
let with_cc ~ctx:fibre ~parent ~protected fn =
let t = create ~protected in
let node = Lwt_dllist.add_r t parent.children in let node = Lwt_dllist.add_r t parent.children in
move_fibre_to t fibre; move_fibre_to t fibre;
let cleanup () = let cleanup () =
@ -131,14 +125,15 @@ and cancel_child ex t acc =
let sub fn = let sub fn =
let ctx = perform Get_context in let ctx = perform Get_context in
with_cc ~ctx ~parent:ctx.cancel_context ~protected:false @@ fun t -> let parent = ctx.cancel_context in
with_cc ~ctx ~parent ~protected:false @@ fun t ->
let x = let x =
match fn t with match fn t with
| x -> | x ->
check t.parent; check parent;
x x
| exception ex -> | exception ex ->
check t.parent; check parent;
raise ex raise ex
in in
match t.state with match t.state with
@ -150,9 +145,10 @@ let sub fn =
(instead, return the parent context on exit so the caller can check that) *) (instead, return the parent context on exit so the caller can check that) *)
let sub_unchecked fn = let sub_unchecked fn =
let ctx = perform Get_context in let ctx = perform Get_context in
with_cc ~ctx ~parent:ctx.cancel_context ~protected:false @@ fun t -> let parent = ctx.cancel_context in
with_cc ~ctx ~parent ~protected:false @@ fun t ->
fn t; fn t;
t.parent parent
module Fibre_context = struct module Fibre_context = struct
type t = fibre_context type t = fibre_context
@ -169,11 +165,15 @@ module Fibre_context = struct
let clear_cancel_fn t = let clear_cancel_fn t =
Atomic.exchange t.cancel_fn None <> None Atomic.exchange t.cancel_fn None <> None
let make ~tid ~cc = let make ~cc =
let tid = Ctf.mint_id () in
Ctf.note_created tid Ctf.Task;
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = Atomic.make None } in let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = Atomic.make None } in
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibres); t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibres);
t t
let make_root () = make ~cc:(create ~protected:false)
let destroy t = let destroy t =
Option.iter Lwt_dllist.remove t.cancel_node Option.iter Lwt_dllist.remove t.cancel_node
end end

View File

@ -298,5 +298,4 @@ module Private = struct
| Get_context = Cancel.Get_context | Get_context = Cancel.Get_context
| Trace = Std.Trace | Trace = Std.Trace
end end
let boot_cancel = Cancel.boot
end end

View File

@ -608,8 +608,11 @@ module Private : sig
module Fibre_context : sig module Fibre_context : sig
type t type t
val make : tid:Ctf.id -> cc:Cancel.t -> t val make_root : unit -> t
(** [make ~tid ~cc] is a new context with the given thread ID and cancellation context. *) (** Make a new root context for a new domain. *)
val make : cc:Cancel.t -> t
(** [make ~cc] is a new fibre context, initially attached to the given cancellation context. *)
val destroy : t -> unit val destroy : t -> unit
(** [destroy t] removes [t] from its cancellation context. *) (** [destroy t] removes [t] from its cancellation context. *)
@ -652,10 +655,10 @@ module Private : sig
passing it the suspended fibre's context and a function to resume it. passing it the suspended fibre's context and a function to resume it.
[fn] should arrange for [enqueue] to be called once the thread is ready to run again. *) [fn] should arrange for [enqueue] to be called once the thread is ready to run again. *)
| Fork : (Fibre_context.t -> 'a) -> 'a Promise.t eff | Fork : Fibre_context.t * (unit -> 'a) -> 'a Promise.t eff
(** See {!Fibre.fork} *) (** See {!Fibre.fork} *)
| Fork_ignore : (Fibre_context.t -> unit) -> unit eff | Fork_ignore : Fibre_context.t * (unit -> unit) -> unit eff
(** See {!Fibre.fork_ignore} *) (** See {!Fibre.fork_ignore} *)
| Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff | Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff
@ -666,8 +669,4 @@ module Private : sig
| Get_context : Fibre_context.t eff | Get_context : Fibre_context.t eff
end end
val boot_cancel : Cancel.t
(** A dummy context which is useful briefly during start up before the backend calls {!Cancel.protect}
to install a proper context. *)
end end

View File

@ -1,27 +1,22 @@
open EffectHandlers open EffectHandlers
type _ eff += Fork : (Cancel.fibre_context -> 'a) -> 'a Promise.t eff type _ eff += Fork : Cancel.fibre_context * (unit -> 'a) -> 'a Promise.t eff
let fork ~sw f = let fork ~sw f =
let f child = let f () = Switch.with_op sw f in
Switch.with_op sw @@ fun () -> let new_fibre = Cancel.Fibre_context.make ~cc:sw.cancel in
Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t -> perform (Fork (new_fibre, f))
f ()
in
perform (Fork f)
type _ eff += Fork_ignore : (Cancel.fibre_context -> unit) -> unit eff type _ eff += Fork_ignore : Cancel.fibre_context * (unit -> unit) -> unit eff
let fork_ignore ~sw f = let fork_ignore ~sw f =
let f child = let f () =
Switch.with_op sw @@ fun () -> Switch.with_op sw @@ fun () ->
try try f ()
Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t -> with ex -> Switch.turn_off sw ex
f ()
with ex ->
Switch.turn_off sw ex
in in
perform (Fork_ignore f) let new_fibre = Cancel.Fibre_context.make ~cc:sw.cancel in
perform (Fork_ignore (new_fibre, f))
let yield () = let yield () =
let fibre = Suspend.enter (fun fibre enqueue -> enqueue (Ok fibre)) in let fibre = Suspend.enter (fun fibre enqueue -> enqueue (Ok fibre)) in
@ -39,7 +34,10 @@ let pair f g =
try f () try f ()
with ex -> Cancel.cancel cancel ex; raise ex with ex -> Cancel.cancel cancel ex; raise ex
in in
let x = perform (Fork f) in let x =
let new_fibre = Cancel.Fibre_context.make ~cc:cancel in
perform (Fork (new_fibre, f))
in
match g () with match g () with
| gr -> Promise.await x, gr (* [g] succeeds - just report [f]'s result *) | gr -> Promise.await x, gr (* [g] succeeds - just report [f]'s result *)
| exception gex -> | exception gex ->
@ -80,18 +78,18 @@ let await_cancel () =
let any fs = let any fs =
let r = ref `None in let r = ref `None in
let parent_c = let parent_c =
Cancel.sub_unchecked (fun c -> Cancel.sub_unchecked (fun cc ->
let wrap h _fibre = let wrap h () =
match h () with match h () with
| x -> | x ->
begin match !r with begin match !r with
| `None -> r := `Ok x; Cancel.cancel c Not_first | `None -> r := `Ok x; Cancel.cancel cc Not_first
| `Ex _ | `Ok _ -> () | `Ex _ | `Ok _ -> ()
end end
| exception Cancel.Cancelled _ when Cancel.cancelled c -> () | exception Cancel.Cancelled _ when Cancel.cancelled cc -> ()
| exception ex -> | exception ex ->
begin match !r with begin match !r with
| `None -> r := `Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel c ex | `None -> r := `Ex (ex, Printexc.get_raw_backtrace ()); Cancel.cancel cc ex
| `Ok _ -> r := `Ex (ex, Printexc.get_raw_backtrace ()) | `Ok _ -> r := `Ex (ex, Printexc.get_raw_backtrace ())
| `Ex (e1, bt) -> r := `Ex (Multiple_exn.T [e1; ex], bt) | `Ex (e1, bt) -> r := `Ex (Multiple_exn.T [e1; ex], bt)
end end
@ -100,7 +98,8 @@ let any fs =
| [] -> await_cancel () | [] -> await_cancel ()
| [f] -> wrap f (); [] | [f] -> wrap f (); []
| f :: fs -> | f :: fs ->
let p = perform (Fork (wrap f)) in let new_fibre = Cancel.Fibre_context.make ~cc in
let p = perform (Fork (new_fibre, wrap f)) in
p :: aux fs p :: aux fs
in in
let ps = aux fs in let ps = aux fs in

View File

@ -994,10 +994,9 @@ let rec run ?(queue_depth=64) ?(block_size=4096) main =
let eventfd = FD.placeholder ~seekable:false in let eventfd = FD.placeholder ~seekable:false in
let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; io_jobs = 0 } in let st = { mem; uring; run_q; eventfd_mutex; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; io_jobs = 0 } in
Log.debug (fun l -> l "starting main thread"); Log.debug (fun l -> l "starting main thread");
let rec fork ~tid ~cancel:initial_cancel fn = let rec fork ~new_fibre:fibre fn =
Ctf.note_switch tid; Ctf.note_switch (Fibre_context.tid fibre);
let fibre = Fibre_context.make ~tid ~cc:initial_cancel in match_with fn ()
match_with fn fibre
{ retc = (fun () -> Fibre_context.destroy fibre; schedule st); { retc = (fun () -> Fibre_context.destroy fibre; schedule st);
exnc = (fun ex -> exnc = (fun ex ->
Fibre_context.destroy fibre; Fibre_context.destroy fibre;
@ -1053,34 +1052,29 @@ let rec run ?(queue_depth=64) ?(block_size=4096) main =
); );
schedule st schedule st
) )
| Eio.Private.Effects.Fork f -> Some (fun k -> | Eio.Private.Effects.Fork (new_fibre, f) -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
let id = Ctf.mint_id () in let promise, resolver = Promise.create_with_id (Fibre_context.tid new_fibre) in
Ctf.note_created id Ctf.Task;
let promise, resolver = Promise.create_with_id id in
enqueue_thread st k promise; enqueue_thread st k promise;
fork fork
~tid:id ~new_fibre
~cancel:(Fibre_context.cancellation_context fibre) (fun () ->
(fun new_fibre -> match f () with
match f new_fibre with
| x -> Promise.fulfill resolver x | x -> Promise.fulfill resolver x
| exception ex -> | exception ex ->
Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex); Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex);
Promise.break resolver ex Promise.break resolver ex
) )
) )
| Eio.Private.Effects.Fork_ignore f -> Some (fun k -> | Eio.Private.Effects.Fork_ignore (new_fibre, f) -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
enqueue_thread st k (); enqueue_thread st k ();
let child = Ctf.note_fork () in fork ~new_fibre (fun () ->
Ctf.note_switch child; match f () with
fork ~tid:child ~cancel:(Fibre_context.cancellation_context fibre) (fun new_fibre ->
match f new_fibre with
| () -> | () ->
Ctf.note_resolved child ~ex:None Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:None
| exception ex -> | exception ex ->
Ctf.note_resolved child ~ex:(Some ex) Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:(Some ex)
) )
) )
| Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln) | Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln)
@ -1096,7 +1090,8 @@ let rec run ?(queue_depth=64) ?(block_size=4096) main =
} }
in in
let `Exit_scheduler = let `Exit_scheduler =
fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun _fibre -> let new_fibre = Fibre_context.make_root () in
fork ~new_fibre (fun () ->
Switch.run_protected (fun sw -> Switch.run_protected (fun sw ->
let fd = eio_eventfd 0 in let fd = eio_eventfd 0 in
st.eventfd.fd <- `Open fd; st.eventfd.fd <- `Open fd;

View File

@ -612,10 +612,9 @@ let rec run main =
let async = Luv.Async.init ~loop (fun _async -> wakeup run_q) |> or_raise in let async = Luv.Async.init ~loop (fun _async -> wakeup run_q) |> or_raise in
let st = { loop; async; run_q } in let st = { loop; async; run_q } in
let stdenv = Objects.stdenv ~run_event_loop:run in let stdenv = Objects.stdenv ~run_event_loop:run in
let rec fork ~tid ~cancel:initial_cancel fn = let rec fork ~new_fibre:fibre fn =
Ctf.note_switch tid; Ctf.note_switch (Fibre_context.tid fibre);
let fibre = Fibre_context.make ~tid ~cc:initial_cancel in match_with fn ()
match_with fn fibre
{ retc = (fun () -> Fibre_context.destroy fibre); { retc = (fun () -> Fibre_context.destroy fibre);
exnc = (fun e -> Fibre_context.destroy fibre; raise e); exnc = (fun e -> Fibre_context.destroy fibre; raise e);
effc = fun (type a) (e : a eff) -> effc = fun (type a) (e : a eff) ->
@ -626,35 +625,30 @@ let rec run main =
fn loop fibre (enqueue_thread st k)) fn loop fibre (enqueue_thread st k))
| Eio.Private.Effects.Trace -> | Eio.Private.Effects.Trace ->
Some (fun k -> continue k Eunix.Trace.default_traceln) Some (fun k -> continue k Eunix.Trace.default_traceln)
| Eio.Private.Effects.Fork f -> | Eio.Private.Effects.Fork (new_fibre, f) ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
let id = Ctf.mint_id () in let promise, resolver = Promise.create_with_id (Fibre_context.tid new_fibre) in
Ctf.note_created id Ctf.Task;
let promise, resolver = Promise.create_with_id id in
enqueue_thread st k promise; enqueue_thread st k promise;
fork fork
~tid:id ~new_fibre
~cancel:(Fibre_context.cancellation_context fibre) (fun () ->
(fun new_fibre -> match f () with
match f new_fibre with
| x -> Promise.fulfill resolver x | x -> Promise.fulfill resolver x
| exception ex -> | exception ex ->
Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex); Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex);
Promise.break resolver ex Promise.break resolver ex
)) ))
| Eio.Private.Effects.Fork_ignore f -> | Eio.Private.Effects.Fork_ignore (new_fibre, f) ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
enqueue_thread st k (); enqueue_thread st k ();
let child = Ctf.note_fork () in fork ~new_fibre (fun () ->
Ctf.note_switch child; match f () with
fork ~tid:child ~cancel:(Fibre_context.cancellation_context fibre) (fun new_fibre ->
match f new_fibre with
| () -> | () ->
Ctf.note_resolved child ~ex:None Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:None
| exception ex -> | exception ex ->
Ctf.note_resolved child ~ex:(Some ex) Ctf.note_resolved (Fibre_context.tid new_fibre) ~ex:(Some ex)
)) ))
| Eio.Private.Effects.Get_context -> Some (fun k -> continue k fibre) | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fibre)
| Enter_unchecked fn -> Some (fun k -> | Enter_unchecked fn -> Some (fun k ->
@ -674,8 +668,9 @@ let rec run main =
} }
in in
let main_status = ref `Running in let main_status = ref `Running in
fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun _new_fibre -> let new_fibre = Fibre_context.make_root () in
begin match Eio.Cancel.protect (fun () -> main stdenv) with fork ~new_fibre (fun () ->
begin match main stdenv with
| () -> main_status := `Done | () -> main_status := `Done
| exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ()) | exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ())
end; end;