Draft implementation of fiber-local storage

This commit is contained in:
Jonathan Coates 2022-07-19 21:12:16 +01:00
parent b46ffba2c5
commit afab28c088
No known key found for this signature in database
GPG Key ID: B9E431FF07C98D06
9 changed files with 150 additions and 6 deletions

View File

@ -19,6 +19,7 @@
(optint (>= 0.1.0))
(psq (>= 0.2.0))
(fmt (>= 0.8.9))
(hmap (>= 0.8.1))
(astring (and (>= 0.8.5) :with-test))
(crowbar (and (>= 0.2) :with-test))
(mtime (>= 1.2.0))

View File

@ -17,6 +17,7 @@ depends: [
"optint" {>= "0.1.0"}
"psq" {>= "0.2.0"}
"fmt" {>= "0.8.9"}
"hmap" {>= "0.8.1"}
"astring" {>= "0.8.5" & with-test}
"crowbar" {>= "0.2" & with-test}
"mtime" {>= "1.2.0"}

View File

@ -27,6 +27,7 @@ and fiber_context = {
mutable cancel_context : t;
mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
cancel_fn : (exn -> unit) option Atomic.t;
mutable context : Hmap.t;
}
type _ Effect.t += Get_context : fiber_context Effect.t
@ -178,6 +179,36 @@ let sub_unchecked fn =
fn t;
parent
module Context = struct
type context = Hmap.t
let get_context () =
let fiber = Effect.perform Get_context in
fiber.context
let with_context_in ~fiber context fn =
let old_context = fiber.context in
fiber.context <- context;
let cleanup () = fiber.context <- old_context in
match fn () with
| x -> cleanup (); x
| exception ex -> cleanup (); raise ex
let with_context context fn =
let fiber = Effect.perform Get_context in
with_context_in ~fiber context fn
type 'a key = 'a Hmap.key
let create_key () = Hmap.Key.create ()
let get key = Hmap.find key (get_context ())
let with_value var value fn =
let fiber = Effect.perform Get_context in
with_context_in ~fiber (Hmap.add var value fiber.context) fn
end
module Fiber_context = struct
type t = fiber_context
@ -193,18 +224,24 @@ module Fiber_context = struct
let clear_cancel_fn t =
Atomic.exchange t.cancel_fn None <> None
let make ~cc =
let make_raw ~cc ~context =
let tid = Ctf.mint_id () in
Ctf.note_created tid Ctf.Task;
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = Atomic.make None } in
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = Atomic.make None; context } in
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
t
let make ~cc =
let ctx = Effect.perform Get_context in
make_raw ~cc ~context:ctx.context
let make_root () =
let cc = create ~protected:false in
cc.state <- On;
make ~cc
make_raw ~cc ~context:Hmap.empty
let destroy t =
Option.iter Lwt_dllist.remove t.cancel_node
let context_store t = t.context
end

View File

@ -1,4 +1,4 @@
(library
(name eio__core)
(public_name eio.core)
(libraries cstruct lwt-dllist fmt))
(libraries cstruct hmap lwt-dllist fmt))

View File

@ -3,6 +3,7 @@ module Fiber = Fiber
module Switch = Switch
module Cancel = Cancel
module Exn = Exn
module Context = Cancel.Context
module Private = struct
module Suspend = Suspend
module Waiters = Waiters
@ -11,7 +12,7 @@ module Private = struct
module Effects = struct
type 'a enqueue = 'a Suspend.enqueue
type _ Effect.t +=
type _ Effect.t +=
| Suspend = Suspend.Suspend
| Fork = Fiber.Fork
| Get_context = Cancel.Get_context

View File

