From 1e4519867179dea384f6f723ce3f770d7dfc653d Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Tue, 16 Mar 2021 15:23:02 +0000 Subject: [PATCH] Split promises into their own library --- Makefile | 2 +- dune-project | 6 +++- eunix.opam | 2 +- lib_eioio/dune | 3 +- lib_eioio/eunix.ml | 61 +++++------------------------------------ lib_eioio/eunix.mli | 27 ------------------ lib_promise/dune | 4 +++ lib_promise/promise.ml | 46 +++++++++++++++++++++++++++++++ lib_promise/promise.mli | 39 ++++++++++++++++++++++++++ promise.opam | 28 +++++++++++++++++++ tests/eurcp_lib.ml | 2 +- tests/test.ml | 28 +++++++++++-------- 12 files changed, 150 insertions(+), 98 deletions(-) create mode 100644 lib_promise/dune create mode 100644 lib_promise/promise.ml create mode 100644 lib_promise/promise.mli create mode 100644 promise.opam diff --git a/Makefile b/Makefile index 2a7cfd6..b697f84 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,2 @@ all: - dune build @runtest @install + dune build @runtest @all diff --git a/dune-project b/dune-project index 35df44b..cfcdaca 100644 --- a/dune-project +++ b/dune-project @@ -3,7 +3,7 @@ (generate_opam_files true) (source (github ocaml-multicore/eioio)) (license ISC) -(authors "Anil Madhavapeddy") +(authors "Anil Madhavapeddy" "Thomas Leonard") (maintainers "anil@recoil.org") (package (name eunix) @@ -11,3 +11,7 @@ (description "An effect-based IO implementation for multicore OCaml with fibres.") (depends )) +(package + (name promise) + (synopsis "effect-based promises") + (description "An effect-based implementation of promises.")) diff --git a/eunix.opam b/eunix.opam index 3ec2df2..8469031 100644 --- a/eunix.opam +++ b/eunix.opam @@ -4,7 +4,7 @@ synopsis: "effect-based direct-style IO for OCaml" description: "An effect-based IO implementation for multicore OCaml with fibres." maintainer: ["anil@recoil.org"] -authors: ["Anil Madhavapeddy"] +authors: ["Anil Madhavapeddy" "Thomas Leonard"] license: "ISC" homepage: "https://github.com/ocaml-multicore/eioio" bug-reports: "https://github.com/ocaml-multicore/eioio/issues" diff --git a/lib_eioio/dune b/lib_eioio/dune index 48f3e4a..80ce678 100644 --- a/lib_eioio/dune +++ b/lib_eioio/dune @@ -2,5 +2,4 @@ (name eunix) (public_name eunix) (modules eunix zzz) - (libraries unix uring bheap logs fmt bigstringaf)) - + (libraries promise unix uring bheap logs fmt bigstringaf)) diff --git a/lib_eioio/eunix.ml b/lib_eioio/eunix.ml index fd58c2c..88c89bd 100644 --- a/lib_eioio/eunix.ml +++ b/lib_eioio/eunix.ml @@ -17,48 +17,6 @@ let src = Logs.Src.create "eunix" ~doc:"Effect-based IO system" module Log = (val Logs.src_log src : Logs.LOG) -module Promise = struct - type 'a state = - | Unresolved of ((('a, exn) result -> unit) Queue.t) - | Fulfilled of 'a - | Broken of exn - - type 'a t = 'a state ref - - type 'a u = 'a t - - effect Await : 'a t -> 'a - - let create () = - let t = ref (Unresolved (Queue.create ())) in - t, t - - let await t = - perform (Await t) - - let fulfill t v = - match !t with - | Broken ex -> Fmt.failwith "Can't fulfill already-broken promise: %a" Fmt.exn ex - | Fulfilled _ -> Fmt.failwith "Can't fulfill already-fulfilled promise" - | Unresolved q -> - t := Fulfilled v; - Queue.iter (fun f -> f (Ok v)) q - - let break t ex = - match !t with - | Broken orig -> Fmt.failwith "Can't break already-broken promise: %a -> %a" Fmt.exn orig Fmt.exn ex - | Fulfilled _ -> Fmt.failwith "Can't break already-fulfilled promise (with %a)" Fmt.exn ex - | Unresolved q -> - t := Broken ex; - Queue.iter (fun f -> f (Error ex)) q - - let state t = - match !t with - | Unresolved _ -> `Unresolved - | Fulfilled x -> `Fulfilled x - | Broken ex -> `Broken ex -end - type amount = Exactly of int | Upto of int type rw_req = { @@ -292,18 +250,13 @@ let run ?(queue_depth=64) ?(block_size=4096) main = | effect (Sleep d) k -> Zzz.sleep sleep_q d k; schedule st - | effect (Promise.Await p) k -> - begin match !p with - | Fulfilled v -> continue k v - | Broken ex -> discontinue k ex - | Unresolved q -> - let when_resolved = function - | Ok v -> enqueue_thread st k v - | Error ex -> enqueue_failed_thread st k ex - in - Queue.add when_resolved q; - schedule st - end + | effect (Promise.Await q) k -> + let when_resolved = function + | Ok v -> enqueue_thread st k v + | Error ex -> enqueue_failed_thread st k ex + in + Promise.add_waiter q when_resolved; + schedule st | effect (Fork f) k -> let promise, resolver = Promise.create () in enqueue_thread st k promise; diff --git a/lib_eioio/eunix.mli b/lib_eioio/eunix.mli index 276a16b..fb0fd57 100644 --- a/lib_eioio/eunix.mli +++ b/lib_eioio/eunix.mli @@ -16,33 +16,6 @@ type t -module Promise : sig - type 'a t - (** An ['a t] is a promise for a value of type ['a]. *) - - type 'a u - (** An ['a u] is a resolver for a promise of type ['a]. *) - - val create : unit -> 'a t * 'a u - (** [create ()] is a fresh promise/resolver pair. - The promise is initially unresolved. *) - - val await : 'a t -> 'a - (** [await t] blocks until [t] is resolved. - If [t] is already resolved then this returns immediately. - If [t] is broken, it raises the exception. *) - - val fulfill : 'a u -> 'a -> unit - (** [fulfill u v] successfully resolves [u]'s promise with the value [v]. - Any threads waiting for the result will be added to the run queue. *) - - val break : 'a u -> exn -> unit - (** [break u ex] resolves [u]'s promise with the exception [ex]. - Any threads waiting for the result will be added to the run queue. *) - - val state : 'a t -> [ `Fulfilled of 'a | `Broken of exn | `Unresolved ] -end - (** {1 Fibre functions} *) val fork : (unit -> 'a) -> 'a Promise.t diff --git a/lib_promise/dune b/lib_promise/dune new file mode 100644 index 0000000..6cfb3df --- /dev/null +++ b/lib_promise/dune @@ -0,0 +1,4 @@ +(library + (name promise) + (public_name promise) + (flags (:standard -w -50))) diff --git a/lib_promise/promise.ml b/lib_promise/promise.ml new file mode 100644 index 0000000..5ad431f --- /dev/null +++ b/lib_promise/promise.ml @@ -0,0 +1,46 @@ +type 'a waiters = (('a, exn) result -> unit) Queue.t + +type 'a state = + | Unresolved of 'a waiters + | Fulfilled of 'a + | Broken of exn + +type 'a t = 'a state ref + +type 'a u = 'a t + +effect Await : 'a waiters -> 'a + +let create () = + let t = ref (Unresolved (Queue.create ())) in + t, t + +let await t = + match !t with + | Fulfilled x -> x + | Broken ex -> raise ex + | Unresolved q -> + perform (Await q) + +let fulfill t v = + match !t with + | Broken ex -> invalid_arg ("Can't fulfill already-broken promise: " ^ Printexc.to_string ex) + | Fulfilled _ -> invalid_arg "Can't fulfill already-fulfilled promise" + | Unresolved q -> + t := Fulfilled v; + Queue.iter (fun f -> f (Ok v)) q + +let break t ex = + match !t with + | Broken orig -> invalid_arg (Printf.sprintf "Can't break already-broken promise: %s -> %s" + (Printexc.to_string orig) (Printexc.to_string ex)) + | Fulfilled _ -> invalid_arg (Printf.sprintf "Can't break already-fulfilled promise (with %s)" + (Printexc.to_string ex)) + | Unresolved q -> + t := Broken ex; + Queue.iter (fun f -> f (Error ex)) q + +let state t = !t + +let add_waiter waiters cb = + Queue.add cb waiters diff --git a/lib_promise/promise.mli b/lib_promise/promise.mli new file mode 100644 index 0000000..c32d5cf --- /dev/null +++ b/lib_promise/promise.mli @@ -0,0 +1,39 @@ +type 'a t +(** An ['a t] is a promise for a value of type ['a]. *) + +type 'a u +(** An ['a u] is a resolver for a promise of type ['a]. *) + +val create : unit -> 'a t * 'a u +(** [create ()] is a fresh promise/resolver pair. + The promise is initially unresolved. *) + +val await : 'a t -> 'a +(** [await t] blocks until [t] is resolved. + If [t] is already resolved then this returns immediately. + If [t] is broken, it raises the exception. *) + +val fulfill : 'a u -> 'a -> unit +(** [fulfill u v] successfully resolves [u]'s promise with the value [v]. + Any threads waiting for the result will be added to the run queue. *) + +val break : 'a u -> exn -> unit +(** [break u ex] resolves [u]'s promise with the exception [ex]. + Any threads waiting for the result will be added to the run queue. *) + +type 'a waiters + +type 'a state = + | Unresolved of 'a waiters + | Fulfilled of 'a + | Broken of exn + +val state : 'a t -> 'a state + +(** {2 Provider API} *) + +val add_waiter : 'a waiters -> (('a, exn) result -> unit) -> unit + +effect Await : 'a waiters -> 'a +(** Performed when the user calls [await] on an unresolved promise. + The handler should add itself to the list of waiters and block until its callback is invoked. *) diff --git a/promise.opam b/promise.opam new file mode 100644 index 0000000..7a4af10 --- /dev/null +++ b/promise.opam @@ -0,0 +1,28 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "effect-based promises" +description: "An effect-based implementation of promises." +maintainer: ["anil@recoil.org"] +authors: ["Anil Madhavapeddy" "Thomas Leonard"] +license: "ISC" +homepage: "https://github.com/ocaml-multicore/eioio" +bug-reports: "https://github.com/ocaml-multicore/eioio/issues" +depends: [ + "dune" {>= "2.7"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/ocaml-multicore/eioio.git" diff --git a/tests/eurcp_lib.ml b/tests/eurcp_lib.ml index f047b5c..6e11ad1 100644 --- a/tests/eurcp_lib.ml +++ b/tests/eurcp_lib.ml @@ -18,7 +18,7 @@ let copy_file infd outfd insize block_size = let len = min block_size remaining in let thread = U.fork (fun () -> read_then_write_chunk infd outfd file_offset len) in copy_block (file_offset + len); - U.Promise.await thread + Promise.await thread in copy_block 0 diff --git a/tests/test.ml b/tests/test.ml index 4a34d53..bc0e6a0 100644 --- a/tests/test.ml +++ b/tests/test.ml @@ -14,29 +14,35 @@ let state t = | `Fulfilled x -> Fmt.pf f "fulfilled:%a" (Alcotest.pp t) x ) +let get_state p = + match Promise.state p with + | Unresolved _ -> `Unresolved + | Broken ex -> `Broken ex + | Fulfilled x -> `Fulfilled x + let test_promise () = Eunix.run @@ fun () -> let p, r = Promise.create () in - Alcotest.(check (state string)) "Initially unresolved" (Promise.state p) `Unresolved; + Alcotest.(check (state string)) "Initially unresolved" (get_state p) `Unresolved; let thread = Eunix.fork (fun () -> Promise.await p) in Promise.fulfill r "ok"; - Alcotest.(check (state string)) "Resolved OK" (Promise.state p) (`Fulfilled "ok"); - Alcotest.(check (state string)) "Thread unresolved" (Promise.state thread) `Unresolved; + Alcotest.(check (state string)) "Resolved OK" (get_state p) (`Fulfilled "ok"); + Alcotest.(check (state string)) "Thread unresolved" (get_state thread) `Unresolved; yield (); - Alcotest.(check (state string)) "Thread resolved" (Promise.state thread) @@ `Fulfilled "ok"; + Alcotest.(check (state string)) "Thread resolved" (get_state thread) @@ `Fulfilled "ok"; let result = Promise.await thread in Alcotest.(check string) "Await result" result "ok" let test_promise_exn () = Eunix.run @@ fun () -> let p, r = Promise.create () in - Alcotest.(check (state reject)) "Initially unresolved" (Promise.state p) `Unresolved; + Alcotest.(check (state reject)) "Initially unresolved" (get_state p) `Unresolved; let thread = Eunix.fork (fun () -> Promise.await p) in Promise.break r (Failure "test"); - Alcotest.(check (state reject)) "Broken" (Promise.state p) @@ `Broken (Failure "test"); - Alcotest.(check (state reject)) "Thread unresolved" (Promise.state thread) `Unresolved; + Alcotest.(check (state reject)) "Broken" (get_state p) @@ `Broken (Failure "test"); + Alcotest.(check (state reject)) "Thread unresolved" (get_state thread) `Unresolved; yield (); - Alcotest.(check (state reject)) "Thread broken" (Promise.state thread) @@ `Broken (Failure "test"); + Alcotest.(check (state reject)) "Thread broken" (get_state thread) @@ `Broken (Failure "test"); match Promise.await thread with | `Cant_happen -> assert false | exception (Failure msg) -> Alcotest.(check string) "Await result" msg "test" @@ -58,7 +64,7 @@ let test_poll_add () = Eunix.await_writable w; let sent = Unix.write w (Bytes.of_string "!") 0 1 in assert (sent = 1); - let result = Eunix.Promise.await thread in + let result = Promise.await thread in Alcotest.(check string) "Received data" "!" result let test_poll_add_busy () = @@ -69,9 +75,9 @@ let test_poll_add_busy () = Eunix.yield (); let sent = Unix.write w (Bytes.of_string "!!") 0 2 in assert (sent = 2); - let a = Eunix.Promise.await a in + let a = Promise.await a in Alcotest.(check string) "Received data" "!" a; - let b = Eunix.Promise.await b in + let b = Promise.await b in Alcotest.(check string) "Received data" "!" b let () =