mirror of
https://github.com/ocaml-multicore/eio.git
synced 2025-07-17 00:01:11 -04:00
Compare commits
2 Commits
286a1b743d
...
387fb6d2b9
Author | SHA1 | Date | |
---|---|---|---|
|
387fb6d2b9 | ||
|
c8abd7ca78 |
@ -1,4 +1,5 @@
|
||||
#require "eio_main";;
|
||||
#require "eio.mock";;
|
||||
|
||||
module Eio_main = struct
|
||||
open Eio.Std
|
||||
@ -28,18 +29,9 @@ module Eio_main = struct
|
||||
let handler = Eio.Time.Pi.clock (module Fake_clock) in
|
||||
fun real_clock -> Eio.Resource.T (Fake_clock.make real_clock, handler)
|
||||
|
||||
(* To avoid non-deterministic output, we run the examples a single domain. *)
|
||||
let fake_domain_mgr = object (_ : #Eio.Domain_manager.t)
|
||||
method run fn =
|
||||
(* Since we're in the same domain, cancelling the calling fiber will
|
||||
cancel the fake spawned one automatically. *)
|
||||
let cancelled, _ = Promise.create () in
|
||||
fn ~cancelled
|
||||
|
||||
method run_raw fn = fn ()
|
||||
end
|
||||
|
||||
let run fn =
|
||||
(* To avoid non-deterministic output, we run the examples a single domain. *)
|
||||
let fake_domain_mgr = Eio_mock.Domain_manager.create () in
|
||||
Eio_main.run @@ fun env ->
|
||||
fn @@ object
|
||||
method net = env#net
|
||||
|
@ -24,13 +24,21 @@ let default_traceln ?__POS__:pos fmt =
|
||||
in
|
||||
Format.kdprintf k ("@[" ^^ fmt)
|
||||
|
||||
let traceln ?__POS__ fmt =
|
||||
let traceln =
|
||||
match Fiber.get traceln_key with
|
||||
| Some { traceln } -> traceln
|
||||
| None
|
||||
| exception (Effect.Unhandled _) -> default_traceln
|
||||
let get () =
|
||||
match Fiber.get traceln_key with
|
||||
| Some traceln -> traceln
|
||||
| None
|
||||
| exception (Effect.Unhandled _) -> { traceln = default_traceln }
|
||||
|
||||
let with_trace_prefix prefix fn =
|
||||
let { traceln } = get () in
|
||||
let traceln ?__POS__ fmt =
|
||||
traceln ?__POS__ ("%t" ^^ fmt) prefix
|
||||
in
|
||||
Fiber.with_binding traceln_key { traceln } fn
|
||||
|
||||
let traceln ?__POS__ fmt =
|
||||
let { traceln } = get () in
|
||||
traceln ?__POS__ fmt
|
||||
|
||||
type t = <
|
||||
|
@ -715,6 +715,9 @@ module Private : sig
|
||||
('a, Format.formatter, unit, unit) format4 -> 'a
|
||||
(** Writes trace logging using the current fiber's configured traceln function. *)
|
||||
|
||||
val with_trace_prefix : (Format.formatter -> unit) -> (unit -> 'a) -> 'a
|
||||
(** [with_trace_prefix fmt fn] runs [fn ()] with a traceln that outputs [fmt] before each message. *)
|
||||
|
||||
val traceln_mutex : Stdlib.Mutex.t
|
||||
(** The mutex used to prevent two domains writing to stderr at once.
|
||||
|
||||
|
@ -1,12 +1,29 @@
|
||||
class virtual t = object
|
||||
method virtual run : 'a. (cancelled:exn Promise.t -> 'a) -> 'a
|
||||
method virtual run_raw : 'a. (unit -> 'a) -> 'a
|
||||
open Std
|
||||
|
||||
type ty = [`Domain_mgr]
|
||||
type 'a t = ([> ty] as 'a) r
|
||||
|
||||
module Pi = struct
|
||||
module type MGR = sig
|
||||
type t
|
||||
val run : t -> (cancelled:exn Promise.t -> 'a) -> 'a
|
||||
val run_raw : t -> (unit -> 'a) -> 'a
|
||||
end
|
||||
|
||||
type (_, _, _) Resource.pi +=
|
||||
| Mgr : ('t, (module MGR with type t = 't), [> ty]) Resource.pi
|
||||
|
||||
let mgr (type t) (module X : MGR with type t = t) =
|
||||
Resource.handler [H (Mgr, (module X))]
|
||||
end
|
||||
|
||||
let run_raw (t : #t) = t#run_raw
|
||||
let run_raw (Resource.T (t, ops)) fn =
|
||||
let module X = (val (Resource.get ops Pi.Mgr)) in
|
||||
X.run_raw t fn
|
||||
|
||||
let run (t : #t) fn =
|
||||
t#run @@ fun ~cancelled ->
|
||||
let run (Resource.T (t, ops)) fn =
|
||||
let module X = (val (Resource.get ops Pi.Mgr)) in
|
||||
X.run t @@ fun ~cancelled ->
|
||||
(* If the spawning fiber is cancelled, [cancelled] gets set to the exception. *)
|
||||
try
|
||||
Fiber.first
|
||||
|
37
lib_eio/domain_manager.mli
Normal file
37
lib_eio/domain_manager.mli
Normal file
@ -0,0 +1,37 @@
|
||||
type ty = [`Domain_mgr]
|
||||
type 'a t = ([> ty] as 'a) Resource.t
|
||||
|
||||
val run : _ t -> (unit -> 'a) -> 'a
|
||||
(** [run t f] runs [f ()] in a newly-created domain and returns the result.
|
||||
|
||||
Other fibers in the calling domain can run in parallel with the new domain.
|
||||
|
||||
Warning: [f] must only access thread-safe values from the calling domain,
|
||||
but this is not enforced by the type system.
|
||||
|
||||
If the calling fiber is cancelled, this is propagated to the spawned domain. *)
|
||||
|
||||
val run_raw : _ t -> (unit -> 'a) -> 'a
|
||||
(** [run_raw t f] is like {!run}, but does not run an event loop in the new domain,
|
||||
and so cannot perform IO, fork fibers, etc. *)
|
||||
|
||||
(** {2 Provider Interface} *)
|
||||
|
||||
module Pi : sig
|
||||
module type MGR = sig
|
||||
type t
|
||||
|
||||
val run : t -> (cancelled:exn Promise.t -> 'a) -> 'a
|
||||
(** [run t fn] runs [fn ~cancelled] in a new domain.
|
||||
|
||||
If the calling fiber is cancelled, [cancelled] becomes resolved to the {!Cancel.Cancelled} exception.
|
||||
[fn] should cancel itself in this case. *)
|
||||
|
||||
val run_raw : t -> (unit -> 'a) -> 'a
|
||||
end
|
||||
|
||||
type (_, _, _) Resource.pi +=
|
||||
| Mgr : ('t, (module MGR with type t = 't), [> ty]) Resource.pi
|
||||
|
||||
val mgr : (module MGR with type t = 't) -> ('t, ty) Resource.handler
|
||||
end
|
@ -29,7 +29,7 @@ module Stdenv = struct
|
||||
let stderr (t : <stderr : _ Flow.sink; ..>) = t#stderr
|
||||
let net (t : <net : _ Net.t; ..>) = t#net
|
||||
let process_mgr (t : <process_mgr : _ Process.mgr; ..>) = t#process_mgr
|
||||
let domain_mgr (t : <domain_mgr : #Domain_manager.t; ..>) = t#domain_mgr
|
||||
let domain_mgr (t : <domain_mgr : _ Domain_manager.t; ..>) = t#domain_mgr
|
||||
let clock (t : <clock : _ Time.clock; ..>) = t#clock
|
||||
let mono_clock (t : <mono_clock : _ Time.Mono.t; ..>) = t#mono_clock
|
||||
let secure_random (t: <secure_random : _ Flow.source; ..>) = t#secure_random
|
||||
|
@ -75,31 +75,7 @@ module Net = Net
|
||||
module Process = Process
|
||||
|
||||
(** Parallel computation across multiple CPU cores. *)
|
||||
module Domain_manager : sig
|
||||
class virtual t : object
|
||||
method virtual run_raw : 'a. (unit -> 'a) -> 'a
|
||||
|
||||
method virtual run : 'a. (cancelled:exn Promise.t -> 'a) -> 'a
|
||||
(** [t#run fn] runs [fn ~cancelled] in a new domain.
|
||||
|
||||
If the calling fiber is cancelled, [cancelled] becomes resolved to the {!Cancel.Cancelled} exception.
|
||||
[fn] should cancel itself in this case. *)
|
||||
end
|
||||
|
||||
val run : #t -> (unit -> 'a) -> 'a
|
||||
(** [run t f] runs [f ()] in a newly-created domain and returns the result.
|
||||
|
||||
Other fibers in the calling domain can run in parallel with the new domain.
|
||||
|
||||
Warning: [f] must only access thread-safe values from the calling domain,
|
||||
but this is not enforced by the type system.
|
||||
|
||||
If the calling fiber is cancelled, this is propagated to the spawned domain. *)
|
||||
|
||||
val run_raw : #t -> (unit -> 'a) -> 'a
|
||||
(** [run_raw t f] is like {!run}, but does not run an event loop in the new domain,
|
||||
and so cannot perform IO, fork fibers, etc. *)
|
||||
end
|
||||
module Domain_manager = Domain_manager
|
||||
|
||||
(** Clocks, time, sleeping and timeouts. *)
|
||||
module Time = Time
|
||||
@ -144,6 +120,9 @@ module Debug : sig
|
||||
If the system is not ready to receive the trace output,
|
||||
the whole domain must block until it is. *)
|
||||
|
||||
val with_trace_prefix : (Format.formatter -> unit) -> (unit -> 'a) -> 'a
|
||||
(** [with_trace_prefix fmt fn] runs [fn ()] with a traceln that outputs [fmt] before each message. *)
|
||||
|
||||
type t = <
|
||||
traceln : traceln Fiber.key;
|
||||
>
|
||||
@ -211,7 +190,7 @@ module Stdenv : sig
|
||||
To use this, see {!Domain_manager}.
|
||||
*)
|
||||
|
||||
val domain_mgr : <domain_mgr : #Domain_manager.t as 'a; ..> -> 'a
|
||||
val domain_mgr : <domain_mgr : _ Domain_manager.t as 'a; ..> -> 'a
|
||||
(** [domain_mgr t] allows running code on other cores. *)
|
||||
|
||||
(** {1 Time}
|
||||
|
38
lib_eio/mock/domain_manager.ml
Normal file
38
lib_eio/mock/domain_manager.ml
Normal file
@ -0,0 +1,38 @@
|
||||
open Eio.Std
|
||||
|
||||
let id = Fiber.create_key ()
|
||||
|
||||
let with_domain_tracing fn =
|
||||
Eio.Debug.with_trace_prefix (fun f ->
|
||||
Fiber.get id |> Option.iter (fun id -> Fmt.pf f "[%s] " id)
|
||||
) fn
|
||||
|
||||
module Fake_domain_mgr = struct
|
||||
type t = {
|
||||
mutable next_domain_id : int;
|
||||
}
|
||||
|
||||
let create () = { next_domain_id = 1 }
|
||||
|
||||
let run t fn =
|
||||
let self = t.next_domain_id in
|
||||
t.next_domain_id <- t.next_domain_id + 1;
|
||||
let cancelled, _ = Promise.create () in
|
||||
Fiber.with_binding id (string_of_int self)
|
||||
(fun () -> fn ~cancelled)
|
||||
|
||||
let run_raw t fn =
|
||||
let self = t.next_domain_id in
|
||||
t.next_domain_id <- t.next_domain_id + 1;
|
||||
Fiber.with_binding id (string_of_int self) fn
|
||||
end
|
||||
|
||||
let create =
|
||||
let handler = Eio.Domain_manager.Pi.mgr (module Fake_domain_mgr) in
|
||||
fun () -> Eio.Resource.T (Fake_domain_mgr.create (), handler)
|
||||
|
||||
let run fn =
|
||||
let dm = create () in
|
||||
with_domain_tracing @@ fun () ->
|
||||
Fiber.with_binding id "0" @@ fun () ->
|
||||
fn dm
|
23
lib_eio/mock/domain_manager.mli
Normal file
23
lib_eio/mock/domain_manager.mli
Normal file
@ -0,0 +1,23 @@
|
||||
open Eio.Std
|
||||
|
||||
val create : unit -> Eio.Domain_manager.ty r
|
||||
(** [create ()] is a mock domain manager.
|
||||
|
||||
When asked to run a new Eio domain, it just runs it in the parent domain.
|
||||
It runs the function in a context where {!id} is a fresh domain ID
|
||||
(assigned sequentially starting from 1). *)
|
||||
|
||||
val run : (Eio.Domain_manager.ty r -> 'a) -> 'a
|
||||
(** [run fn] runs [fn dm], where [dm] is a new fake domain manager.
|
||||
It also runs {!with_domain_tracing} to display domain IDs in trace output.
|
||||
|
||||
[fn] itself runs with {!id} set to "0". *)
|
||||
|
||||
val id : string Fiber.key
|
||||
(** [id] is used to get or set the current fake domain's ID.
|
||||
|
||||
This is used in traceln output. *)
|
||||
|
||||
val with_domain_tracing : (unit -> 'a) -> 'a
|
||||
(** [with_domain_tracing fn] runs [fn ()] with a modified [traceln] function that
|
||||
prefixes the current {!id} (if any) to each trace message. *)
|
@ -3,6 +3,7 @@ module Handler = Handler
|
||||
module Flow = Flow
|
||||
module Net = Net
|
||||
module Clock = Clock
|
||||
module Domain_manager = Domain_manager
|
||||
module Backend = Backend
|
||||
|
||||
type Eio.Exn.Backend.t += Simulated_failure
|
||||
|
@ -145,6 +145,9 @@ end
|
||||
(** A mock {!Eio.Time} clock for testing timeouts. *)
|
||||
module Clock = Clock
|
||||
|
||||
(** A mock {!Eio.Domain_manager} that runs everything in a single domain. *)
|
||||
module Domain_manager = Domain_manager
|
||||
|
||||
(** {2 Backend for mocks}
|
||||
|
||||
The mocks can be used with any backend, but if you don't need any IO then you can use this one
|
||||
|
@ -209,7 +209,7 @@ val accept_fork :
|
||||
|
||||
val run_server :
|
||||
?max_connections:int ->
|
||||
?additional_domains:(#Domain_manager.t * int) ->
|
||||
?additional_domains:(_ Domain_manager.t * int) ->
|
||||
?stop:'a Promise.t ->
|
||||
on_error:(exn -> unit) ->
|
||||
[> 'tag listening_socket_ty ] r ->
|
||||
|
@ -38,7 +38,7 @@ module Stdenv = struct
|
||||
stdout : sink_ty r;
|
||||
stderr : sink_ty r;
|
||||
net : [`Unix | `Generic] Eio.Net.ty r;
|
||||
domain_mgr : Eio.Domain_manager.t;
|
||||
domain_mgr : Eio.Domain_manager.ty r;
|
||||
process_mgr : Process.mgr_ty r;
|
||||
clock : float Eio.Time.clock_ty r;
|
||||
mono_clock : Eio.Time.Mono.ty r;
|
||||
|
@ -69,7 +69,7 @@ module Stdenv : sig
|
||||
stdout : sink_ty r;
|
||||
stderr : sink_ty r;
|
||||
net : [`Unix | `Generic] Eio.Net.ty r;
|
||||
domain_mgr : Eio.Domain_manager.t;
|
||||
domain_mgr : Eio.Domain_manager.ty r;
|
||||
process_mgr : Process.mgr_ty r;
|
||||
clock : float Eio.Time.clock_ty r;
|
||||
mono_clock : Eio.Time.Mono.ty r;
|
||||
|
@ -386,19 +386,23 @@ let unwrap_backtrace = function
|
||||
| Ok x -> x
|
||||
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
|
||||
|
||||
let domain_mgr ~run_event_loop = object
|
||||
inherit Eio.Domain_manager.t
|
||||
module Domain_mgr = struct
|
||||
type t = {
|
||||
run_event_loop : (unit -> unit) -> unit -> unit;
|
||||
}
|
||||
|
||||
method run_raw fn =
|
||||
let make ~run_event_loop = { run_event_loop }
|
||||
|
||||
let run_raw _t fn =
|
||||
let domain = ref None in
|
||||
Sched.enter (fun t k ->
|
||||
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> Sched.enqueue_thread t k ())))
|
||||
);
|
||||
unwrap_backtrace (Domain.join (Option.get !domain))
|
||||
|
||||
method run fn =
|
||||
let run t fn =
|
||||
let domain = ref None in
|
||||
Sched.enter (fun t k ->
|
||||
Sched.enter (fun sched k ->
|
||||
let cancelled, set_cancelled = Promise.create () in
|
||||
Fiber_context.set_cancel_fn k.fiber (Promise.resolve set_cancelled);
|
||||
domain := Some (Domain.spawn (fun () ->
|
||||
@ -406,14 +410,18 @@ let domain_mgr ~run_event_loop = object
|
||||
(fun () ->
|
||||
let result = ref None in
|
||||
let fn = wrap_backtrace (fun () -> fn ~cancelled) in
|
||||
run_event_loop (fun () -> result := Some (fn ())) ();
|
||||
t.run_event_loop (fun () -> result := Some (fn ())) ();
|
||||
Option.get !result
|
||||
)
|
||||
~finally:(fun () -> Sched.enqueue_thread t k ())))
|
||||
~finally:(fun () -> Sched.enqueue_thread sched k ())))
|
||||
);
|
||||
unwrap_backtrace (Domain.join (Option.get !domain))
|
||||
end
|
||||
|
||||
let domain_mgr ~run_event_loop =
|
||||
let handler = Eio.Domain_manager.Pi.mgr (module Domain_mgr) in
|
||||
Eio.Resource.T (Domain_mgr.make ~run_event_loop, handler)
|
||||
|
||||
module Mono_clock = struct
|
||||
type t = unit
|
||||
type time = Mtime.t
|
||||
|
@ -87,17 +87,17 @@ let unwrap_backtrace = function
|
||||
| Ok x -> x
|
||||
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
|
||||
|
||||
let v = object
|
||||
inherit Eio.Domain_manager.t
|
||||
module Impl = struct
|
||||
type t = unit
|
||||
|
||||
method run_raw fn =
|
||||
let run_raw () fn =
|
||||
let domain = ref None in
|
||||
Eio.Private.Suspend.enter (fun _ctx enqueue ->
|
||||
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ()))))
|
||||
);
|
||||
unwrap_backtrace (Domain.join (Option.get !domain))
|
||||
|
||||
method run fn =
|
||||
let run () fn =
|
||||
let domain = ref None in
|
||||
Eio.Private.Suspend.enter (fun ctx enqueue ->
|
||||
let cancelled, set_cancelled = Promise.create () in
|
||||
@ -108,3 +108,7 @@ let v = object
|
||||
);
|
||||
unwrap_backtrace (Domain.join (Option.get !domain))
|
||||
end
|
||||
|
||||
let v =
|
||||
let handler = Eio.Domain_manager.Pi.mgr (module Impl) in
|
||||
Eio.Resource.T ((), handler)
|
||||
|
@ -88,17 +88,17 @@ let unwrap_backtrace = function
|
||||
| Ok x -> x
|
||||
| Error (ex, bt) -> Printexc.raise_with_backtrace ex bt
|
||||
|
||||
let v = object
|
||||
inherit Eio.Domain_manager.t
|
||||
module Impl = struct
|
||||
type t = unit
|
||||
|
||||
method run_raw fn =
|
||||
let run_raw () fn =
|
||||
let domain = ref None in
|
||||
Eio.Private.Suspend.enter (fun _ctx enqueue ->
|
||||
domain := Some (Domain.spawn (fun () -> Fun.protect (wrap_backtrace fn) ~finally:(fun () -> enqueue (Ok ()))))
|
||||
);
|
||||
unwrap_backtrace (Domain.join (Option.get !domain))
|
||||
|
||||
method run fn =
|
||||
let run () fn =
|
||||
let domain = ref None in
|
||||
Eio.Private.Suspend.enter (fun ctx enqueue ->
|
||||
let cancelled, set_cancelled = Promise.create () in
|
||||
@ -109,3 +109,7 @@ let v = object
|
||||
);
|
||||
unwrap_backtrace (Domain.join (Option.get !domain))
|
||||
end
|
||||
|
||||
let v =
|
||||
let handler = Eio.Domain_manager.Pi.mgr (module Impl) in
|
||||
Eio.Resource.T ((), handler)
|
||||
|
@ -7,7 +7,7 @@
|
||||
```ocaml
|
||||
open Eio.Std
|
||||
|
||||
let run (fn : Eio.Domain_manager.t -> unit) =
|
||||
let run (fn : Eio.Domain_manager.ty r -> unit) =
|
||||
Eio_main.run @@ fun env ->
|
||||
fn (Eio.Stdenv.domain_mgr env)
|
||||
```
|
||||
|
@ -860,42 +860,18 @@ Non-graceful shutdown, closing all connections still in progress:
|
||||
Exception: Failure "Simulated error".
|
||||
```
|
||||
|
||||
To test support for multiple domains, we just run everything in one domain
|
||||
to keep the output deterministic. We override `traceln` to log the (fake)
|
||||
domain ID too:
|
||||
|
||||
```ocaml
|
||||
let with_domain_tracing id fn =
|
||||
let traceln ?__POS__ fmt =
|
||||
Eio.Private.Debug.default_traceln ?__POS__ ("[%d] " ^^ fmt) id
|
||||
in
|
||||
Fiber.with_binding Eio.Private.Debug.v#traceln { traceln } fn
|
||||
|
||||
let fake_domain_mgr () = object (_ : #Eio.Domain_manager.t)
|
||||
val mutable next_domain_id = 1
|
||||
|
||||
method run fn =
|
||||
let self = next_domain_id in
|
||||
next_domain_id <- next_domain_id + 1;
|
||||
let cancelled, _ = Promise.create () in
|
||||
with_domain_tracing self (fun () -> fn ~cancelled)
|
||||
|
||||
method run_raw _ = assert false
|
||||
end
|
||||
```
|
||||
|
||||
Handling the connections with 3 domains, with a graceful shutdown:
|
||||
|
||||
```ocaml
|
||||
# Eio_mock.Backend.run @@ fun () ->
|
||||
with_domain_tracing 0 @@ fun () ->
|
||||
Eio_mock.Domain_manager.run @@ fun fake_domain_mgr ->
|
||||
let n_domains = 3 in
|
||||
let listening_socket = mock_listener ~n_clients:10 ~n_domains in
|
||||
let stop, set_stop = Promise.create () in
|
||||
Fiber.both
|
||||
(fun () ->
|
||||
Eio.Net.run_server listening_socket handle_connection
|
||||
~additional_domains:(fake_domain_mgr (), n_domains - 1)
|
||||
~additional_domains:(fake_domain_mgr, n_domains - 1)
|
||||
~max_connections:10
|
||||
~on_error:raise
|
||||
~stop
|
||||
@ -938,13 +914,13 @@ Handling the connections with 3 domains, aborting immediately:
|
||||
|
||||
```ocaml
|
||||
# Eio_mock.Backend.run @@ fun () ->
|
||||
with_domain_tracing 0 @@ fun () ->
|
||||
Eio_mock.Domain_manager.run @@ fun fake_domain_mgr ->
|
||||
let n_domains = 3 in
|
||||
let listening_socket = mock_listener ~n_clients:10 ~n_domains in
|
||||
Fiber.both
|
||||
(fun () ->
|
||||
Eio.Net.run_server listening_socket handle_connection
|
||||
~additional_domains:(fake_domain_mgr (), n_domains - 1)
|
||||
~additional_domains:(fake_domain_mgr, n_domains - 1)
|
||||
~max_connections:10
|
||||
~on_error:raise
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user