@ -395,6 +395,30 @@ module Cancel : sig
(** Show the cancellation sub-tree rooted at [t], for debugging. *)
end
(** @canonical Eio.Context *)
module Context : sig
type 'a key
(** ['a key] is a key for a value of type ['a] stored within the fiber's context. *)
(** [create_key ()] creates a new context key. *)
val create_key : unit -> 'a key
val get : 'a key -> 'a option
(** [get key] reads [key] from the current context, returning its value or {!None} if it has not
been defined. *)
val with_value : 'a key -> 'a -> (unit -> 'b) -> 'b
(** [with_value key value fn] runs [fn] with the given key set to the assigned value. *)
type context
val get_context : unit -> context
(** Get the current context for this fiber. *)
val with_context : context -> (unit -> 'a) -> 'a
(** [with_context fn] runs [fn] with the given context available. *)
end
(** @canonical Eio.Private *)
module Private : sig
module Ctf = Ctf
@ -484,6 +508,8 @@ module Private : sig
val get_error : t -> exn option
(** [get_error t] is [Cancel.get_error (cancellation_context t)] *)
val context_store : t -> Context.context
end
module Effects : sig

View File

@ -13,6 +13,8 @@ let run (t : #t) fn =
let cancelled, set_cancelled = Promise.create () in
Private.Fiber_context.set_cancel_fn ctx (Promise.resolve set_cancelled);
(* If the spawning fiber is cancelled, [cancelled] gets set to the exception. *)
let store = Private.Fiber_context.context_store ctx in
match
t#run @@ fun () ->
Fiber.first
@ -21,7 +23,7 @@ let run (t : #t) fn =
| Cancel.Cancelled ex -> raise ex (* To avoid [Cancelled (Cancelled ex))] *)
| ex -> raise ex (* Shouldn't happen *)
)
fn
(fun () -> Context.with_context store fn)
with
| x ->
ignore (Private.Fiber_context.clear_cancel_fn ctx : bool);

View File

@ -40,6 +40,9 @@ module Stream = Stream
(** Cancelling fibers. *)
module Cancel = Eio__core.Cancel
(** Fiber-local storage. *)
module Context = Eio__core.Context
(** Commonly used standard features. This module is intended to be [open]ed. *)
module Std : sig
module Promise = Promise

View File

@ -189,3 +189,76 @@ Can't fork into another domain:
);;
Exception: Invalid_argument "Switch accessed from wrong domain!".
```
# Fiber-local storage
Creating a context key:
```ocaml
# let key : int Eio.Context.key = Eio.Context.create_key ();;
val key : int Eio.Context.key = <abstr>
# let trace_key () =
let value = Eio.Context.get key in
traceln "Key => %a" Fmt.(option ~none:(const string "<unset>") int) value;;
val trace_key : unit -> unit = <fun>
```
Keys default to being unset
```ocaml
# run @@ fun _ ->
trace_key ();;
+Key => <unset>
- : unit = ()
```
`with_value` can be used to define a key.
```ocaml
# run @@ fun _ ->
Eio.Context.with_value key 123 @@ fun () -> trace_key ();;
+Key => 123
- : unit = ()
```
`with_value` will shadow variables defined in outer scopes.
```ocaml
# run @@ fun _ ->
Eio.Context.with_value key 123 @@ fun () ->
trace_key ();
Eio.Context.with_value key 456 (fun () -> trace_key ());
trace_key ();;
+Key => 123
+Key => 456
+Key => 123
- : unit = ()
```
Values are propagated when forking, or sending fibers to other domains.
```ocaml
# run @@ fun _ ->
Eio.Context.with_value key 123 @@ fun () ->
Switch.run @@ fun sw ->
Fiber.fork ~sw trace_key;;
+Key => 123
- : unit = ()
# run @@ fun mgr->
Eio.Context.with_value key 123 @@ fun () ->
Eio.Domain_manager.run mgr @@ fun () ->
trace_key ();;
+Key => 123
- : unit = ()
```
Values are inherited from the currently running fiber, rather than the switch.
```ocaml
# run @@ fun _ ->
Switch.run @@ fun sw ->
Eio.Context.with_value key 123 @@ fun () ->
Fiber.fork ~sw trace_key;;
+Key => 123
- : unit = ()
```