mirror of
https://github.com/ocaml-multicore/eio.git
synced 2025-10-09 00:04:06 -04:00
Set cancelletion context correctly in Fibre.fork
In should inherit from the switch, not from the current context.
This commit is contained in:
parent
9c41d9fdf2
commit
0116670732
@ -32,7 +32,7 @@ let rec boot = {
|
|||||||
state = Finished;
|
state = Finished;
|
||||||
parent = boot;
|
parent = boot;
|
||||||
children = Lwt_dllist.create ();
|
children = Lwt_dllist.create ();
|
||||||
protected = true;
|
protected = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
type _ eff += Get_context : fibre_context eff
|
type _ eff += Get_context : fibre_context eff
|
||||||
@ -61,20 +61,19 @@ let is_finished t =
|
|||||||
| On _ | Cancelling _ -> false
|
| On _ | Cancelling _ -> false
|
||||||
|
|
||||||
(* Runs [fn] with a fresh cancellation context. *)
|
(* Runs [fn] with a fresh cancellation context. *)
|
||||||
let with_cc ~ctx ?parent ~protected fn =
|
let with_cc ~ctx ~parent ~protected fn =
|
||||||
let q = Lwt_dllist.create () in
|
let q = Lwt_dllist.create () in
|
||||||
let parent = Option.value parent ~default:ctx.cancel in
|
|
||||||
let children = Lwt_dllist.create () in
|
let children = Lwt_dllist.create () in
|
||||||
let t = { state = On q; parent; children; protected } in
|
let t = { state = On q; parent; children; protected } in
|
||||||
let node = Lwt_dllist.add_r t parent.children in
|
let node = Lwt_dllist.add_r t parent.children in
|
||||||
ctx.cancel <- t;
|
ctx.cancel <- t;
|
||||||
match fn t with
|
match fn t with
|
||||||
| x -> ctx.cancel <- t.parent; t.state <- Finished; Lwt_dllist.remove node; x
|
| x -> ctx.cancel <- t.parent; t.state <- Finished; Lwt_dllist.remove node; x
|
||||||
| exception ex -> ctx.cancel <- t.parent; t.state <- Finished; Lwt_dllist.remove node; raise ex
|
| exception ex -> ctx.cancel <- t.parent; t.state <- Finished; Lwt_dllist.remove node; raise ex
|
||||||
|
|
||||||
let protect fn =
|
let protect fn =
|
||||||
let ctx = perform Get_context in
|
let ctx = perform Get_context in
|
||||||
with_cc ~ctx ?parent:None ~protected:true @@ fun t ->
|
with_cc ~ctx ~parent:ctx.cancel ~protected:true @@ fun t ->
|
||||||
let x = fn () in
|
let x = fn () in
|
||||||
check t;
|
check t;
|
||||||
x
|
x
|
||||||
@ -114,7 +113,7 @@ and cancel_child ex t acc =
|
|||||||
|
|
||||||
let sub fn =
|
let sub fn =
|
||||||
let ctx = perform Get_context in
|
let ctx = perform Get_context in
|
||||||
with_cc ~ctx ?parent:None ~protected:false @@ fun t ->
|
with_cc ~ctx ~parent:ctx.cancel ~protected:false @@ fun t ->
|
||||||
let x =
|
let x =
|
||||||
match fn t with
|
match fn t with
|
||||||
| x ->
|
| x ->
|
||||||
@ -133,6 +132,6 @@ let sub fn =
|
|||||||
(instead, return the parent context on exit so the caller can check that) *)
|
(instead, return the parent context on exit so the caller can check that) *)
|
||||||
let sub_unchecked fn =
|
let sub_unchecked fn =
|
||||||
let ctx = perform Get_context in
|
let ctx = perform Get_context in
|
||||||
with_cc ~ctx ?parent:None ~protected:false @@ fun t ->
|
with_cc ~ctx ~parent:ctx.cancel ~protected:false @@ fun t ->
|
||||||
fn t;
|
fn t;
|
||||||
t.parent
|
t.parent
|
||||||
|
@ -613,7 +613,7 @@ module Private : sig
|
|||||||
passing it the suspended fibre's context and a function to resume it.
|
passing it the suspended fibre's context and a function to resume it.
|
||||||
[fn] should arrange for [enqueue] to be called once the thread is ready to run again. *)
|
[fn] should arrange for [enqueue] to be called once the thread is ready to run again. *)
|
||||||
|
|
||||||
| Fork : (unit -> 'a) -> 'a Promise.t eff
|
| Fork : (context -> 'a) -> 'a Promise.t eff
|
||||||
(** See {!Fibre.fork} *)
|
(** See {!Fibre.fork} *)
|
||||||
|
|
||||||
| Fork_ignore : (context -> unit) -> unit eff
|
| Fork_ignore : (context -> unit) -> unit eff
|
||||||
|
@ -1,11 +1,13 @@
|
|||||||
open EffectHandlers
|
open EffectHandlers
|
||||||
|
|
||||||
type _ eff += Fork : (unit -> 'a) -> 'a Promise.t eff
|
type _ eff += Fork : (Cancel.fibre_context -> 'a) -> 'a Promise.t eff
|
||||||
|
|
||||||
let fork ~sw ~exn_turn_off f =
|
let fork ~sw ~exn_turn_off f =
|
||||||
let f () =
|
let f child =
|
||||||
Switch.with_op sw @@ fun () ->
|
Switch.with_op sw @@ fun () ->
|
||||||
try f ()
|
try
|
||||||
|
Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t ->
|
||||||
|
f ()
|
||||||
with ex ->
|
with ex ->
|
||||||
if exn_turn_off then Switch.turn_off sw ex;
|
if exn_turn_off then Switch.turn_off sw ex;
|
||||||
raise ex
|
raise ex
|
||||||
@ -41,7 +43,7 @@ let both f g = all [f; g]
|
|||||||
|
|
||||||
let pair f g =
|
let pair f g =
|
||||||
Cancel.sub @@ fun cancel ->
|
Cancel.sub @@ fun cancel ->
|
||||||
let f () =
|
let f _fibre =
|
||||||
try f ()
|
try f ()
|
||||||
with ex -> Cancel.cancel cancel ex; raise ex
|
with ex -> Cancel.cancel cancel ex; raise ex
|
||||||
in
|
in
|
||||||
@ -88,7 +90,7 @@ let any fs =
|
|||||||
let r = ref `None in
|
let r = ref `None in
|
||||||
let parent_c =
|
let parent_c =
|
||||||
Cancel.sub_unchecked (fun c ->
|
Cancel.sub_unchecked (fun c ->
|
||||||
let wrap h () =
|
let wrap h _fibre =
|
||||||
match h () with
|
match h () with
|
||||||
| x ->
|
| x ->
|
||||||
begin match !r with
|
begin match !r with
|
||||||
|
@ -990,8 +990,8 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
|
|||||||
fork
|
fork
|
||||||
~tid:id
|
~tid:id
|
||||||
~cancel:fibre.cancel
|
~cancel:fibre.cancel
|
||||||
(fun _fibre ->
|
(fun new_fibre ->
|
||||||
match f () with
|
match f new_fibre with
|
||||||
| x -> Promise.fulfill resolver x
|
| x -> Promise.fulfill resolver x
|
||||||
| exception ex ->
|
| exception ex ->
|
||||||
Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex);
|
Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex);
|
||||||
|
@ -607,8 +607,8 @@ let run main =
|
|||||||
fork
|
fork
|
||||||
~tid:id
|
~tid:id
|
||||||
~cancel:fibre.cancel
|
~cancel:fibre.cancel
|
||||||
(fun _new_fibre ->
|
(fun new_fibre ->
|
||||||
match f () with
|
match f new_fibre with
|
||||||
| x -> Promise.fulfill resolver x
|
| x -> Promise.fulfill resolver x
|
||||||
| exception ex ->
|
| exception ex ->
|
||||||
Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex);
|
Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex);
|
||||||
|
@ -218,3 +218,31 @@ Exception: Stdlib.Exit.
|
|||||||
+Caught: Cancelled: Failure("simulated error")
|
+Caught: Cancelled: Failure("simulated error")
|
||||||
Exception: Failure "simulated error".
|
Exception: Failure "simulated error".
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# Fibre.fork
|
||||||
|
|
||||||
|
`Fibre.fork ~sw` inherits the cancellation context from `sw`, not from the current fibre:
|
||||||
|
|
||||||
|
```ocaml
|
||||||
|
# run @@ fun () ->
|
||||||
|
let switch = ref None in
|
||||||
|
Fibre.both
|
||||||
|
(fun () ->
|
||||||
|
Switch.run @@ fun sw ->
|
||||||
|
switch := Some sw;
|
||||||
|
Fibre.await_cancel ()
|
||||||
|
)
|
||||||
|
(fun () ->
|
||||||
|
let sw = Option.get !switch in
|
||||||
|
Eio.Cancel.protect @@ fun () ->
|
||||||
|
let child = Fibre.fork ~sw ~exn_turn_off:true (fun () ->
|
||||||
|
traceln "Forked child";
|
||||||
|
Fibre.await_cancel ()
|
||||||
|
) in
|
||||||
|
Switch.turn_off sw Exit;
|
||||||
|
Promise.await child
|
||||||
|
);
|
||||||
|
"not reached";;
|
||||||
|
+Forked child
|
||||||
|
Exception: Stdlib.Exit.
|
||||||
|
```
|
||||||
|
Loading…
x
Reference in New Issue
Block a user