Compare commits

..

2 Commits

Author SHA1 Message Date
Thomas Leonard
387fb6d2b9
Merge pull request #610 from talex5/domains-variants
Use variant types for domain manager
2023-08-28 17:43:56 +01:00
Thomas Leonard
c8abd7ca78 Use variant types for domain manager
Also, move the mock domain manager to eio.mock so it can be used more
widely, and add `Eio.Debug.with_trace_prefix` (which it needs).
2023-08-28 08:57:43 +01:00
19 changed files with 190 additions and 97 deletions

View File

@ -1,4 +1,5 @@
#require "eio_main";; #require "eio_main";;
#require "eio.mock";;
module Eio_main = struct module Eio_main = struct
open Eio.Std open Eio.Std
@ -28,18 +29,9 @@ module Eio_main = struct
let handler = Eio.Time.Pi.clock (module Fake_clock) in let handler = Eio.Time.Pi.clock (module Fake_clock) in
fun real_clock -> Eio.Resource.T (Fake_clock.make real_clock, handler) fun real_clock -> Eio.Resource.T (Fake_clock.make real_clock, handler)
(* To avoid non-deterministic output, we run the examples a single domain. *)
let fake_domain_mgr = object (_ : #Eio.Domain_manager.t)
method run fn =
(* Since we're in the same domain, cancelling the calling fiber will
cancel the fake spawned one automatically. *)
let cancelled, _ = Promise.create () in
fn ~cancelled
method run_raw fn = fn ()
end
let run fn = let run fn =
(* To avoid non-deterministic output, we run the examples a single domain. *)
let fake_domain_mgr = Eio_mock.Domain_manager.create () in
Eio_main.run @@ fun env -> Eio_main.run @@ fun env ->
fn @@ object fn @@ object
method net = env#net method net = env#net

View File

@ -24,13 +24,21 @@ let default_traceln ?__POS__:pos fmt =
in in
Format.kdprintf k ("@[" ^^ fmt) Format.kdprintf k ("@[" ^^ fmt)
let traceln ?__POS__ fmt = let get () =
let traceln = match Fiber.get traceln_key with
match Fiber.get traceln_key with | Some traceln -> traceln
| Some { traceln } -> traceln | None
| None | exception (Effect.Unhandled _) -> { traceln = default_traceln }
| exception (Effect.Unhandled _) -> default_traceln
let with_trace_prefix prefix fn =
let { traceln } = get () in
let traceln ?__POS__ fmt =
traceln ?__POS__ ("%t" ^^ fmt) prefix
in in
Fiber.with_binding traceln_key { traceln } fn
let traceln ?__POS__ fmt =
let { traceln } = get () in
traceln ?__POS__ fmt traceln ?__POS__ fmt
type t = < type t = <

View File

@ -715,6 +715,9 @@ module Private : sig
('a, Format.formatter, unit, unit) format4 -> 'a ('a, Format.formatter, unit, unit) format4 -> 'a
(** Writes trace logging using the current fiber's configured traceln function. *) (** Writes trace logging using the current fiber's configured traceln function. *)
val with_trace_prefix : (Format.formatter -> unit) -> (unit -> 'a) -> 'a
(** [with_trace_prefix fmt fn] runs [fn ()] with a traceln that outputs [fmt] before each message. *)
val traceln_mutex : Stdlib.Mutex.t val traceln_mutex : Stdlib.Mutex.t
(** The mutex used to prevent two domains writing to stderr at once. (** The mutex used to prevent two domains writing to stderr at once.

View File

@ -1,12 +1,29 @@
class virtual t = object open Std
method virtual run : 'a. (cancelled:exn Promise.t -> 'a) -> 'a
method virtual run_raw : 'a. (unit -> 'a) -> 'a type ty = [`Domain_mgr]
type 'a t = ([> ty] as 'a) r
module Pi = struct
module type MGR = sig
type t
val run : t -> (cancelled:exn Promise.t -> 'a) -> 'a
val run_raw : t -> (unit -> 'a) -> 'a
end
type (_, _, _) Resource.pi +=
| Mgr : ('t, (module MGR with type t = 't), [> ty]) Resource.pi
let mgr (type t) (module X : MGR with type t = t) =
Resource.handler [H (Mgr, (module X))]
end end
let run_raw (t : #t) = t#run_raw let run_raw (Resource.T (t, ops)) fn =
let module X = (val (Resource.get ops Pi.Mgr)) in
X.run_raw t fn
let run (t : #t) fn = let run (Resource.T (t, ops)) fn =
t#run @@ fun ~cancelled -> let module X = (val (Resource.get ops Pi.Mgr)) in
X.run t @@ fun ~cancelled ->
(* If the spawning fiber is cancelled, [cancelled] gets set to the exception. *) (* If the spawning fiber is cancelled, [cancelled] gets set to the exception. *)
try try
Fiber.first Fiber.first

View File

@ -0,0 +1,37 @@
type ty = [`Domain_mgr]
type 'a t = ([> ty] as 'a) Resource.t
val run : _ t -> (unit -> 'a) -> 'a
(** [run t f] runs [f ()] in a newly-created domain and returns the result.
Other fibers in the calling domain can run in parallel with the new domain.
Warning: [f] must only access thread-safe values from the calling domain,
but this is not enforced by the type system.
If the calling fiber is cancelled, this is propagated to the spawned domain. *)
val run_raw : _ t -> (unit -> 'a) -> 'a
(** [run_raw t f] is like {!run}, but does not run an event loop in the new domain,
and so cannot perform IO, fork fibers, etc. *)
(** {2 Provider Interface} *)
module Pi : sig
module type MGR = sig
type t
val run : t -> (cancelled:exn Promise.t -> 'a) -> 'a
(** [run t fn] runs [fn ~cancelled] in a new domain.
If the calling fiber is cancelled, [cancelled] becomes resolved to the {!Cancel.Cancelled} exception.
[fn] should cancel itself in this case. *)
val run_raw : t -> (unit -> 'a) -> 'a
end
type (_, _, _) Resource.pi +=
| Mgr : ('t, (module MGR with type t = 't), [> ty]) Resource.pi
val mgr : (module MGR with type t = 't) -> ('t, ty) Resource.handler
end

View File

@ -29,7 +29,7 @@ module Stdenv = struct
let stderr (t : <stderr : _ Flow.sink; ..>) = t#stderr let stderr (t : <stderr : _ Flow.sink; ..>) = t#stderr
let net (t : <net : _ Net.t; ..>) = t#net let net (t : <net : _ Net.t; ..>) = t#net
let process_mgr (t : <process_mgr : _ Process.mgr; ..>) = t#process_mgr let process_mgr (t : <process_mgr : _ Process.mgr; ..>) = t#process_mgr
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr let domain_mgr (t : <domain_mgr : _ Domain_manager.t; ..>) = t#domain_mgr
let clock (t : <clock : _ Time.clock; ..>) = t#clock let clock (t : <clock : _ Time.clock; ..>) = t#clock
let mono_clock (t : <mono_clock : _ Time.Mono.t; ..>) = t#mono_clock let mono_clock (t : <mono_clock : _ Time.Mono.t; ..>) = t#mono_clock
let secure_random (t: <secure_random : _ Flow.source; ..>) = t#secure_random let secure_random (t: <secure_random : _ Flow.source; ..>) = t#secure_random

View File

@ -75,31 +75,7 @@ module Net = Net
module Process = Process module Process = Process
(** Parallel computation across multiple CPU cores. *) (** Parallel computation across multiple CPU cores. *)
module Domain_manager : sig module Domain_manager = Domain_manager
class virtual t : object
method virtual run_raw : 'a. (unit -> 'a) -> 'a
method virtual run : 'a. (cancelled:exn Promise.t -> 'a) -> 'a
(** [t#run fn] runs [fn ~cancelled] in a new domain.
If the calling fiber is cancelled, [cancelled] becomes resolved to the {!Cancel.Cancelled} exception.
[fn] should cancel itself in this case. *)
end
val run : #t -> (unit -> 'a) -> 'a
(** [run t f] runs [f ()] in a newly-created domain and returns the result.
Other fibers in the calling domain can run in parallel with the new domain.
Warning: [f] must only access thread-safe values from the calling domain,
but this is not enforced by the type system.
If the calling fiber is cancelled, this is propagated to the spawned domain. *)
val run_raw : #t -> (unit -> 'a) -> 'a
(** [run_raw t f] is like {!run}, but does not run an event loop in the new domain,
and so cannot perform IO, fork fibers, etc. *)
end
(** Clocks, time, sleeping and timeouts. *) (** Clocks, time, sleeping and timeouts. *)
module Time = Time module Time = Time
@ -144,6 +120,9 @@ module Debug : sig
If the system is not ready to receive the trace output, If the system is not ready to receive the trace output,
the whole domain must block until it is. *) the whole domain must block until it is. *)
val with_trace_prefix : (Format.formatter -> unit) -> (unit -> 'a) -> 'a
(** [with_trace_prefix fmt fn] runs [fn ()] with a traceln that outputs [fmt] before each message. *)
type t = < type t = <
traceln : traceln Fiber.key; traceln : traceln Fiber.key;
> >
@ -211,7 +190,7 @@ module Stdenv : sig
To use this, see {!Domain_manager}. To use this, see {!Domain_manager}.
*) *)
val domain_mgr : <domain_mgr : #Domain_manager.t as 'a; ..> -> 'a val domain_mgr : <domain_mgr : _ Domain_manager.t as 'a; ..> -> 'a
(** [domain_mgr t] allows running code on other cores. *) (** [domain_mgr t] allows running code on other cores. *)
(** {1 Time} (** {1 Time}

View File

@ -0,0 +1,38 @@
open Eio.Std
let id = Fiber.create_key ()
let with_domain_tracing fn =
Eio.Debug.with_trace_prefix (fun f ->
Fiber.get id |> Option.iter (fun id -> Fmt.pf f "[%s] " id)
) fn
module Fake_domain_mgr = struct
type t = {
mutable next_domain_id : int;
}
let create () = { next_domain_id = 1 }
let run t fn =
let self = t.next_domain_id in
t.next_domain_id <- t.next_domain_id + 1;
let cancelled, _ = Promise.create () in
Fiber.with_binding id (string_of_int self)
(fun () -> fn ~cancelled)
let run_raw t fn =
let self = t.next_domain_id in
t.next_domain_id <- t.next_domain_id + 1;
Fiber.with_binding id (string_of_int self) fn
end
let create =
let handler = Eio.Domain_manager.Pi.mgr (module Fake_domain_mgr) in
fun () -> Eio.Resource.T (Fake_domain_mgr.create (), handler)
let run fn =
let dm = create () in
with_domain_tracing @@ fun () ->
Fiber.with_binding id "0" @@ fun () ->
fn dm

View File

@ -0,0 +1,23 @@
open Eio.Std
val create : unit -> Eio.Domain_manager.ty r
(** [create ()] is a mock domain manager.
When asked to run a new Eio domain, it just runs it in the parent domain.
It runs the function in a context where {!id} is a fresh domain ID
(assigned sequentially starting from 1). *)
val run : (Eio.Domain_manager.ty r -> 'a) -> 'a
(** [run fn] runs [fn dm], where [dm] is a new fake domain manager.
It also runs {!with_domain_tracing} to display domain IDs in trace output.
[fn] itself runs with {!id} set to "0". *)
val id : string Fiber.key
(** [id] is used to get or set the current fake domain's ID.
This is used in traceln output. *)
val with_domain_tracing : (unit -> 'a) -> 'a
(** [with_domain_tracing fn] runs [fn ()] with a modified [traceln] function that
prefixes the current {!id} (if any) to each trace message. *)

View File

@ -3,6 +3,7 @@ module Handler = Handler
module Flow = Flow module Flow = Flow
module Net = Net module Net = Net
module Clock = Clock module Clock = Clock
module Domain_manager = Domain_manager
module Backend = Backend module Backend = Backend
type Eio.Exn.Backend.t += Simulated_failure type Eio.Exn.Backend.t += Simulated_failure

View File

@ -145,6 +145,9 @@ end
(** A mock {!Eio.Time} clock for testing timeouts. *) (** A mock {!Eio.Time} clock for testing timeouts. *)
module Clock = Clock module Clock = Clock
(** A mock {!Eio.Domain_manager} that runs everything in a single domain. *)
module Domain_manager = Domain_manager
(** {2 Backend for mocks} (** {2 Backend for mocks}
The mocks can be used with any backend, but if you don't need any IO then you can use this one The mocks can be used with any backend, but if you don't need any IO then you can use this one

View File

@ -209,7 +209,7 @@ val accept_fork :
val run_server : val run_server :
?max_connections:int -> ?max_connections:int ->
?additional_domains:(#Domain_manager.t * int) -> ?additional_domains:(_ Domain_manager.t * int) ->
?stop:'a Promise.t -> ?stop:'a Promise.t ->
on_error:(exn -> unit) -> on_error:(exn -> unit) ->
[> 'tag listening_socket_ty ] r -> [> 'tag listening_socket_ty ] r ->

View File

@ -38,7 +38,7 @@ module Stdenv = struct
stdout : sink_ty r; stdout : sink_ty r;
stderr : sink_ty r; stderr : sink_ty r;
net : [`Unix | `Generic] Eio.Net.ty r; net : [`Unix | `Generic] Eio.Net.ty r;
domain_mgr : Eio.Domain_manager.t; domain_mgr : Eio.Domain_manager.ty r;
process_mgr : Process.mgr_ty r; process_mgr : Process.mgr_ty r;
clock : float Eio.Time.clock_ty r; clock : float Eio.Time.clock_ty r;
mono_clock : Eio.Time.Mono.ty r; mono_clock : Eio.Time.Mono.ty r;

View File

@ -69,7 +69,7 @@ module Stdenv : sig
stdout : sink_ty r; stdout : sink_ty r;
stderr : sink_ty r; stderr : sink_ty r;
net : [`Unix | `Generic] Eio.Net.ty r; net : [`Unix | `Generic] Eio.Net.ty r;
domain_mgr : Eio.Domain_manager.t; domain_mgr : Eio.Domain_manager.ty r;
process_mgr : Process.mgr_ty r; process_mgr : Process.mgr_ty r;
clock : float Eio.Time.clock_ty r; clock : float Eio.Time.clock_ty r;
mono_clock : Eio.Time.Mono.ty r; mono_clock : Eio.Time.Mono.ty r;

View File

@ -386,19 +386,23 @@ let unwrap_backtrace = function
| Ok x -> x | Ok x -> x
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
let domain_mgr ~run_event_loop = object module Domain_mgr = struct
inherit Eio.Domain_manager.t type t = {
run_event_loop : (unit -> unit) -> unit -> unit;
}
method run_raw fn = let make ~run_event_loop = { run_event_loop }
let run_raw _t fn =
let domain = ref None in let domain = ref None in
Sched.enter (fun t k -> Sched.enter (fun t k ->
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> Sched.enqueue_thread t k ()))) domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> Sched.enqueue_thread t k ())))
); );
unwrap_backtrace (Domain.join (Option.get !domain)) unwrap_backtrace (Domain.join (Option.get !domain))
method run fn = let run t fn =
let domain = ref None in let domain = ref None in
Sched.enter (fun t k -> Sched.enter (fun sched k ->
let cancelled, set_cancelled = Promise.create () in let cancelled, set_cancelled = Promise.create () in
Fiber_context.set_cancel_fn k.fiber (Promise.resolve set_cancelled); Fiber_context.set_cancel_fn k.fiber (Promise.resolve set_cancelled);
domain := Some (Domain.spawn (fun () -> domain := Some (Domain.spawn (fun () ->
@ -406,14 +410,18 @@ let domain_mgr ~run_event_loop = object
(fun () -> (fun () ->
let result = ref None in let result = ref None in
let fn = wrap_backtrace (fun () -> fn ~cancelled) in let fn = wrap_backtrace (fun () -> fn ~cancelled) in
run_event_loop (fun () -> result := Some (fn ())) (); t.run_event_loop (fun () -> result := Some (fn ())) ();
Option.get !result Option.get !result
) )
~finally:(fun () -> Sched.enqueue_thread t k ()))) ~finally:(fun () -> Sched.enqueue_thread sched k ())))
); );
unwrap_backtrace (Domain.join (Option.get !domain)) unwrap_backtrace (Domain.join (Option.get !domain))
end end
let domain_mgr ~run_event_loop =
let handler = Eio.Domain_manager.Pi.mgr (module Domain_mgr) in
Eio.Resource.T (Domain_mgr.make ~run_event_loop, handler)
module Mono_clock = struct module Mono_clock = struct
type t = unit type t = unit
type time = Mtime.t type time = Mtime.t

View File

@ -87,17 +87,17 @@ let unwrap_backtrace = function
| Ok x -> x | Ok x -> x
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
let v = object module Impl = struct
inherit Eio.Domain_manager.t type t = unit
method run_raw fn = let run_raw () fn =
let domain = ref None in let domain = ref None in
Eio.Private.Suspend.enter (fun _ctx enqueue -> Eio.Private.Suspend.enter (fun _ctx enqueue ->
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ())))) domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ()))))
); );
unwrap_backtrace (Domain.join (Option.get !domain)) unwrap_backtrace (Domain.join (Option.get !domain))
method run fn = let run () fn =
let domain = ref None in let domain = ref None in
Eio.Private.Suspend.enter (fun ctx enqueue -> Eio.Private.Suspend.enter (fun ctx enqueue ->
let cancelled, set_cancelled = Promise.create () in let cancelled, set_cancelled = Promise.create () in
@ -108,3 +108,7 @@ let v = object
); );
unwrap_backtrace (Domain.join (Option.get !domain)) unwrap_backtrace (Domain.join (Option.get !domain))
end end
let v =
let handler = Eio.Domain_manager.Pi.mgr (module Impl) in
Eio.Resource.T ((), handler)

View File

@ -88,17 +88,17 @@ let unwrap_backtrace = function
| Ok x -> x | Ok x -> x
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt | Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
let v = object module Impl = struct
inherit Eio.Domain_manager.t type t = unit
method run_raw fn = let run_raw () fn =
let domain = ref None in let domain = ref None in
Eio.Private.Suspend.enter (fun _ctx enqueue -> Eio.Private.Suspend.enter (fun _ctx enqueue ->
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ())))) domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ()))))
); );
unwrap_backtrace (Domain.join (Option.get !domain)) unwrap_backtrace (Domain.join (Option.get !domain))
method run fn = let run () fn =
let domain = ref None in let domain = ref None in
Eio.Private.Suspend.enter (fun ctx enqueue -> Eio.Private.Suspend.enter (fun ctx enqueue ->
let cancelled, set_cancelled = Promise.create () in let cancelled, set_cancelled = Promise.create () in
@ -109,3 +109,7 @@ let v = object
); );
unwrap_backtrace (Domain.join (Option.get !domain)) unwrap_backtrace (Domain.join (Option.get !domain))
end end
let v =
let handler = Eio.Domain_manager.Pi.mgr (module Impl) in
Eio.Resource.T ((), handler)

View File

@ -7,7 +7,7 @@
```ocaml ```ocaml
open Eio.Std open Eio.Std
let run (fn : Eio.Domain_manager.t -> unit) = let run (fn : Eio.Domain_manager.ty r -> unit) =
Eio_main.run @@ fun env -> Eio_main.run @@ fun env ->
fn (Eio.Stdenv.domain_mgr env) fn (Eio.Stdenv.domain_mgr env)
``` ```

View File

@ -860,42 +860,18 @@ Non-graceful shutdown, closing all connections still in progress:
Exception: Failure "Simulated error". Exception: Failure "Simulated error".
``` ```
To test support for multiple domains, we just run everything in one domain
to keep the output deterministic. We override `traceln` to log the (fake)
domain ID too:
```ocaml
let with_domain_tracing id fn =
let traceln ?__POS__ fmt =
Eio.Private.Debug.default_traceln ?__POS__ ("[%d] " ^^ fmt) id
in
Fiber.with_binding Eio.Private.Debug.v#traceln { traceln } fn
let fake_domain_mgr () = object (_ : #Eio.Domain_manager.t)
val mutable next_domain_id = 1
method run fn =
let self = next_domain_id in
next_domain_id <- next_domain_id + 1;
let cancelled, _ = Promise.create () in
with_domain_tracing self (fun () -> fn ~cancelled)
method run_raw _ = assert false
end
```
Handling the connections with 3 domains, with a graceful shutdown: Handling the connections with 3 domains, with a graceful shutdown:
```ocaml ```ocaml
# Eio_mock.Backend.run @@ fun () -> # Eio_mock.Backend.run @@ fun () ->
with_domain_tracing 0 @@ fun () -> Eio_mock.Domain_manager.run @@ fun fake_domain_mgr ->
let n_domains = 3 in let n_domains = 3 in
let listening_socket = mock_listener ~n_clients:10 ~n_domains in let listening_socket = mock_listener ~n_clients:10 ~n_domains in
let stop, set_stop = Promise.create () in let stop, set_stop = Promise.create () in
Fiber.both Fiber.both
(fun () -> (fun () ->
Eio.Net.run_server listening_socket handle_connection Eio.Net.run_server listening_socket handle_connection
~additional_domains:(fake_domain_mgr (), n_domains - 1) ~additional_domains:(fake_domain_mgr, n_domains - 1)
~max_connections:10 ~max_connections:10
~on_error:raise ~on_error:raise
~stop ~stop
@ -938,13 +914,13 @@ Handling the connections with 3 domains, aborting immediately:
```ocaml ```ocaml
# Eio_mock.Backend.run @@ fun () -> # Eio_mock.Backend.run @@ fun () ->
with_domain_tracing 0 @@ fun () -> Eio_mock.Domain_manager.run @@ fun fake_domain_mgr ->
let n_domains = 3 in let n_domains = 3 in
let listening_socket = mock_listener ~n_clients:10 ~n_domains in let listening_socket = mock_listener ~n_clients:10 ~n_domains in
Fiber.both Fiber.both
(fun () -> (fun () ->
Eio.Net.run_server listening_socket handle_connection Eio.Net.run_server listening_socket handle_connection
~additional_domains:(fake_domain_mgr (), n_domains - 1) ~additional_domains:(fake_domain_mgr, n_domains - 1)
~max_connections:10 ~max_connections:10
~on_error:raise ~on_error:raise
) )