From 9c41d9fdf22a5b205f6b88df2151ce3d0c5a0b51 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 17 Nov 2021 14:49:08 +0000 Subject: [PATCH] Keep an explicit tree of cancellation contexts This is slightly more efficient, and might also be useful to allow dumping out the tree for debugging. --- lib_eio/cancel.ml | 89 ++++++++++++++++++++------------------ lib_eio/eio.ml | 4 +- lib_eio/eio.mli | 8 +--- lib_eio/fibre.ml | 10 ++--- lib_eio/suspend.ml | 7 +-- lib_eio/switch.ml | 3 +- lib_eio_linux/eio_linux.ml | 16 +++---- lib_eio_luv/eio_luv.ml | 17 +++----- 8 files changed, 69 insertions(+), 85 deletions(-) diff --git a/lib_eio/cancel.ml b/lib_eio/cancel.ml index 3748080..7924500 100644 --- a/lib_eio/cancel.ml +++ b/lib_eio/cancel.ml @@ -17,14 +17,25 @@ type state = type t = { mutable state : state; + parent : t; + children : t Lwt_dllist.t; + protected : bool; +} + +type fibre_context = { + tid : Ctf.id; + mutable cancel : t; } (* A dummy value for bootstrapping *) -let boot = { +let rec boot = { state = Finished; + parent = boot; + children = Lwt_dllist.create (); + protected = true; } -type _ eff += Set_cancel : t -> t eff +type _ eff += Get_context : fibre_context eff let cancelled t = match t.state with @@ -49,70 +60,68 @@ let is_finished t = | Finished -> true | On _ | Cancelling _ -> false -(* Runs [fn] with a fresh cancellation context value (but does not install it). *) -let with_cc fn = +(* Runs [fn] with a fresh cancellation context. *) +let with_cc ~ctx ?parent ~protected fn = let q = Lwt_dllist.create () in - let t = { state = On q } in - Fun.protect (fun () -> fn t) - ~finally:(fun () -> t.state <- Finished) + let parent = Option.value parent ~default:ctx.cancel in + let children = Lwt_dllist.create () in + let t = { state = On q; parent; children; protected } in + let node = Lwt_dllist.add_r t parent.children in + ctx.cancel <- t; + match fn t with + | x -> ctx.cancel <- t.parent; t.state <- Finished; Lwt_dllist.remove node; x + | exception ex -> ctx.cancel <- t.parent; t.state <- Finished; Lwt_dllist.remove node; raise ex -let protect_full fn = - with_cc @@ fun t -> - let x = - let old = perform (Set_cancel t) in - Fun.protect (fun () -> fn t) - ~finally:(fun () -> ignore (perform (Set_cancel old))) - in +let protect fn = + let ctx = perform Get_context in + with_cc ~ctx ?parent:None ~protected:true @@ fun t -> + let x = fn () in check t; x -let protect fn = protect_full (fun (_ : t) -> fn ()) - -let add_hook_unwrapped t hook = +let add_hook t hook = match t.state with | Finished -> invalid_arg "Cancellation context finished!" - | Cancelling (ex, _) -> protect (fun () -> hook ex); Hook.null + | Cancelling (ex, _) -> protect (fun () -> hook (Cancelled ex)); Hook.null | On q -> let node = Lwt_dllist.add_r hook q in (fun () -> Lwt_dllist.remove node) -let add_hook t hook = add_hook_unwrapped t (fun ex -> hook (Cancelled ex)) - -let cancel t ex = +let rec cancel t ex = match t.state with | Finished -> invalid_arg "Cancellation context finished!" | Cancelling _ -> () | On q -> let bt = Printexc.get_raw_backtrace () in t.state <- Cancelling (ex, bt); + let cex = Cancelled ex in let rec aux () = match Lwt_dllist.take_opt_r q with - | None -> [] + | None -> Lwt_dllist.fold_r (cancel_child ex) t.children [] | Some f -> - match f ex with + match f cex with | () -> aux () | exception ex2 -> ex2 :: aux () in match protect aux with | [] -> () | exns -> raise (Cancel_hook_failed exns) +and cancel_child ex t acc = + if t.protected then acc + else match cancel t ex with + | () -> acc + | exception ex -> ex :: acc let sub fn = - with_cc @@ fun t -> + let ctx = perform Get_context in + with_cc ~ctx ?parent:None ~protected:false @@ fun t -> let x = - (* Can't use Fun.protect here because of [Fun.Finally_raised]. *) - let old = perform (Set_cancel t) in - match - let unhook = add_hook_unwrapped old (cancel t) in - Fun.protect (fun () -> fn t) ~finally:unhook - with + match fn t with | x -> - ignore (perform (Set_cancel old)); - check old; + check t.parent; x | exception ex -> - ignore (perform (Set_cancel old)); - check old; + check t.parent; raise ex in match t.state with @@ -123,11 +132,7 @@ let sub fn = (* Like [sub], but it's OK if the new context is cancelled. (instead, return the parent context on exit so the caller can check that) *) let sub_unchecked fn = - with_cc @@ fun t -> - let old = perform (Set_cancel t) in - Fun.protect (fun () -> - let unhook = add_hook_unwrapped old (cancel t) in - Fun.protect (fun () -> fn t) ~finally:unhook - ) - ~finally:(fun () -> ignore (perform (Set_cancel old))); - old + let ctx = perform Get_context in + with_cc ~ctx ?parent:None ~protected:false @@ fun t -> + fn t; + t.parent diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml index 91e0f81..e050efb 100644 --- a/lib_eio/eio.ml +++ b/lib_eio/eio.ml @@ -257,7 +257,7 @@ module Stdenv = struct end module Private = struct - type context = Suspend.context = { + type context = Cancel.fibre_context = { tid : Ctf.id; mutable cancel : Cancel.t; } @@ -268,8 +268,8 @@ module Private = struct | Suspend = Suspend.Suspend | Fork = Fibre.Fork | Fork_ignore = Fibre.Fork_ignore + | Get_context = Cancel.Get_context | Trace = Std.Trace - | Set_cancel = Cancel.Set_cancel end let boot_cancel = Cancel.boot end diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index 61201dc..e740460 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -283,9 +283,6 @@ module Cancel : sig This can be used to clean up resources on cancellation. However, it is usually better to use {!Switch.on_release} (which calls this for you). *) - val protect_full : (t -> 'a) -> 'a - (** [protect_full fn] is like {!protect}, but also gives access to the new context. *) - val check : t -> unit (** [check t] checks that [t] hasn't been cancelled. @raise Cancelled If the context has been cancelled. *) @@ -619,7 +616,7 @@ module Private : sig | Fork : (unit -> 'a) -> 'a Promise.t eff (** See {!Fibre.fork} *) - | Fork_ignore : (unit -> unit) -> unit eff + | Fork_ignore : (context -> unit) -> unit eff (** See {!Fibre.fork_ignore} *) | Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff @@ -628,8 +625,7 @@ module Private : sig If the system is not ready to receive the trace output, the whole domain must block until it is. *) - | Set_cancel : Cancel.t -> Cancel.t eff - (** [Set_cancel c] sets the running fibre's cancel context to [c] and returns the previous context. *) + | Get_context : context eff end val boot_cancel : Cancel.t diff --git a/lib_eio/fibre.ml b/lib_eio/fibre.ml index 5bde1ef..4012b60 100644 --- a/lib_eio/fibre.ml +++ b/lib_eio/fibre.ml @@ -12,16 +12,14 @@ let fork ~sw ~exn_turn_off f = in perform (Fork f) -type _ eff += Fork_ignore : (unit -> unit) -> unit eff +type _ eff += Fork_ignore : (Cancel.fibre_context -> unit) -> unit eff let fork_ignore ~sw f = - let f () = + let f child = Switch.with_op sw @@ fun () -> try - Cancel.protect_full @@ fun c -> - let hook = Switch.add_cancel_hook_unwrapped sw (Cancel.cancel c) in - Fun.protect f - ~finally:(fun () -> Hook.remove hook) + Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t -> + f () with ex -> Switch.turn_off sw ex in diff --git a/lib_eio/suspend.ml b/lib_eio/suspend.ml index 78db3d3..c9f67b4 100644 --- a/lib_eio/suspend.ml +++ b/lib_eio/suspend.ml @@ -1,12 +1,7 @@ open EffectHandlers -type context = { - tid : Ctf.id; - mutable cancel : Cancel.t; -} - type 'a enqueue = ('a, exn) result -> unit -type _ eff += Suspend : (context -> 'a enqueue -> unit) -> 'a eff +type _ eff += Suspend : (Cancel.fibre_context -> 'a enqueue -> unit) -> 'a eff let enter_unchecked fn = perform (Suspend fn) diff --git a/lib_eio/switch.ml b/lib_eio/switch.ml index 964b948..4de0a65 100644 --- a/lib_eio/switch.ml +++ b/lib_eio/switch.ml @@ -31,7 +31,6 @@ let rec turn_off t ex = Cancel.cancel t.cancel ex let add_cancel_hook t hook = Cancel.add_hook t.cancel hook -let add_cancel_hook_unwrapped t hook = Cancel.add_hook_unwrapped t.cancel hook let with_op t fn = check t; @@ -43,7 +42,7 @@ let with_op t fn = Waiters.wake_all t.waiter (Ok ()) ) -let await_internal waiters id (ctx:Suspend.context) enqueue = +let await_internal waiters id (ctx:Cancel.fibre_context) enqueue = let cleanup_hooks = Queue.create () in let when_resolved r = Queue.iter Waiters.remove_waiter cleanup_hooks; diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index fab7d01..5b2c1a1 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -927,7 +927,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = let rec fork ~tid ~cancel:initial_cancel fn = Ctf.note_switch tid; let fibre = { Eio.Private.tid; cancel = initial_cancel } in - match_with fn () + match_with fn fibre { retc = (fun () -> schedule st); exnc = raise; effc = fun (type a) (e : a eff) -> @@ -972,11 +972,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = ); schedule st ) - | Eio.Private.Effects.Set_cancel cancel -> Some (fun k -> - let old = fibre.cancel in - fibre.cancel <- cancel; - continue k old - ) + | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fibre) | Eio.Private.Effects.Suspend f -> Some (fun k -> let k = { Suspended.k; fibre } in f fibre (function @@ -994,7 +990,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = fork ~tid:id ~cancel:fibre.cancel - (fun () -> + (fun _fibre -> match f () with | x -> Promise.fulfill resolver x | exception ex -> @@ -1007,8 +1003,8 @@ let run ?(queue_depth=64) ?(block_size=4096) main = enqueue_thread st k (); let child = Ctf.note_fork () in Ctf.note_switch child; - fork ~tid:child ~cancel:fibre.cancel (fun () -> - match f () with + fork ~tid:child ~cancel:fibre.cancel (fun new_fibre -> + match f new_fibre with | () -> Ctf.note_resolved child ~ex:None | exception ex -> @@ -1029,7 +1025,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = in let main_done = ref false in let `Exit_scheduler = - fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun () -> + fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun _fibre -> Fun.protect (fun () -> Eio.Cancel.protect (fun () -> main stdenv)) ~finally:(fun () -> main_done := true) ) in diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index 40303d1..7edbe77 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -586,7 +586,7 @@ let run main = let rec fork ~tid ~cancel:initial_cancel fn = Ctf.note_switch tid; let fibre = { Eio.Private.tid; cancel = initial_cancel } in - match_with fn () + match_with fn fibre { retc = (fun () -> ()); exnc = (fun e -> raise e); effc = fun (type a) (e : a eff) -> @@ -607,7 +607,7 @@ let run main = fork ~tid:id ~cancel:fibre.cancel - (fun () -> + (fun _new_fibre -> match f () with | x -> Promise.fulfill resolver x | exception ex -> @@ -620,19 +620,14 @@ let run main = enqueue_thread k (); let child = Ctf.note_fork () in Ctf.note_switch child; - fork ~tid:child ~cancel:fibre.cancel (fun () -> - match f () with + fork ~tid:child ~cancel:fibre.cancel (fun new_fibre -> + match f new_fibre with | () -> Ctf.note_resolved child ~ex:None | exception ex -> Ctf.note_resolved child ~ex:(Some ex) )) - | Eio.Private.Effects.Set_cancel cancel -> - Some (fun k -> - let old = fibre.cancel in - fibre.cancel <- cancel; - continue k old - ) + | Eio.Private.Effects.Get_context -> Some (fun k -> continue k fibre) | Enter_unchecked fn -> Some (fun k -> fn { Suspended.k; fibre } ) @@ -650,7 +645,7 @@ let run main = } in let main_status = ref `Running in - fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun () -> + fork ~tid:(Ctf.mint_id ()) ~cancel:Eio.Private.boot_cancel (fun _new_fibre -> match Eio.Cancel.protect (fun () -> main stdenv) with | () -> main_status := `Done | exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ())