From 43bae5733004ff7abf0aeb7ef716e20a971e0b60 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Fri, 10 Dec 2021 16:54:43 +0000 Subject: [PATCH] Explain about Promises and Streams in the README --- README.md | 180 ++++++++++++++++++++++++++++++++++++ doc/prelude.ml | 8 +- lib_eio/eio.mli | 9 +- lib_eio/fibre.ml | 10 +- lib_eio_linux/tests/test.ml | 2 +- tests/test_fibre.md | 2 +- tests/test_sync.md | 6 +- 7 files changed, 200 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 17bed1f..601266c 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,10 @@ This is an unreleased repository, as it's very much a work-in-progress. * [Time](#time) * [Multicore Support](#multicore-support) * [Design Note: Thread-Safety](#design-note-thread-safety) +* [Synchronisation Tools](#synchronisation-tools) + * [Promises](#promises) + * [Streams](#streams) + * [Example: a worker pool](#example-a-worker-pool) * [Design Note: Determinism](#design-note-determinism) * [Examples](#examples) * [Further Reading](#further-reading) @@ -656,6 +660,182 @@ However, if `q` is wrapped by a mutex (as in 4) then the assertion could fail. The first `Queue.length` will lock and then release the queue, then the second will lock and release it again. Another domain could change the value between these two calls. +## Synchronisation Tools + +Eio provides several sub-modules for communicating between fibres and domains. + +### Promises + +Promises are a simple and reliable way to communicate between fibres. +One fibre can wait for a promise and another can resolve it: + +```ocaml +# Eio_main.run @@ fun _ -> + let promise, resolver = Promise.create () in + Fibre.both + (fun () -> + traceln "Waiting for promise..."; + let x = Promise.await promise in + traceln "x = %d" x + ) + (fun () -> + traceln "Resolving promise"; + Promise.fulfill resolver 42 + );; ++Waiting for promise... ++Resolving promise ++x = 42 +- : unit = () +``` + +A promise is initially "unresolved". It can then either become "fulfilled" (as in the example above) or "broken" (with an exception). +Either way, the promise is then said to be "resolved". A promise can only be resolved once. +Awaiting a promise that is already resolved immediately returns the resolved value +(or raises the exception, if broken). + +Promises are one of the easiest tools to use safely: +it doesn't matter whether you wait on a promise before or after it is resolved, +and multiple fibres can wait for the same promise and will get the same result. +Promises are thread-safe; you can wait for a promise in one domain and resolve it in another. + +Promises are also useful for integrating with callback-based libraries. For example: + +```ocaml +let wrap fn x = + let promise, resolver = Promise.create () in + fn x + ~on_success:(Promise.resolve resolver) + ~on_error:(Promise.break resolver); + Promise.await promise +``` + +### Streams + +A stream is a bounded queue. Reading from an empty stream waits until an item is available. +Writing to a full stream waits for space. + +```ocaml +# Eio_main.run @@ fun _ -> + let stream = Eio.Stream.create 2 in + Fibre.both + (fun () -> + for i = 1 to 5 do + traceln "Adding %d..." i; + Eio.Stream.add stream i + done + ) + (fun () -> + for i = 1 to 5 do + let x = Eio.Stream.take stream in + traceln "Got %d" x; + Fibre.yield () + done + );; ++Adding 1... ++Adding 2... ++Adding 3... ++Got 1 ++Adding 4... ++Got 2 ++Adding 5... ++Got 3 ++Got 4 ++Got 5 +- : unit = () +``` + +Here, we create a stream with a maximum size of 2 items. +The first fibre added 1 and 2 to the stream, but had to wait before it could insert 3. + +A stream with a capacity of 1 acts like a mailbox. +A stream with a capacity of 0 will wait until both the sender and receiver are ready. + +Streams are thread-safe and can be used to communicate between domains. + +### Example: a worker pool + +A useful pattern is a pool of workers reading from a stream of work items. +Client fibres submit items to a stream and workers process the items: + +```ocaml +let handle_job request = + Printf.sprintf "Processed:%d" request + +let run_worker id stream = + traceln "Worker %s ready" id; + while true do + let request, reply = Eio.Stream.take stream in + traceln "Worker %s processing request %d" id request; + Promise.fulfill reply (handle_job request) + done + +let submit stream request = + let reply, resolve_reply = Promise.create () in + Eio.Stream.add stream (request, resolve_reply); + Promise.await reply +``` + +Each item in the stream is a request payload and a resolver for the reply promise. + +```ocaml +# Eio_main.run @@ fun env -> + let domain_mgr = Eio.Stdenv.domain_mgr env in + Switch.run @@ fun sw -> + let stream = Eio.Stream.create 100 in + let spawn_worker name = + Fibre.fork_ignore ~sw (fun () -> + Eio.Domain_manager.run domain_mgr (fun () -> run_worker name stream) + ) + in + spawn_worker "A"; + spawn_worker "B"; + Switch.run (fun sw -> + for i = 1 to 3 do + Fibre.fork_ignore ~sw (fun () -> + traceln "Client %d submitting job..." i; + traceln "Client %d got %s" i (submit stream i) + ) + done; + ); + raise Exit;; ++Worker A ready ++Client 1 submitting job... ++Worker B ready ++Client 2 submitting job... ++Worker A processing request 1 ++Client 3 submitting job... ++Worker B processing request 2 ++Client 1 got Processed:1 ++Worker A processing request 3 ++Client 2 got Processed:2 ++Client 3 got Processed:3 +Exception: Stdlib.Exit. +``` + +In the code above, any exception raised while processing a job will exit the whole program. +We might prefer to handle exceptions by sending them back to the client and continuing: + +```ocaml +let run_worker id stream = + traceln "Worker %s ready" id; + while true do + let request, reply = Eio.Stream.take stream in + traceln "Worker %s processing request %d" id request; + match handle_job request with + | result -> Promise.fulfill reply result + | exception Eio.Cancel.Cancelled ex -> + Promise.break reply (Failure "Worker shut down"); + raise ex + | exception ex -> + Promise.break reply ex + done +``` + +Note that as we're catching all exceptions here we need a special case for `Cancelled`, +which indicates that the worker was asked to shut down while processing a request. +We don't send the `Cancelled` exception itself to the client as we're not asking it to shut down too, +but we do exit the loop and propagate the cancellation to the parent context. + ## Design Note: Determinism Within a domain, fibres are scheduled deterministically. diff --git a/doc/prelude.ml b/doc/prelude.ml index cca8863..6671941 100644 --- a/doc/prelude.ml +++ b/doc/prelude.ml @@ -18,6 +18,12 @@ module Eio_main = struct now := max !now time end + (* To avoid non-deterministic output, we run the examples a single domain. *) + let fake_domain_mgr = object (_ : #Eio.Domain_manager.t) + method run fn = fn () + method run_raw fn = fn () + end + (* https://github.com/ocaml/ocaml/issues/10324 *) let dontcrash = Sys.opaque_identity @@ -28,7 +34,7 @@ module Eio_main = struct method stdin = dontcrash env#stdin method stdout = dontcrash env#stdout method cwd = dontcrash env#cwd - method domain_mgr = dontcrash env#domain_mgr + method domain_mgr = fake_domain_mgr method clock = fake_clock env#clock end end diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index 7e39eb5..72fb92f 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -105,7 +105,7 @@ module Std : sig (** [break u ex] resolves [u]'s promise with the exception [ex]. Any threads waiting for the result will be added to the run queue. *) - val resolve : 'a t -> ('a, exn) result -> unit + val resolve : 'a u -> ('a, exn) result -> unit (** [resolve t (Ok x)] is [fulfill t x] and [resolve t (Error ex)] is [break t ex]. *) @@ -176,10 +176,11 @@ module Std : sig @param on_error This is called if the fibre raises an exception (other than {!Cancel.Cancelled}). If it raises in turn, the parent switch is turned off. *) - val fork : sw:Switch.t -> exn_turn_off:bool -> (unit -> 'a) -> 'a Promise.t - (** [fork ~sw ~exn_turn_off fn] starts running [fn ()] in a new fibre and returns a promise for its result. + val fork : sw:Switch.t -> (unit -> 'a) -> 'a Promise.t + (** [fork ~sw fn] schedules [fn ()] to run in a new fibre and returns a promise for its result. The new fibre is attached to [sw] (which can't finish until the fibre ends). - @param exn_turn_off If [true] and [fn] raises an exception, [sw] is turned off (in addition to breaking the promise). *) + [fork] returns immediately, before the new thread starts. + If [fn] raises an exception then the promise is broken (but [sw] is not turned off). *) val yield : unit -> unit (** [yield ()] asks the scheduler to switch to the next runnable task. diff --git a/lib_eio/fibre.ml b/lib_eio/fibre.ml index 9f8d919..bf5ccc4 100644 --- a/lib_eio/fibre.ml +++ b/lib_eio/fibre.ml @@ -2,15 +2,11 @@ open EffectHandlers type _ eff += Fork : (Cancel.fibre_context -> 'a) -> 'a Promise.t eff -let fork ~sw ~exn_turn_off f = +let fork ~sw f = let f child = Switch.with_op sw @@ fun () -> - try - Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t -> - f () - with ex -> - if exn_turn_off then Switch.turn_off sw ex; - raise ex + Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t -> + f () in perform (Fork f) diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index 1839eb0..752c9f9 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -6,7 +6,7 @@ let () = Printexc.record_backtrace true let read_one_byte ~sw r = - Fibre.fork ~sw ~exn_turn_off:true (fun () -> + Fibre.fork ~sw (fun () -> let r = Option.get (Eio_linux.Objects.get_fd_opt r) in Eio_linux.await_readable r; let b = Bytes.create 1 in diff --git a/tests/test_fibre.md b/tests/test_fibre.md index 90c0aab..1656a7f 100644 --- a/tests/test_fibre.md +++ b/tests/test_fibre.md @@ -235,7 +235,7 @@ Exception: Failure "simulated error". (fun () -> let sw = Option.get !switch in Eio.Cancel.protect @@ fun () -> - let child = Fibre.fork ~sw ~exn_turn_off:true (fun () -> + let child = Fibre.fork ~sw (fun () -> traceln "Forked child"; Fibre.await_cancel () ) in diff --git a/tests/test_sync.md b/tests/test_sync.md index 62d3673..d1a2657 100644 --- a/tests/test_sync.md +++ b/tests/test_sync.md @@ -24,7 +24,7 @@ Create a promise, fork a thread waiting for it, then fulfull it: Switch.run @@ fun sw -> let p, r = Promise.create () in traceln "Initial state: %a" (pp_promise Fmt.string) p; - let thread = Fibre.fork ~sw ~exn_turn_off:false (fun () -> Promise.await p) in + let thread = Fibre.fork ~sw (fun () -> Promise.await p) in Promise.fulfill r "ok"; traceln "After being fulfilled: %a" (pp_promise Fmt.string) p; traceln "Thread before yield: %a" (pp_promise Fmt.string) thread; @@ -45,7 +45,7 @@ Create a promise, fork a thread waiting for it, then break it: Switch.run @@ fun sw -> let p, r = Promise.create () in traceln "Initial state: %a" (pp_promise Fmt.string) p; - let thread = Fibre.fork ~sw ~exn_turn_off:false (fun () -> Promise.await p) in + let thread = Fibre.fork ~sw (fun () -> Promise.await p) in Promise.break r (Failure "test"); traceln "After being broken: %a" (pp_promise Fmt.string) p; traceln "Thread before yield: %a" (pp_promise Fmt.string) thread; @@ -93,7 +93,7 @@ Basic semaphore tests: Switch.run @@ fun sw -> let running = ref 0 in let sem = Semaphore.make 2 in - let fork = Fibre.fork ~sw ~exn_turn_off:false in + let fork = Fibre.fork ~sw in let a = fork (fun () -> Ctf.label "a"; Semaphore.acquire sem; incr running) in let b = fork (fun () -> Ctf.label "b"; Semaphore.acquire sem; incr running) in let c = fork (fun () -> Ctf.label "c"; Semaphore.acquire sem; incr running) in