diff --git a/README.md b/README.md index e359c07..d289ec7 100644 --- a/README.md +++ b/README.md @@ -59,8 +59,7 @@ It is able to run a web-server with good performance, but most features are stil ## Structure of the code -- `fibreslib` provides concurrency primitives (promises, semaphores, etc). -- `eio` provides a high-level, cross-platform OS API. +- `eio` provides concurrency primitives (promises, etc), and a high-level, cross-platform OS API. - `eunix` provides a Linux io-uring backend for these APIs, plus a low-level API that can be used directly (in non-portable code). - `eio_main` selects an appropriate backend (e.g. `eunix`), depending on your platform. @@ -86,11 +85,11 @@ opam depext -i eio_main utop ``` To try out the examples interactively, run `utop` and `require` the `eio_main` library. -It is also convenient to open the `Fibreslib` module: +It is also convenient to open the `Eio.Std` module: ```ocaml # #require "eio_main";; -# open Fibreslib;; +# open Eio.Std;; ``` This function writes a greeting to stdout: diff --git a/dune-project b/dune-project index 0c441ea..02de301 100644 --- a/dune-project +++ b/dune-project @@ -12,7 +12,8 @@ "An effect-based IO API for multicore OCaml with fibres.") (depends (ctf (= :version)) - (fibreslib (= :version)) + (cstruct (>= 6.0.0)) + lwt-dllist (alcotest (and (>= 1.4.0) :with-test)))) (package (name eunix) @@ -22,7 +23,6 @@ (depends (ocaml-variants (= "4.12.0+domains+effects")) (ctf (= :version)) - (fibreslib (= :version)) (eio (= :version)) (alcotest (and (>= 1.4.0) :with-test)) (mdx :with-test) @@ -31,14 +31,6 @@ (bigstringaf (>= 0.7.0)) uring (bheap (>= 2.0.0)))) -(package - (name fibreslib) - (synopsis "Effect-based synchronisation primitives") - (description "An effect-based API for promises and other synchronisation primitives.") - (depends - (ctf (= :version)) - lwt-dllist - (alcotest (and (>= 1.4.0) :with-test)))) (package (name ctf) (synopsis "CTF tracing") diff --git a/eio.opam b/eio.opam index 120ec80..ea9a516 100644 --- a/eio.opam +++ b/eio.opam @@ -10,7 +10,8 @@ bug-reports: "https://github.com/ocaml-multicore/eio/issues" depends: [ "dune" {>= "2.8"} "ctf" {= version} - "fibreslib" {= version} + "cstruct" {>= "6.0.0"} + "lwt-dllist" "alcotest" {>= "1.4.0" & with-test} "odoc" {with-doc} ] diff --git a/eunix.opam b/eunix.opam index f723277..2ffac52 100644 --- a/eunix.opam +++ b/eunix.opam @@ -11,7 +11,6 @@ depends: [ "dune" {>= "2.8"} "ocaml-variants" {= "4.12.0+domains+effects"} "ctf" {= version} - "fibreslib" {= version} "eio" {= version} "alcotest" {>= "1.4.0" & with-test} "mdx" {with-test} diff --git a/fibreslib.opam b/fibreslib.opam deleted file mode 100644 index a11ce36..0000000 --- a/fibreslib.opam +++ /dev/null @@ -1,32 +0,0 @@ -# This file is generated by dune, edit dune-project instead -opam-version: "2.0" -synopsis: "Effect-based synchronisation primitives" -description: - "An effect-based API for promises and other synchronisation primitives." -maintainer: ["anil@recoil.org"] -authors: ["Anil Madhavapeddy" "Thomas Leonard"] -license: "ISC" -homepage: "https://github.com/ocaml-multicore/eio" -bug-reports: "https://github.com/ocaml-multicore/eio/issues" -depends: [ - "dune" {>= "2.8"} - "ctf" {= version} - "lwt-dllist" - "alcotest" {>= "1.4.0" & with-test} - "odoc" {with-doc} -] -build: [ - ["dune" "subst"] {dev} - [ - "dune" - "build" - "-p" - name - "-j" - jobs - "@install" - "@runtest" {with-test} - "@doc" {with-doc} - ] -] -dev-repo: "git+https://github.com/ocaml-multicore/eio.git" diff --git a/fibreslib/dune b/fibreslib/dune deleted file mode 100644 index 8c9ceb9..0000000 --- a/fibreslib/dune +++ /dev/null @@ -1,5 +0,0 @@ -(library - (name fibreslib) - (public_name fibreslib) - (libraries ctf lwt-dllist) - (flags (:standard -w -50))) diff --git a/fibreslib/fibreslib.ml b/fibreslib/fibreslib.ml deleted file mode 100644 index 58b3aae..0000000 --- a/fibreslib/fibreslib.ml +++ /dev/null @@ -1,23 +0,0 @@ -module Promise = Promise -module Fibre = Fibre -module Semaphore = Semaphore -module Switch = Switch - -let traceln ?__POS__ fmt = - fmt |> Format.kasprintf (fun msg -> - Ctf.label msg; - match __POS__ with - | Some (file, lnum, _, _) -> Format.printf "%s:%d %s@." file lnum msg - | None -> Format.printf "%s@." msg - ) - -module Fibre_impl = struct - module Effects = struct - effect Await = Switch.Await - effect Fork = Fibre.Fork - effect Fork_ignore = Fibre.Fork_ignore - effect Yield = Fibre.Yield - end - module Waiters = Waiters - module Switch = Switch -end diff --git a/fibreslib/fibreslib.mli b/fibreslib/fibreslib.mli deleted file mode 100644 index ca705f4..0000000 --- a/fibreslib/fibreslib.mli +++ /dev/null @@ -1,234 +0,0 @@ -(** Controlling the lifetime of fibres (groups, exceptions, cancellations, timeouts). *) -module Switch : sig - type t - (** A switch controls a group of fibres. - Once a switch is turned off, all activities in that context should cancel themselves. *) - - exception Multiple_exceptions of exn list - - exception Cancelled of exn - (** [Cancelled ex] indicates that the switch was turned off with exception [ex]. - It is usually not necessary to report a [Cancelled] exception to the user, - as the original problem will be handled elsewhere. *) - - val top : (t -> 'a) -> 'a - (** [top fn] runs [fn] with a fresh top-level switch (initially on). - When [fn] exits, [top] waits for all operations registered with the switch to finish - (it does not turn the switch off itself). - If the switch is turned off before it returns, [top] re-raises the switch's exception(s). - @raise Multiple_exceptions If [turn_off] is called more than once. *) - - val sub : ?on_release:(unit -> unit) -> sw:t -> on_error:(exn -> 'a) -> (t -> 'a) -> 'a - (** [sub ~sw ~on_error fn] is like [top fn], but the new switch is a child of [sw], so that - cancelling [sw] also cancels the child (but not the other way around). - If [fn] raises an exception then it is passed to [on_error]. - If you only want to use [sub] to wait for a group of threads to finish, but not to contain - errors, you can use [~on_error:raise]. - @param on_release Register this function with [Switch.on_release sub] once the sub-switch is created. - If creating the sub-switch fails, run it immediately. *) - - val check : t -> unit - (** [check t] checks that [t] is still on. - @raise Cancelled If the switch is off. *) - - val get_error : t -> exn option - (** [get_error t] is like [check t] except that it returns the exception instead of raising it. - If [t] is finished, this returns (rather than raising) the [Invalid_argument] exception too. *) - - val turn_off : t -> exn -> unit - (** [turn_off t ex] turns off [t], with reason [ex]. - It returns immediately, without waiting for the shutdown actions to complete. - If [t] is already off then [ex] is added to the list of exceptions (unless - [ex] is [Cancelled] or identical to the original exception, in which case - it is ignored). *) - - val on_release : t -> (unit -> unit) -> unit - (** [on_release t fn] registers [fn] to be called once [t]'s main function has returned - and all fibres have finished. - If [fn] raises an exception, it is passed to [turn_off]. - Release handlers are run in LIFO order, in series. - If you want to allow other release handlers to run concurrently, you can start the release - operation and then call [on_release] again from within [fn] to register a function to await the result. - This will be added to a fresh batch of handlers, run after the original set have finished. - Note that [fn] must work even if the switch has been turned off, - so using [sub t] or similar within [fn] is usually a bad idea. *) - - val on_release_cancellable : t -> (unit -> unit) -> (unit -> unit) - (** Like [on_release], but returns a function that can be called to remove the handler. - Calling this more than once has no effect. *) -end - -module Promise : sig - type 'a t - (** An ['a t] is a promise for a value of type ['a]. *) - - type 'a u - (** An ['a u] is a resolver for a promise of type ['a]. *) - - val create : ?label:string -> unit -> 'a t * 'a u - (** [create ()] is a fresh promise/resolver pair. - The promise is initially unresolved. *) - - val await : ?sw:Switch.t -> 'a t -> 'a - (** [await t] blocks until [t] is resolved. - If [t] is already resolved then this returns immediately. - If [t] is broken, it raises the exception. - @param sw Cancel wait if [sw] is turned off. *) - - val await_result : ?sw:Switch.t -> 'a t -> ('a, exn) result - (** [await_result t] is like [await t], but returns [Error ex] if [t] is broken - instead of raising an exception. - Note that turning off [sw] still raises an exception. *) - - val fulfill : 'a u -> 'a -> unit - (** [fulfill u v] successfully resolves [u]'s promise with the value [v]. - Any threads waiting for the result will be added to the run queue. *) - - val break : 'a u -> exn -> unit - (** [break u ex] resolves [u]'s promise with the exception [ex]. - Any threads waiting for the result will be added to the run queue. *) - - val resolve : 'a t -> ('a, exn) result -> unit - (** [resolve t (Ok x)] is [fulfill t x] and - [resolve t (Error ex)] is [break t ex]. *) - - val fulfilled : 'a -> 'a t - (** [fulfilled x] is a promise that is already fulfilled with result [x]. *) - - val broken : exn -> 'a t - (** [broken x] is a promise that is already broken with exception [ex]. *) - - type 'a waiters - - type 'a state = - | Unresolved of 'a waiters - | Fulfilled of 'a - | Broken of exn - - val state : 'a t -> 'a state - (** [state t] is the current state of [t]. *) - - val is_resolved : 'a t -> bool - (** [is_resolved t] is [true] iff [state t] is [Fulfilled] or [Broken]. *) - - val create_with_id : Ctf.id -> 'a t * 'a u - (** Like [create], but the caller creates the tracing ID. - This can be useful when implementing other primitives that use promises internally, - to give them a different type in the trace output. *) -end - -module Fibre : sig - val both : sw:Switch.t -> (unit -> unit) -> (unit -> unit) -> unit - (** [both ~sw f g] runs [f ()] and [g ()] concurrently. - If either raises an exception, [sw] is turned off. - [both] waits for both functions to finish even if one raises. *) - - val fork_ignore : sw:Switch.t -> (unit -> unit) -> unit - (** [fork_ignore ~sw fn] runs [fn ()] in a new fibre, but does not wait for it to complete. - The new fibre is attached to [sw] (which can't finish until the fibre ends). - If the fibre raises an exception, [sw] is turned off. - If [sw] is already off then [fn] fails immediately, but the calling thread continues. *) - - val fork_sub_ignore : ?on_release:(unit -> unit) -> sw:Switch.t -> on_error:(exn -> unit) -> (Switch.t -> unit) -> unit - (** [fork_sub_ignore ~sw ~on_error fn] is like [fork_ignore], but it creates a new sub-switch for the fibre. - This means that you can cancel the child switch without cancelling the parent. - This is a convenience function for running {!Switch.sub} inside a {!fork_ignore}. *) - - val fork : sw:Switch.t -> exn_turn_off:bool -> (unit -> 'a) -> 'a Promise.t - (** [fork ~sw ~exn_turn_off fn] starts running [fn ()] in a new fibre and returns a promise for its result. - The new fibre is attached to [sw] (which can't finish until the fibre ends). - @param exn_turn_off If [true] and [fn] raises an exception, [sw] is turned off (in addition to breaking the promise). *) - - val yield : ?sw:Switch.t -> unit -> unit - (** [yield ()] asks the scheduler to switch to the next runnable task. - The current task remains runnable, but goes to the back of the queue. - @param sw Ensure that the switch is still on before returning. *) -end - -(** A counting semaphore for use within a single domain. - The API is based on OCaml's [Semaphore.Counting]. *) -module Semaphore : sig - type t - (** The type of counting semaphores. *) - - val make : int -> t - (** [make n] returns a new counting semaphore, with initial value [n]. - The initial value [n] must be nonnegative. - @raise Invalid_argument if [n < 0] *) - - val release : t -> unit - (** [release t] increments the value of semaphore [t]. - If other fibres are waiting on [t], the one that has been waiting the longest is resumed. - @raise Sys_error if the value of the semaphore would overflow [max_int] *) - - val acquire : t -> unit - (** [acquire t] blocks the calling fibre until the value of semaphore [t] - is not zero, then atomically decrements the value of [t] and returns. *) - - val get_value : t -> int - (** [get_value t] returns the current value of semaphore [t]. *) -end - -val traceln : - ?__POS__:string * int * int * int -> - ('a, Format.formatter, unit, unit) format4 -> 'a -(** [traceln fmt] outputs a debug message (typically to stderr). - Trace messages are printed by default and do not require logging to be configured first. - The message is printed with a newline, and is flushed automatically. - This is intended for quick debugging rather than for production code. - Examples: - {[ - traceln "x = %d" x; - traceln "x = %d" x ~__POSS__; (* With location information *) - |} - @param __POS__ Display [__POS__] as the location of the [traceln] call. *) - -(** API for use by the scheduler implementation. *) -module Fibre_impl : sig - module Waiters : sig - type 'a t - (** A queue of callbacks waiting for a value of type ['a]. *) - - type waiter - - val null : waiter - (** A dummy waiter that does nothing when removed. *) - - val add_waiter : 'a t -> (('a, exn) result -> unit) -> waiter - (** [add_waiter t fn] adds [fn] to the queue of callbacks to be invoked when the wait is over. - [fn] will typically add some saved continuation to the runqueue. *) - - val remove_waiter : waiter -> unit - (** [remove_waiter w] removes a waiter previously added with e.g. [add_waiter]. - If the waiter is already removed, this does nothing. *) - end - - module Switch : sig - type t = Switch.t - - val add_cancel_hook : t -> (exn -> unit) -> Waiters.waiter - (** [add_cancel_hook t cancel] registers shutdown function [cancel] with [t]. - When [t] is turned off, [cancel] is called. - If [t] is already off, it calls [cancel] immediately. *) - - val add_cancel_hook_opt : t option -> (exn -> unit) -> Waiters.waiter - (**[add_cancel_hook_opt (Some t)] is [add_cancel_hook t]. - If called with [None], it does nothing and returns a dummy waiter. *) - end - - module Effects : sig - effect Await : Switch.t option * Ctf.id * 'a Waiters.t -> 'a - (** Performed when a fibre must be suspended (e.g. because it called {!Promise.await} on an unresolved promise). - The effect handler should add itself to the list of waiters and block until its callback is invoked. - The ID is used for tracing. *) - - effect Fork : (unit -> 'a) -> 'a Promise.t - (** See {!Fibre.fork} *) - - effect Fork_ignore : (unit -> unit) -> unit - (** See {!Fibre.fork_ignore} *) - - effect Yield : unit - (** See {!Fibre.yield} *) - end -end diff --git a/lib_eio/dune b/lib_eio/dune index 6a88a1e..6e23e47 100644 --- a/lib_eio/dune +++ b/lib_eio/dune @@ -1,4 +1,5 @@ (library (name eio) (public_name eio) - (libraries cstruct fibreslib)) + (libraries cstruct ctf lwt-dllist) + (flags (:standard -w -50))) diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml index 6755260..da40f0d 100644 --- a/lib_eio/eio.ml +++ b/lib_eio/eio.ml @@ -1,4 +1,20 @@ -open Fibreslib +module Std = struct + module Promise = Promise + module Fibre = Fibre + module Switch = Switch + + let traceln ?__POS__ fmt = + fmt |> Format.kasprintf (fun msg -> + Ctf.label msg; + match __POS__ with + | Some (file, lnum, _, _) -> Format.printf "%s:%d %s@." file lnum msg + | None -> Format.printf "%s@." msg + ) +end + +module Semaphore = Semaphore + +open Std module Generic = struct type 'a ty = .. @@ -149,3 +165,14 @@ module Stdenv = struct let network (t : ) = t#network end + +module Private = struct + module Effects = struct + effect Await = Switch.Await + effect Fork = Fibre.Fork + effect Fork_ignore = Fibre.Fork_ignore + effect Yield = Fibre.Yield + end + module Waiters = Waiters + module Switch = Switch +end diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index a7c48b1..ccd949f 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -1,4 +1,198 @@ -open Fibreslib +(** Effects based parallel IO for OCaml *) + +(** {1 Concurrency primitives} *) + +(** Commonly used standard features. This module is intended to be [open]ed. *) +module Std : sig + (** Controlling the lifetime of fibres (groups, exceptions, cancellations, timeouts). *) + module Switch : sig + type t + (** A switch controls a group of fibres. + Once a switch is turned off, all activities in that context should cancel themselves. *) + + exception Multiple_exceptions of exn list + + exception Cancelled of exn + (** [Cancelled ex] indicates that the switch was turned off with exception [ex]. + It is usually not necessary to report a [Cancelled] exception to the user, + as the original problem will be handled elsewhere. *) + + val top : (t -> 'a) -> 'a + (** [top fn] runs [fn] with a fresh top-level switch (initially on). + When [fn] exits, [top] waits for all operations registered with the switch to finish + (it does not turn the switch off itself). + If the switch is turned off before it returns, [top] re-raises the switch's exception(s). + @raise Multiple_exceptions If [turn_off] is called more than once. *) + + val sub : ?on_release:(unit -> unit) -> sw:t -> on_error:(exn -> 'a) -> (t -> 'a) -> 'a + (** [sub ~sw ~on_error fn] is like [top fn], but the new switch is a child of [sw], so that + cancelling [sw] also cancels the child (but not the other way around). + If [fn] raises an exception then it is passed to [on_error]. + If you only want to use [sub] to wait for a group of threads to finish, but not to contain + errors, you can use [~on_error:raise]. + @param on_release Register this function with [Switch.on_release sub] once the sub-switch is created. + If creating the sub-switch fails, run it immediately. *) + + val check : t -> unit + (** [check t] checks that [t] is still on. + @raise Cancelled If the switch is off. *) + + val get_error : t -> exn option + (** [get_error t] is like [check t] except that it returns the exception instead of raising it. + If [t] is finished, this returns (rather than raising) the [Invalid_argument] exception too. *) + + val turn_off : t -> exn -> unit + (** [turn_off t ex] turns off [t], with reason [ex]. + It returns immediately, without waiting for the shutdown actions to complete. + If [t] is already off then [ex] is added to the list of exceptions (unless + [ex] is [Cancelled] or identical to the original exception, in which case + it is ignored). *) + + val on_release : t -> (unit -> unit) -> unit + (** [on_release t fn] registers [fn] to be called once [t]'s main function has returned + and all fibres have finished. + If [fn] raises an exception, it is passed to [turn_off]. + Release handlers are run in LIFO order, in series. + If you want to allow other release handlers to run concurrently, you can start the release + operation and then call [on_release] again from within [fn] to register a function to await the result. + This will be added to a fresh batch of handlers, run after the original set have finished. + Note that [fn] must work even if the switch has been turned off, + so using [sub t] or similar within [fn] is usually a bad idea. *) + + val on_release_cancellable : t -> (unit -> unit) -> (unit -> unit) + (** Like [on_release], but returns a function that can be called to remove the handler. + Calling this more than once has no effect. *) + end + + module Promise : sig + type 'a t + (** An ['a t] is a promise for a value of type ['a]. *) + + type 'a u + (** An ['a u] is a resolver for a promise of type ['a]. *) + + val create : ?label:string -> unit -> 'a t * 'a u + (** [create ()] is a fresh promise/resolver pair. + The promise is initially unresolved. *) + + val await : ?sw:Switch.t -> 'a t -> 'a + (** [await t] blocks until [t] is resolved. + If [t] is already resolved then this returns immediately. + If [t] is broken, it raises the exception. + @param sw Cancel wait if [sw] is turned off. *) + + val await_result : ?sw:Switch.t -> 'a t -> ('a, exn) result + (** [await_result t] is like [await t], but returns [Error ex] if [t] is broken + instead of raising an exception. + Note that turning off [sw] still raises an exception. *) + + val fulfill : 'a u -> 'a -> unit + (** [fulfill u v] successfully resolves [u]'s promise with the value [v]. + Any threads waiting for the result will be added to the run queue. *) + + val break : 'a u -> exn -> unit + (** [break u ex] resolves [u]'s promise with the exception [ex]. + Any threads waiting for the result will be added to the run queue. *) + + val resolve : 'a t -> ('a, exn) result -> unit + (** [resolve t (Ok x)] is [fulfill t x] and + [resolve t (Error ex)] is [break t ex]. *) + + val fulfilled : 'a -> 'a t + (** [fulfilled x] is a promise that is already fulfilled with result [x]. *) + + val broken : exn -> 'a t + (** [broken x] is a promise that is already broken with exception [ex]. *) + + type 'a waiters + + type 'a state = + | Unresolved of 'a waiters + | Fulfilled of 'a + | Broken of exn + + val state : 'a t -> 'a state + (** [state t] is the current state of [t]. *) + + val is_resolved : 'a t -> bool + (** [is_resolved t] is [true] iff [state t] is [Fulfilled] or [Broken]. *) + + val create_with_id : Ctf.id -> 'a t * 'a u + (** Like [create], but the caller creates the tracing ID. + This can be useful when implementing other primitives that use promises internally, + to give them a different type in the trace output. *) + end + + module Fibre : sig + val both : sw:Switch.t -> (unit -> unit) -> (unit -> unit) -> unit + (** [both ~sw f g] runs [f ()] and [g ()] concurrently. + If either raises an exception, [sw] is turned off. + [both] waits for both functions to finish even if one raises. *) + + val fork_ignore : sw:Switch.t -> (unit -> unit) -> unit + (** [fork_ignore ~sw fn] runs [fn ()] in a new fibre, but does not wait for it to complete. + The new fibre is attached to [sw] (which can't finish until the fibre ends). + If the fibre raises an exception, [sw] is turned off. + If [sw] is already off then [fn] fails immediately, but the calling thread continues. *) + + val fork_sub_ignore : ?on_release:(unit -> unit) -> sw:Switch.t -> on_error:(exn -> unit) -> (Switch.t -> unit) -> unit + (** [fork_sub_ignore ~sw ~on_error fn] is like [fork_ignore], but it creates a new sub-switch for the fibre. + This means that you can cancel the child switch without cancelling the parent. + This is a convenience function for running {!Switch.sub} inside a {!fork_ignore}. *) + + val fork : sw:Switch.t -> exn_turn_off:bool -> (unit -> 'a) -> 'a Promise.t + (** [fork ~sw ~exn_turn_off fn] starts running [fn ()] in a new fibre and returns a promise for its result. + The new fibre is attached to [sw] (which can't finish until the fibre ends). + @param exn_turn_off If [true] and [fn] raises an exception, [sw] is turned off (in addition to breaking the promise). *) + + val yield : ?sw:Switch.t -> unit -> unit + (** [yield ()] asks the scheduler to switch to the next runnable task. + The current task remains runnable, but goes to the back of the queue. + @param sw Ensure that the switch is still on before returning. *) + end + + val traceln : + ?__POS__:string * int * int * int -> + ('a, Format.formatter, unit, unit) format4 -> 'a + (** [traceln fmt] outputs a debug message (typically to stderr). + Trace messages are printed by default and do not require logging to be configured first. + The message is printed with a newline, and is flushed automatically. + This is intended for quick debugging rather than for production code. + Examples: + {[ + traceln "x = %d" x; + traceln "x = %d" x ~__POSS__; (* With location information *) + |} + @param __POS__ Display [__POS__] as the location of the [traceln] call. *) +end + +open Std + +(** A counting semaphore for use within a single domain. + The API is based on OCaml's [Semaphore.Counting]. *) +module Semaphore : sig + type t + (** The type of counting semaphores. *) + + val make : int -> t + (** [make n] returns a new counting semaphore, with initial value [n]. + The initial value [n] must be nonnegative. + @raise Invalid_argument if [n < 0] *) + + val release : t -> unit + (** [release t] increments the value of semaphore [t]. + If other fibres are waiting on [t], the one that has been waiting the longest is resumed. + @raise Sys_error if the value of the semaphore would overflow [max_int] *) + + val acquire : t -> unit + (** [acquire t] blocks the calling fibre until the value of semaphore [t] + is not zero, then atomically decrements the value of [t] and returns. *) + + val get_value : t -> int + (** [get_value t] returns the current value of semaphore [t]. *) +end + +(** {1 Cross-platform OS API} *) (** A base class for objects that can be queried at runtime for extra features. *) module Generic : sig @@ -131,3 +325,55 @@ module Stdenv : sig val network : -> 'a end + +(** {1 Provider API for OS schedulers} *) + +(** API for use by the scheduler implementation. *) +module Private : sig + module Waiters : sig + type 'a t + (** A queue of callbacks waiting for a value of type ['a]. *) + + type waiter + + val null : waiter + (** A dummy waiter that does nothing when removed. *) + + val add_waiter : 'a t -> (('a, exn) result -> unit) -> waiter + (** [add_waiter t fn] adds [fn] to the queue of callbacks to be invoked when the wait is over. + [fn] will typically add some saved continuation to the runqueue. *) + + val remove_waiter : waiter -> unit + (** [remove_waiter w] removes a waiter previously added with e.g. [add_waiter]. + If the waiter is already removed, this does nothing. *) + end + + module Switch : sig + type t = Switch.t + + val add_cancel_hook : t -> (exn -> unit) -> Waiters.waiter + (** [add_cancel_hook t cancel] registers shutdown function [cancel] with [t]. + When [t] is turned off, [cancel] is called. + If [t] is already off, it calls [cancel] immediately. *) + + val add_cancel_hook_opt : t option -> (exn -> unit) -> Waiters.waiter + (**[add_cancel_hook_opt (Some t)] is [add_cancel_hook t]. + If called with [None], it does nothing and returns a dummy waiter. *) + end + + module Effects : sig + effect Await : Switch.t option * Ctf.id * 'a Waiters.t -> 'a + (** Performed when a fibre must be suspended (e.g. because it called {!Promise.await} on an unresolved promise). + The effect handler should add itself to the list of waiters and block until its callback is invoked. + The ID is used for tracing. *) + + effect Fork : (unit -> 'a) -> 'a Promise.t + (** See {!Fibre.fork} *) + + effect Fork_ignore : (unit -> unit) -> unit + (** See {!Fibre.fork_ignore} *) + + effect Yield : unit + (** See {!Fibre.yield} *) + end +end diff --git a/fibreslib/fibre.ml b/lib_eio/fibre.ml similarity index 100% rename from fibreslib/fibre.ml rename to lib_eio/fibre.ml diff --git a/fibreslib/promise.ml b/lib_eio/promise.ml similarity index 100% rename from fibreslib/promise.ml rename to lib_eio/promise.ml diff --git a/fibreslib/semaphore.ml b/lib_eio/semaphore.ml similarity index 100% rename from fibreslib/semaphore.ml rename to lib_eio/semaphore.ml diff --git a/fibreslib/switch.ml b/lib_eio/switch.ml similarity index 100% rename from fibreslib/switch.ml rename to lib_eio/switch.ml diff --git a/fibreslib/waiters.ml b/lib_eio/waiters.ml similarity index 100% rename from fibreslib/waiters.ml rename to lib_eio/waiters.ml diff --git a/fibreslib/waiters.mli b/lib_eio/waiters.mli similarity index 100% rename from fibreslib/waiters.mli rename to lib_eio/waiters.mli diff --git a/lib_eunix/dune b/lib_eunix/dune index a54c182..9e9ce87 100644 --- a/lib_eunix/dune +++ b/lib_eunix/dune @@ -1,4 +1,4 @@ (library (name eunix) (public_name eunix) - (libraries eio fibreslib unix uring bheap logs fmt bigstringaf ctf)) + (libraries eio unix uring bheap logs fmt bigstringaf ctf)) diff --git a/lib_eunix/eunix.ml b/lib_eunix/eunix.ml index 6177586..613b487 100644 --- a/lib_eunix/eunix.ml +++ b/lib_eunix/eunix.ml @@ -17,7 +17,7 @@ let src = Logs.Src.create "eunix" ~doc:"Effect-based IO system" module Log = (val Logs.src_log src : Logs.LOG) -open Fibreslib +open Eio.Std (* SIGPIPE makes no sense in a modern application. *) let () = Sys.(set_signal sigpipe Signal_ignore) @@ -86,7 +86,7 @@ type rw_req = { sw : Switch.t option; } -type cancel_hook = Fibre_impl.Waiters.waiter ref +type cancel_hook = Eio.Private.Waiters.waiter ref (* Type of user-data attached to jobs. *) type io_job = @@ -156,7 +156,7 @@ let cancel job = If [sw] is already off, it schedules [action] to be discontinued. @return Whether to retry the operation later, once there is space. *) let with_cancel_hook ?sw ~action st fn = - let release = ref Fibre_impl.Waiters.null in + let release = ref Eio.Private.Waiters.null in match sw with | None -> fn release = None | Some sw -> @@ -166,7 +166,7 @@ let with_cancel_hook ?sw ~action st fn = match fn release with | None -> true | Some job -> - release := Fibre_impl.Switch.add_cancel_hook sw (fun _ -> cancel job); + release := Eio.Private.Switch.add_cancel_hook sw (fun _ -> cancel job); false let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; sw; action} as req) = @@ -320,27 +320,27 @@ and handle_complete st ~runnable result = match runnable with | Read (req, cancel) -> Log.debug (fun l -> l "read returned"); - Fibre_impl.Waiters.remove_waiter !cancel; + Eio.Private.Waiters.remove_waiter !cancel; complete_rw_req st req result | Write (req, cancel) -> Log.debug (fun l -> l "write returned"); - Fibre_impl.Waiters.remove_waiter !cancel; + Eio.Private.Waiters.remove_waiter !cancel; complete_rw_req st req result | Poll_add (k, cancel) -> Log.debug (fun l -> l "poll_add returned"); - Fibre_impl.Waiters.remove_waiter !cancel; + Eio.Private.Waiters.remove_waiter !cancel; Suspended.continue k result | Splice (k, cancel) -> Log.debug (fun l -> l "splice returned"); - Fibre_impl.Waiters.remove_waiter !cancel; + Eio.Private.Waiters.remove_waiter !cancel; Suspended.continue k result | Connect (k, cancel) -> Log.debug (fun l -> l "connect returned"); - Fibre_impl.Waiters.remove_waiter !cancel; + Eio.Private.Waiters.remove_waiter !cancel; Suspended.continue k result | Accept (k, cancel) -> Log.debug (fun l -> l "accept returned"); - Fibre_impl.Waiters.remove_waiter !cancel; + Eio.Private.Waiters.remove_waiter !cancel; Suspended.continue k result | Close k -> Log.debug (fun l -> l "close returned"); @@ -665,7 +665,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = let k = { Suspended.k; tid } in enqueue_accept ~sw st k fd client_addr; schedule st - | effect Fibre_impl.Effects.Yield k -> + | effect Eio.Private.Effects.Yield k -> let k = { Suspended.k; tid } in enqueue_thread st k (); schedule st @@ -673,11 +673,11 @@ let run ?(queue_depth=64) ?(block_size=4096) main = let k = { Suspended.k; tid } in Zzz.sleep sleep_q d k; schedule st - | effect (Fibre_impl.Effects.Await (sw, pid, q)) k -> + | effect (Eio.Private.Effects.Await (sw, pid, q)) k -> let k = { Suspended.k; tid } in let waiters = Queue.create () in let when_resolved r = - Queue.iter Fibre_impl.Waiters.remove_waiter waiters; + Queue.iter Eio.Private.Waiters.remove_waiter waiters; match r with | Ok v -> Ctf.note_read ~reader:tid pid; @@ -688,13 +688,13 @@ let run ?(queue_depth=64) ?(block_size=4096) main = in let cancel ex = when_resolved (Error ex) in sw |> Option.iter (fun sw -> - let cancel_waiter = Fibre_impl.Switch.add_cancel_hook sw cancel in + let cancel_waiter = Eio.Private.Switch.add_cancel_hook sw cancel in Queue.add cancel_waiter waiters; ); - let resolved_waiter = Fibre_impl.Waiters.add_waiter q when_resolved in + let resolved_waiter = Eio.Private.Waiters.add_waiter q when_resolved in Queue.add resolved_waiter waiters; schedule st - | effect (Fibre_impl.Effects.Fork f) k -> + | effect (Eio.Private.Effects.Fork f) k -> let k = { Suspended.k; tid } in let id = Ctf.mint_id () in Ctf.note_created id Ctf.Task; @@ -709,7 +709,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex); Promise.break resolver ex ) - | effect (Fibre_impl.Effects.Fork_ignore f) k -> + | effect (Eio.Private.Effects.Fork_ignore f) k -> let k = { Suspended.k; tid } in enqueue_thread st k (); let child = Ctf.note_fork () in diff --git a/lib_eunix/eunix.mli b/lib_eunix/eunix.mli index 2aa9794..b2f0f88 100644 --- a/lib_eunix/eunix.mli +++ b/lib_eunix/eunix.mli @@ -14,7 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Fibreslib +open Eio.Std type t diff --git a/tests/basic_eunix.ml b/tests/basic_eunix.ml index f1d647e..2a01ad6 100644 --- a/tests/basic_eunix.ml +++ b/tests/basic_eunix.ml @@ -1,7 +1,7 @@ (* basic tests using effects *) open Eunix -open Fibreslib +open Eio.Std module Int63 = Optint.Int63 let setup_log level = diff --git a/tests/eurcp_lib.ml b/tests/eurcp_lib.ml index bfb1a22..20712c6 100644 --- a/tests/eurcp_lib.ml +++ b/tests/eurcp_lib.ml @@ -1,6 +1,6 @@ (* cp(1) built with effects. *) -open Fibreslib +open Eio.Std module U = Eunix module Int63 = Optint.Int63 diff --git a/tests/test.ml b/tests/test.ml index b3cc220..250ccc2 100644 --- a/tests/test.ml +++ b/tests/test.ml @@ -1,4 +1,4 @@ -open Fibreslib +open Eio.Std let () = Logs.(set_level ~all:true (Some Debug)); @@ -106,6 +106,7 @@ let test_fork_ignore () = Alcotest.(check int) "Forked code ran" 2 !i let test_semaphore () = + let module Semaphore = Eio.Semaphore in Eunix.run ~queue_depth:1 @@ fun _stdenv -> Switch.top @@ fun sw -> let running = ref 0 in @@ -135,6 +136,7 @@ let test_semaphore () = Semaphore.release sem let test_semaphore_no_waiter () = + let module Semaphore = Eio.Semaphore in Eunix.run ~queue_depth:2 @@ fun _stdenv -> Switch.top @@ fun sw -> let sem = Semaphore.make 0 in diff --git a/tests/test_network.md b/tests/test_network.md index 5f3cfee..2364981 100644 --- a/tests/test_network.md +++ b/tests/test_network.md @@ -5,7 +5,7 @@ ``` ```ocaml -open Fibreslib +open Eio.Std let run (fn : network:Eio.Network.t -> Switch.t -> unit) = try diff --git a/tests/test_switch.md b/tests/test_switch.md index bc4198c..a131a2c 100644 --- a/tests/test_switch.md +++ b/tests/test_switch.md @@ -5,7 +5,7 @@ ``` ```ocaml -open Fibreslib +open Eio.Std let run (fn : Switch.t -> unit) = try @@ -153,15 +153,15 @@ Turning off a switch runs the cancel callbacks, unless they've been removed by t ```ocaml # run (fun sw -> - let h1 = Fibre_impl.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 1") in - let h2 = Fibre_impl.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 2") in - let h3 = Fibre_impl.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 3") in - Fibre_impl.Waiters.remove_waiter h2; + let h1 = Eio.Private.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 1") in + let h2 = Eio.Private.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 2") in + let h3 = Eio.Private.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 3") in + Eio.Private.Waiters.remove_waiter h2; Switch.turn_off sw (Failure "Cancelled"); - let h4 = Fibre_impl.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 4") in - Fibre_impl.Waiters.remove_waiter h1; - Fibre_impl.Waiters.remove_waiter h3; - Fibre_impl.Waiters.remove_waiter h4 + let h4 = Eio.Private.Switch.add_cancel_hook sw (fun _ -> traceln "Cancel 4") in + Eio.Private.Waiters.remove_waiter h1; + Eio.Private.Waiters.remove_waiter h3; + Eio.Private.Waiters.remove_waiter h4 ) Cancel 3 Cancel 1