Compare commits

...

2 Commits

Author SHA1 Message Date
Thomas Leonard
387fb6d2b9
Merge pull request #610 from talex5/domains-variants
Use variant types for domain manager
2023-08-28 17:43:56 +01:00
Thomas Leonard
c8abd7ca78 Use variant types for domain manager
Also, move the mock domain manager to eio.mock so it can be used more
widely, and add `Eio.Debug.with_trace_prefix` (which it needs).
2023-08-28 08:57:43 +01:00
19 changed files with 190 additions and 97 deletions

View File

@ -1,4 +1,5 @@
#require "eio_main";;
#require "eio.mock";;
module Eio_main = struct
open Eio.Std
@ -28,18 +29,9 @@ module Eio_main = struct
let handler = Eio.Time.Pi.clock (module Fake_clock) in
fun real_clock -> Eio.Resource.T (Fake_clock.make real_clock, handler)
(* To avoid non-deterministic output, we run the examples a single domain. *)
let fake_domain_mgr = object (_ : #Eio.Domain_manager.t)
method run fn =
(* Since we're in the same domain, cancelling the calling fiber will
cancel the fake spawned one automatically. *)
let cancelled, _ = Promise.create () in
fn ~cancelled
method run_raw fn = fn ()
end
let run fn =
(* To avoid non-deterministic output, we run the examples a single domain. *)
let fake_domain_mgr = Eio_mock.Domain_manager.create () in
Eio_main.run @@ fun env ->
fn @@ object
method net = env#net

View File

@ -24,13 +24,21 @@ let default_traceln ?__POS__:pos fmt =
in
Format.kdprintf k ("@[" ^^ fmt)
let traceln ?__POS__ fmt =
let traceln =
match Fiber.get traceln_key with
| Some { traceln } -> traceln
| None
| exception (Effect.Unhandled _) -> default_traceln
let get () =
match Fiber.get traceln_key with
| Some traceln -> traceln
| None
| exception (Effect.Unhandled _) -> { traceln = default_traceln }
let with_trace_prefix prefix fn =
let { traceln } = get () in
let traceln ?__POS__ fmt =
traceln ?__POS__ ("%t" ^^ fmt) prefix
in
Fiber.with_binding traceln_key { traceln } fn
let traceln ?__POS__ fmt =
let { traceln } = get () in
traceln ?__POS__ fmt
type t = <

View File

@ -715,6 +715,9 @@ module Private : sig
('a, Format.formatter, unit, unit) format4 -> 'a
(** Writes trace logging using the current fiber's configured traceln function. *)
val with_trace_prefix : (Format.formatter -> unit) -> (unit -> 'a) -> 'a
(** [with_trace_prefix fmt fn] runs [fn ()] with a traceln that outputs [fmt] before each message. *)
val traceln_mutex : Stdlib.Mutex.t
(** The mutex used to prevent two domains writing to stderr at once.

View File

@ -1,12 +1,29 @@
class virtual t = object
method virtual run : 'a. (cancelled:exn Promise.t -> 'a) -> 'a
method virtual run_raw : 'a. (unit -> 'a) -> 'a
open Std
type ty = [`Domain_mgr]
type 'a t = ([> ty] as 'a) r
module Pi = struct
module type MGR = sig
type t
val run : t -> (cancelled:exn Promise.t -> 'a) -> 'a
val run_raw : t -> (unit -> 'a) -> 'a
end
type (_, _, _) Resource.pi +=
| Mgr : ('t, (module MGR with type t = 't), [> ty]) Resource.pi
let mgr (type t) (module X : MGR with type t = t) =
Resource.handler [H (Mgr, (module X))]
end
let run_raw (t : #t) = t#run_raw
let run_raw (Resource.T (t, ops)) fn =
let module X = (val (Resource.get ops Pi.Mgr)) in
X.run_raw t fn
let run (t : #t) fn =
t#run @@ fun ~cancelled ->
let run (Resource.T (t, ops)) fn =
let module X = (val (Resource.get ops Pi.Mgr)) in
X.run t @@ fun ~cancelled ->
(* If the spawning fiber is cancelled, [cancelled] gets set to the exception. *)
try
Fiber.first

View File

@ -0,0 +1,37 @@
type ty = [`Domain_mgr]
type 'a t = ([> ty] as 'a) Resource.t
val run : _ t -> (unit -> 'a) -> 'a
(** [run t f] runs [f ()] in a newly-created domain and returns the result.
Other fibers in the calling domain can run in parallel with the new domain.
Warning: [f] must only access thread-safe values from the calling domain,
but this is not enforced by the type system.
If the calling fiber is cancelled, this is propagated to the spawned domain. *)
val run_raw : _ t -> (unit -> 'a) -> 'a
(** [run_raw t f] is like {!run}, but does not run an event loop in the new domain,
and so cannot perform IO, fork fibers, etc. *)
(** {2 Provider Interface} *)
module Pi : sig
module type MGR = sig
type t
val run : t -> (cancelled:exn Promise.t -> 'a) -> 'a
(** [run t fn] runs [fn ~cancelled] in a new domain.
If the calling fiber is cancelled, [cancelled] becomes resolved to the {!Cancel.Cancelled} exception.
[fn] should cancel itself in this case. *)
val run_raw : t -> (unit -> 'a) -> 'a
end
type (_, _, _) Resource.pi +=
| Mgr : ('t, (module MGR with type t = 't), [> ty]) Resource.pi
val mgr : (module MGR with type t = 't) -> ('t, ty) Resource.handler
end

View File

@ -29,7 +29,7 @@ module Stdenv = struct
let stderr (t : <stderr : _ Flow.sink; ..>) = t#stderr
let net (t : <net : _ Net.t; ..>) = t#net
let process_mgr (t : <process_mgr : _ Process.mgr; ..>) = t#process_mgr
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
let domain_mgr (t : <domain_mgr : _ Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : _ Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : _ Time.Mono.t; ..>) = t#mono_clock
let secure_random (t: <secure_random : _ Flow.source; ..>) = t#secure_random

View File

@ -75,31 +75,7 @@ module Net = Net
module Process = Process
(** Parallel computation across multiple CPU cores. *)
module Domain_manager : sig
class virtual t : object
method virtual run_raw : 'a. (unit -> 'a) -> 'a
method virtual run : 'a. (cancelled:exn Promise.t -> 'a) -> 'a
(** [t#run fn] runs [fn ~cancelled] in a new domain.
If the calling fiber is cancelled, [cancelled] becomes resolved to the {!Cancel.Cancelled} exception.
[fn] should cancel itself in this case. *)
end
val run : #t -> (unit -> 'a) -> 'a
(** [run t f] runs [f ()] in a newly-created domain and returns the result.
Other fibers in the calling domain can run in parallel with the new domain.
Warning: [f] must only access thread-safe values from the calling domain,
but this is not enforced by the type system.
If the calling fiber is cancelled, this is propagated to the spawned domain. *)
val run_raw : #t -> (unit -> 'a) -> 'a
(** [run_raw t f] is like {!run}, but does not run an event loop in the new domain,
and so cannot perform IO, fork fibers, etc. *)
end
module Domain_manager = Domain_manager
(** Clocks, time, sleeping and timeouts. *)
module Time = Time
@ -144,6 +120,9 @@ module Debug : sig
If the system is not ready to receive the trace output,
the whole domain must block until it is. *)
val with_trace_prefix : (Format.formatter -> unit) -> (unit -> 'a) -> 'a
(** [with_trace_prefix fmt fn] runs [fn ()] with a traceln that outputs [fmt] before each message. *)
type t = <
traceln : traceln Fiber.key;
>
@ -211,7 +190,7 @@ module Stdenv : sig
To use this, see {!Domain_manager}.
*)
val domain_mgr : <domain_mgr : #Domain_manager.t as 'a; ..> -> 'a
val domain_mgr : <domain_mgr : _ Domain_manager.t as 'a; ..> -> 'a
(** [domain_mgr t] allows running code on other cores. *)
(** {1 Time}

View File

@ -0,0 +1,38 @@
open Eio.Std
let id = Fiber.create_key ()
let with_domain_tracing fn =
Eio.Debug.with_trace_prefix (fun f ->
Fiber.get id |> Option.iter (fun id -> Fmt.pf f "[%s] " id)
) fn
module Fake_domain_mgr = struct
type t = {
mutable next_domain_id : int;
}
let create () = { next_domain_id = 1 }
let run t fn =
let self = t.next_domain_id in
t.next_domain_id <- t.next_domain_id + 1;
let cancelled, _ = Promise.create () in
Fiber.with_binding id (string_of_int self)
(fun () -> fn ~cancelled)
let run_raw t fn =
let self = t.next_domain_id in
t.next_domain_id <- t.next_domain_id + 1;
Fiber.with_binding id (string_of_int self) fn
end
let create =
let handler = Eio.Domain_manager.Pi.mgr (module Fake_domain_mgr) in
fun () -> Eio.Resource.T (Fake_domain_mgr.create (), handler)
let run fn =
let dm = create () in
with_domain_tracing @@ fun () ->
Fiber.with_binding id "0" @@ fun () ->
fn dm

View File

@ -0,0 +1,23 @@
open Eio.Std
val create : unit -> Eio.Domain_manager.ty r
(** [create ()] is a mock domain manager.
When asked to run a new Eio domain, it just runs it in the parent domain.
It runs the function in a context where {!id} is a fresh domain ID
(assigned sequentially starting from 1). *)
val run : (Eio.Domain_manager.ty r -> 'a) -> 'a
(** [run fn] runs [fn dm], where [dm] is a new fake domain manager.
It also runs {!with_domain_tracing} to display domain IDs in trace output.
[fn] itself runs with {!id} set to "0". *)
val id : string Fiber.key
(** [id] is used to get or set the current fake domain's ID.
This is used in traceln output. *)
val with_domain_tracing : (unit -> 'a) -> 'a
(** [with_domain_tracing fn] runs [fn ()] with a modified [traceln] function that
prefixes the current {!id} (if any) to each trace message. *)

View File

@ -3,6 +3,7 @@ module Handler = Handler
module Flow = Flow
module Net = Net
module Clock = Clock
module Domain_manager = Domain_manager
module Backend = Backend
type Eio.Exn.Backend.t += Simulated_failure

View File

@ -145,6 +145,9 @@ end
(** A mock {!Eio.Time} clock for testing timeouts. *)
module Clock = Clock
(** A mock {!Eio.Domain_manager} that runs everything in a single domain. *)
module Domain_manager = Domain_manager
(** {2 Backend for mocks}
The mocks can be used with any backend, but if you don't need any IO then you can use this one

View File

@ -209,7 +209,7 @@ val accept_fork :
val run_server :
?max_connections:int ->
?additional_domains:(#Domain_manager.t * int) ->
?additional_domains:(_ Domain_manager.t * int) ->
?stop:'a Promise.t ->
on_error:(exn -> unit) ->
[> 'tag listening_socket_ty ] r ->

View File

@ -38,7 +38,7 @@ module Stdenv = struct
stdout : sink_ty r;
stderr : sink_ty r;
net : [`Unix | `Generic] Eio.Net.ty r;
domain_mgr : Eio.Domain_manager.t;
domain_mgr : Eio.Domain_manager.ty r;
process_mgr : Process.mgr_ty r;
clock : float Eio.Time.clock_ty r;
mono_clock : Eio.Time.Mono.ty r;

View File

@ -69,7 +69,7 @@ module Stdenv : sig
stdout : sink_ty r;
stderr : sink_ty r;
net : [`Unix | `Generic] Eio.Net.ty r;
domain_mgr : Eio.Domain_manager.t;
domain_mgr : Eio.Domain_manager.ty r;
process_mgr : Process.mgr_ty r;
clock : float Eio.Time.clock_ty r;
mono_clock : Eio.Time.Mono.ty r;

View File

@ -386,19 +386,23 @@ let unwrap_backtrace = function
| Ok x -> x
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
let domain_mgr ~run_event_loop = object
inherit Eio.Domain_manager.t
module Domain_mgr = struct
type t = {
run_event_loop : (unit -> unit) -> unit -> unit;
}
method run_raw fn =
let make ~run_event_loop = { run_event_loop }
let run_raw _t fn =
let domain = ref None in
Sched.enter (fun t k ->
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> Sched.enqueue_thread t k ())))
);
unwrap_backtrace (Domain.join (Option.get !domain))
method run fn =
let run t fn =
let domain = ref None in
Sched.enter (fun t k ->
Sched.enter (fun sched k ->
let cancelled, set_cancelled = Promise.create () in
Fiber_context.set_cancel_fn k.fiber (Promise.resolve set_cancelled);
domain := Some (Domain.spawn (fun () ->
@ -406,14 +410,18 @@ let domain_mgr ~run_event_loop = object
(fun () ->
let result = ref None in
let fn = wrap_backtrace (fun () -> fn ~cancelled) in
run_event_loop (fun () -> result := Some (fn ())) ();
t.run_event_loop (fun () -> result := Some (fn ())) ();
Option.get !result
)
~finally:(fun () -> Sched.enqueue_thread t k ())))
~finally:(fun () -> Sched.enqueue_thread sched k ())))
);
unwrap_backtrace (Domain.join (Option.get !domain))
end
let domain_mgr ~run_event_loop =
let handler = Eio.Domain_manager.Pi.mgr (module Domain_mgr) in
Eio.Resource.T (Domain_mgr.make ~run_event_loop, handler)
module Mono_clock = struct
type t = unit
type time = Mtime.t

View File

@ -87,17 +87,17 @@ let unwrap_backtrace = function
| Ok x -> x
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
let v = object
inherit Eio.Domain_manager.t
module Impl = struct
type t = unit
method run_raw fn =
let run_raw () fn =
let domain = ref None in
Eio.Private.Suspend.enter (fun _ctx enqueue ->
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ()))))
);
unwrap_backtrace (Domain.join (Option.get !domain))
method run fn =
let run () fn =
let domain = ref None in
Eio.Private.Suspend.enter (fun ctx enqueue ->
let cancelled, set_cancelled = Promise.create () in
@ -108,3 +108,7 @@ let v = object
);
unwrap_backtrace (Domain.join (Option.get !domain))
end
let v =
let handler = Eio.Domain_manager.Pi.mgr (module Impl) in
Eio.Resource.T ((), handler)

View File

@ -88,17 +88,17 @@ let unwrap_backtrace = function
| Ok x -> x
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
let v = object
inherit Eio.Domain_manager.t
module Impl = struct
type t = unit
method run_raw fn =
let run_raw () fn =
let domain = ref None in
Eio.Private.Suspend.enter (fun _ctx enqueue ->
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ()))))
);
unwrap_backtrace (Domain.join (Option.get !domain))
method run fn =
let run () fn =
let domain = ref None in
Eio.Private.Suspend.enter (fun ctx enqueue ->
let cancelled, set_cancelled = Promise.create () in
@ -109,3 +109,7 @@ let v = object
);
unwrap_backtrace (Domain.join (Option.get !domain))
end
let v =
let handler = Eio.Domain_manager.Pi.mgr (module Impl) in
Eio.Resource.T ((), handler)

