Merge pull request #113 from talex5/sync-doc

Explain about Promises and Streams in the README
This commit is contained in:
Thomas Leonard 2021-12-13 13:05:49 +00:00 committed by GitHub
commit 2c6d602231
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 200 additions and 17 deletions

180
README.md
View File

@ -26,6 +26,10 @@ This is an unreleased repository, as it's very much a work-in-progress.
* [Time](#time)
* [Multicore Support](#multicore-support)
* [Design Note: Thread-Safety](#design-note-thread-safety)
* [Synchronisation Tools](#synchronisation-tools)
* [Promises](#promises)
* [Streams](#streams)
* [Example: a worker pool](#example-a-worker-pool)
* [Design Note: Determinism](#design-note-determinism)
* [Examples](#examples)
* [Further Reading](#further-reading)
@ -656,6 +660,182 @@ However, if `q` is wrapped by a mutex (as in 4) then the assertion could fail.
The first `Queue.length` will lock and then release the queue, then the second will lock and release it again.
Another domain could change the value between these two calls.
## Synchronisation Tools
Eio provides several sub-modules for communicating between fibres and domains.
### Promises
Promises are a simple and reliable way to communicate between fibres.
One fibre can wait for a promise and another can resolve it:
```ocaml
# Eio_main.run @@ fun _ ->
let promise, resolver = Promise.create () in
Fibre.both
(fun () ->
traceln "Waiting for promise...";
let x = Promise.await promise in
traceln "x = %d" x
)
(fun () ->
traceln "Resolving promise";
Promise.fulfill resolver 42
);;
+Waiting for promise...
+Resolving promise
+x = 42
- : unit = ()
```
A promise is initially "unresolved". It can then either become "fulfilled" (as in the example above) or "broken" (with an exception).
Either way, the promise is then said to be "resolved". A promise can only be resolved once.
Awaiting a promise that is already resolved immediately returns the resolved value
(or raises the exception, if broken).
Promises are one of the easiest tools to use safely:
it doesn't matter whether you wait on a promise before or after it is resolved,
and multiple fibres can wait for the same promise and will get the same result.
Promises are thread-safe; you can wait for a promise in one domain and resolve it in another.
Promises are also useful for integrating with callback-based libraries. For example:
```ocaml
let wrap fn x =
let promise, resolver = Promise.create () in
fn x
~on_success:(Promise.resolve resolver)
~on_error:(Promise.break resolver);
Promise.await promise
```
### Streams
A stream is a bounded queue. Reading from an empty stream waits until an item is available.
Writing to a full stream waits for space.
```ocaml
# Eio_main.run @@ fun _ ->
let stream = Eio.Stream.create 2 in
Fibre.both
(fun () ->
for i = 1 to 5 do
traceln "Adding %d..." i;
Eio.Stream.add stream i
done
)
(fun () ->
for i = 1 to 5 do
let x = Eio.Stream.take stream in
traceln "Got %d" x;
Fibre.yield ()
done
);;
+Adding 1...
+Adding 2...
+Adding 3...
+Got 1
+Adding 4...
+Got 2
+Adding 5...
+Got 3
+Got 4
+Got 5
- : unit = ()
```
Here, we create a stream with a maximum size of 2 items.
The first fibre added 1 and 2 to the stream, but had to wait before it could insert 3.
A stream with a capacity of 1 acts like a mailbox.
A stream with a capacity of 0 will wait until both the sender and receiver are ready.
Streams are thread-safe and can be used to communicate between domains.
### Example: a worker pool
A useful pattern is a pool of workers reading from a stream of work items.
Client fibres submit items to a stream and workers process the items:
```ocaml
let handle_job request =
Printf.sprintf "Processed:%d" request
let run_worker id stream =
traceln "Worker %s ready" id;
while true do
let request, reply = Eio.Stream.take stream in
traceln "Worker %s processing request %d" id request;
Promise.fulfill reply (handle_job request)
done
let submit stream request =
let reply, resolve_reply = Promise.create () in
Eio.Stream.add stream (request, resolve_reply);
Promise.await reply
```
Each item in the stream is a request payload and a resolver for the reply promise.
```ocaml
# Eio_main.run @@ fun env ->
let domain_mgr = Eio.Stdenv.domain_mgr env in
Switch.run @@ fun sw ->
let stream = Eio.Stream.create 100 in
let spawn_worker name =
Fibre.fork_ignore ~sw (fun () ->
Eio.Domain_manager.run domain_mgr (fun () -> run_worker name stream)
)
in
spawn_worker "A";
spawn_worker "B";
Switch.run (fun sw ->
for i = 1 to 3 do
Fibre.fork_ignore ~sw (fun () ->
traceln "Client %d submitting job..." i;
traceln "Client %d got %s" i (submit stream i)
)
done;
);
raise Exit;;
+Worker A ready
+Client 1 submitting job...
+Worker B ready
+Client 2 submitting job...
+Worker A processing request 1
+Client 3 submitting job...
+Worker B processing request 2
+Client 1 got Processed:1
+Worker A processing request 3
+Client 2 got Processed:2
+Client 3 got Processed:3
Exception: Stdlib.Exit.
```
In the code above, any exception raised while processing a job will exit the whole program.
We might prefer to handle exceptions by sending them back to the client and continuing:
```ocaml
let run_worker id stream =
traceln "Worker %s ready" id;
while true do
let request, reply = Eio.Stream.take stream in
traceln "Worker %s processing request %d" id request;
match handle_job request with
| result -> Promise.fulfill reply result
| exception Eio.Cancel.Cancelled ex ->
Promise.break reply (Failure "Worker shut down");
raise ex
| exception ex ->
Promise.break reply ex
done
```
Note that as we're catching all exceptions here we need a special case for `Cancelled`,
which indicates that the worker was asked to shut down while processing a request.
We don't send the `Cancelled` exception itself to the client as we're not asking it to shut down too,
but we do exit the loop and propagate the cancellation to the parent context.
## Design Note: Determinism
Within a domain, fibres are scheduled deterministically.

View File

@ -18,6 +18,12 @@ module Eio_main = struct
now := max !now time
end
(* To avoid non-deterministic output, we run the examples a single domain. *)
let fake_domain_mgr = object (_ : #Eio.Domain_manager.t)
method run fn = fn ()
method run_raw fn = fn ()
end
(* https://github.com/ocaml/ocaml/issues/10324 *)
let dontcrash = Sys.opaque_identity
@ -28,7 +34,7 @@ module Eio_main = struct
method stdin = dontcrash env#stdin
method stdout = dontcrash env#stdout
method cwd = dontcrash env#cwd
method domain_mgr = dontcrash env#domain_mgr
method domain_mgr = fake_domain_mgr
method clock = fake_clock env#clock
end
end

View File

@ -105,7 +105,7 @@ module Std : sig
(** [break u ex] resolves [u]'s promise with the exception [ex].
Any threads waiting for the result will be added to the run queue. *)
val resolve : 'a t -> ('a, exn) result -> unit
val resolve : 'a u -> ('a, exn) result -> unit
(** [resolve t (Ok x)] is [fulfill t x] and
[resolve t (Error ex)] is [break t ex]. *)
@ -176,10 +176,11 @@ module Std : sig
@param on_error This is called if the fibre raises an exception (other than {!Cancel.Cancelled}).
If it raises in turn, the parent switch is turned off. *)
val fork : sw:Switch.t -> exn_turn_off:bool -> (unit -> 'a) -> 'a Promise.t
(** [fork ~sw ~exn_turn_off fn] starts running [fn ()] in a new fibre and returns a promise for its result.
val fork : sw:Switch.t -> (unit -> 'a) -> 'a Promise.t
(** [fork ~sw fn] schedules [fn ()] to run in a new fibre and returns a promise for its result.
The new fibre is attached to [sw] (which can't finish until the fibre ends).
@param exn_turn_off If [true] and [fn] raises an exception, [sw] is turned off (in addition to breaking the promise). *)
[fork] returns immediately, before the new thread starts.
If [fn] raises an exception then the promise is broken (but [sw] is not turned off). *)
val yield : unit -> unit
(** [yield ()] asks the scheduler to switch to the next runnable task.

View File

@ -2,15 +2,11 @@ open EffectHandlers
type _ eff += Fork : (Cancel.fibre_context -> 'a) -> 'a Promise.t eff
let fork ~sw ~exn_turn_off f =
let fork ~sw f =
let f child =
Switch.with_op sw @@ fun () ->
try
Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t ->
f ()
with ex ->
if exn_turn_off then Switch.turn_off sw ex;
raise ex
Cancel.with_cc ~ctx:child ~parent:sw.cancel ~protected:false @@ fun _t ->
f ()
in
perform (Fork f)

View File

@ -6,7 +6,7 @@ let () =
Printexc.record_backtrace true
let read_one_byte ~sw r =
Fibre.fork ~sw ~exn_turn_off:true (fun () ->
Fibre.fork ~sw (fun () ->
let r = Option.get (Eio_linux.Objects.get_fd_opt r) in
Eio_linux.await_readable r;
let b = Bytes.create 1 in

View File

@ -235,7 +235,7 @@ Exception: Failure "simulated error".
(fun () ->
let sw = Option.get !switch in
Eio.Cancel.protect @@ fun () ->
let child = Fibre.fork ~sw ~exn_turn_off:true (fun () ->
let child = Fibre.fork ~sw (fun () ->
traceln "Forked child";
Fibre.await_cancel ()
) in

View File

@ -24,7 +24,7 @@ Create a promise, fork a thread waiting for it, then fulfull it:
Switch.run @@ fun sw ->
let p, r = Promise.create () in
traceln "Initial state: %a" (pp_promise Fmt.string) p;
let thread = Fibre.fork ~sw ~exn_turn_off:false (fun () -> Promise.await p) in
let thread = Fibre.fork ~sw (fun () -> Promise.await p) in
Promise.fulfill r "ok";
traceln "After being fulfilled: %a" (pp_promise Fmt.string) p;
traceln "Thread before yield: %a" (pp_promise Fmt.string) thread;
@ -45,7 +45,7 @@ Create a promise, fork a thread waiting for it, then break it:
Switch.run @@ fun sw ->
let p, r = Promise.create () in
traceln "Initial state: %a" (pp_promise Fmt.string) p;
let thread = Fibre.fork ~sw ~exn_turn_off:false (fun () -> Promise.await p) in
let thread = Fibre.fork ~sw (fun () -> Promise.await p) in
Promise.break r (Failure "test");
traceln "After being broken: %a" (pp_promise Fmt.string) p;
traceln "Thread before yield: %a" (pp_promise Fmt.string) thread;
@ -93,7 +93,7 @@ Basic semaphore tests:
Switch.run @@ fun sw ->
let running = ref 0 in
let sem = Semaphore.make 2 in
let fork = Fibre.fork ~sw ~exn_turn_off:false in
let fork = Fibre.fork ~sw in
let a = fork (fun () -> Ctf.label "a"; Semaphore.acquire sem; incr running) in
let b = fork (fun () -> Ctf.label "b"; Semaphore.acquire sem; incr running) in
let c = fork (fun () -> Ctf.label "c"; Semaphore.acquire sem; incr running) in