View File

@ -7,7 +7,7 @@
```ocaml
open Eio.Std
let run (fn : Eio.Domain_manager.t -> unit) =
let run (fn : Eio.Domain_manager.ty r -> unit) =
Eio_main.run @@ fun env ->
fn (Eio.Stdenv.domain_mgr env)
```

View File

@ -860,42 +860,18 @@ Non-graceful shutdown, closing all connections still in progress:
Exception: Failure "Simulated error".
```
To test support for multiple domains, we just run everything in one domain
to keep the output deterministic. We override `traceln` to log the (fake)
domain ID too:
```ocaml
let with_domain_tracing id fn =
let traceln ?__POS__ fmt =
Eio.Private.Debug.default_traceln ?__POS__ ("[%d] " ^^ fmt) id
in
Fiber.with_binding Eio.Private.Debug.v#traceln { traceln } fn
let fake_domain_mgr () = object (_ : #Eio.Domain_manager.t)
val mutable next_domain_id = 1
method run fn =
let self = next_domain_id in
next_domain_id <- next_domain_id + 1;
let cancelled, _ = Promise.create () in
with_domain_tracing self (fun () -> fn ~cancelled)
method run_raw _ = assert false
end
```
Handling the connections with 3 domains, with a graceful shutdown:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
with_domain_tracing 0 @@ fun () ->
Eio_mock.Domain_manager.run @@ fun fake_domain_mgr ->
let n_domains = 3 in
let listening_socket = mock_listener ~n_clients:10 ~n_domains in
let stop, set_stop = Promise.create () in
Fiber.both
(fun () ->
Eio.Net.run_server listening_socket handle_connection
~additional_domains:(fake_domain_mgr (), n_domains - 1)
~additional_domains:(fake_domain_mgr, n_domains - 1)
~max_connections:10
~on_error:raise
~stop
@ -938,13 +914,13 @@ Handling the connections with 3 domains, aborting immediately:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
with_domain_tracing 0 @@ fun () ->
Eio_mock.Domain_manager.run @@ fun fake_domain_mgr ->
let n_domains = 3 in
let listening_socket = mock_listener ~n_clients:10 ~n_domains in
Fiber.both
(fun () ->
Eio.Net.run_server listening_socket handle_connection
~additional_domains:(fake_domain_mgr (), n_domains - 1)
~additional_domains:(fake_domain_mgr, n_domains - 1)
~max_connections:10
~on_error:raise
)