Implicit cancellation

Instead of requiring every cancellable operation to pass a `~sw`
argument, give each fibre a default switch and use that. It's too easy
to forget to make something cancellable and clutters up the code.
This commit is contained in:
Thomas Leonard 2021-11-05 11:59:42 +00:00
parent 9619508c4e
commit ed2382bed5
18 changed files with 472 additions and 373 deletions

View File

@ -155,8 +155,8 @@ Here's an example running two threads of execution (fibres) concurrently:
let main _env = let main _env =
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done) (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield () done)
(fun () -> for y = 1 to 3 do traceln "y = %d" y; Fibre.yield ~sw () done);; (fun () -> for y = 1 to 3 do traceln "y = %d" y; Fibre.yield () done);;
``` ```
```ocaml ```ocaml
@ -170,12 +170,8 @@ let main _env =
- : unit = () - : unit = ()
``` ```
Notes: The two fibres run on a single core, so only one can be running at a time.
Calling an operation that performs an effect (such as `yield`) can switch to a different thread.
- The two fibres run on a single core, so only one can be running at a time.
Calling an operation that performs an effect (such as `yield`) can switch to a different thread.
- The `sw` argument is used to handle exceptions (described later).
## Tracing ## Tracing
@ -219,7 +215,7 @@ Here's what happens if one of the two threads above fails:
# Eio_main.run @@ fun _env -> # Eio_main.run @@ fun _env ->
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done) (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield () done)
(fun () -> failwith "Simulated error");; (fun () -> failwith "Simulated error");;
+x = 1 +x = 1
Exception: Failure "Simulated error". Exception: Failure "Simulated error".
@ -230,24 +226,19 @@ What happened here was:
1. The first fibre ran, printed `x = 1` and yielded. 1. The first fibre ran, printed `x = 1` and yielded.
2. The second fibre raised an exception. 2. The second fibre raised an exception.
3. `Fibre.both` caught the exception and turned off the switch. 3. `Fibre.both` caught the exception and turned off the switch.
4. The first thread's `yield` saw the switch was off and raised the exception there too. 4. The first thread's `yield` saw the switch was off and raised a `Cancelled` exception there.
5. Once both threads had finished, `Fibre.both` re-raised the exception. 5. Once both threads had finished, `Fibre.both` re-raised the exception.
Please note: turning off a switch only asks the other thread(s) to cancel.
A thread is free to ignore the switch and continue (perhaps to clean up some resources).
Any operation that can be cancelled should take a `~sw` argument.
Switches can also be used to wait for threads even when there isn't an error. e.g. Switches can also be used to wait for threads even when there isn't an error. e.g.
```ocaml ```ocaml
# Eio_main.run @@ fun _env -> # Eio_main.run @@ fun _env ->
Switch.top (fun sw -> Switch.top (fun sw ->
Fibre.fork_ignore ~sw Fibre.fork_ignore ~sw
(fun () -> for i = 1 to 3 do traceln "i = %d" i; Fibre.yield ~sw () done); (fun () -> for i = 1 to 3 do traceln "i = %d" i; Fibre.yield () done);
traceln "First thread forked"; traceln "First thread forked";
Fibre.fork_ignore ~sw Fibre.fork_ignore ~sw
(fun () -> for j = 1 to 3 do traceln "j = %d" j; Fibre.yield ~sw () done); (fun () -> for j = 1 to 3 do traceln "j = %d" j; Fibre.yield () done);
traceln "Second thread forked; top-level code is finished" traceln "Second thread forked; top-level code is finished"
); );
traceln "Switch is finished";; traceln "Switch is finished";;
@ -270,6 +261,8 @@ For example, a web-server might use one switch for the whole server and then cre
This allows you to end all fibres handling a single connection by turning off that connection's switch, This allows you to end all fibres handling a single connection by turning off that connection's switch,
or to exit the whole application using the top-level switch. or to exit the whole application using the top-level switch.
If you want to make an operation non-cancellable, wrap it in a `Switch.top` to create a fresh switch.
## Design Note: Results vs Exceptions ## Design Note: Results vs Exceptions
The OCaml standard library uses exceptions to report errors in most cases. The OCaml standard library uses exceptions to report errors in most cases.

View File

@ -7,14 +7,14 @@ module Eio_main = struct
let fake_clock real_clock = object (_ : #Eio.Time.clock) let fake_clock real_clock = object (_ : #Eio.Time.clock)
method now = !now method now = !now
method sleep_until ?sw time = method sleep_until time =
(* The fake times are all in the past, so we just ask to wait until the (* The fake times are all in the past, so we just ask to wait until the
fake time is due and it will happen immediately. If we wait for fake time is due and it will happen immediately. If we wait for
multiple times, they'll get woken in the right order. At the moment, multiple times, they'll get woken in the right order. At the moment,
the scheduler only checks for expired timers when the run-queue is the scheduler only checks for expired timers when the run-queue is
empty, so this is a convenient way to wait for the system to be idle. empty, so this is a convenient way to wait for the system to be idle.
Will need revising if we make the scheduler fair at some point. *) Will need revising if we make the scheduler fair at some point. *)
Eio.Time.sleep_until ?sw real_clock time; Eio.Time.sleep_until real_clock time;
now := max !now time now := max !now time
end end

View File

@ -30,7 +30,7 @@ module Flow = struct
type shutdown_command = [ `Receive | `Send | `All ] type shutdown_command = [ `Receive | `Send | `All ]
type read_method = .. type read_method = ..
type read_method += Read_source_buffer of (?sw:Switch.t -> (Cstruct.t list -> unit) -> unit) type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit)
class type close = object class type close = object
method close : unit method close : unit
@ -40,11 +40,11 @@ module Flow = struct
class virtual read = object class virtual read = object
method virtual read_methods : read_method list method virtual read_methods : read_method list
method virtual read_into : ?sw:Switch.t -> Cstruct.t -> int method virtual read_into : Cstruct.t -> int
end end
let read_into ?sw (t : #read) buf = let read_into (t : #read) buf =
let got = t#read_into ?sw buf in let got = t#read_into buf in
assert (got > 0); assert (got > 0);
got got
@ -61,8 +61,8 @@ module Flow = struct
inherit source inherit source
method private read_source_buffer ?sw fn = method private read_source_buffer fn =
Option.iter Switch.check sw; Fibre.yield ();
let rec aux () = let rec aux () =
match data with match data with
| [] -> raise End_of_file | [] -> raise End_of_file
@ -74,8 +74,8 @@ module Flow = struct
method read_methods = method read_methods =
[ Read_source_buffer self#read_source_buffer ] [ Read_source_buffer self#read_source_buffer ]
method read_into ?sw dst = method read_into dst =
Option.iter Switch.check sw; Fibre.yield ();
let avail, src = Cstruct.fillv ~dst ~src:data in let avail, src = Cstruct.fillv ~dst ~src:data in
if avail = 0 then raise End_of_file; if avail = 0 then raise End_of_file;
data <- src; data <- src;
@ -85,12 +85,12 @@ module Flow = struct
let string_source s = cstruct_source [Cstruct.of_string s] let string_source s = cstruct_source [Cstruct.of_string s]
class virtual write = object class virtual write = object
method virtual write : 'a. ?sw:Switch.t -> (#source as 'a) -> unit method virtual write : 'a. (#source as 'a) -> unit
end end
let copy ?sw (src : #source) (dst : #write) = dst#write ?sw src let copy (src : #source) (dst : #write) = dst#write src
let copy_string ?sw s = copy ?sw (string_source s) let copy_string s = copy (string_source s)
class virtual sink = object (_ : #Generic.t) class virtual sink = object (_ : #Generic.t)
method probe _ = None method probe _ = None
@ -101,11 +101,11 @@ module Flow = struct
object object
inherit sink inherit sink
method write ?sw src = method write src =
let buf = Cstruct.create 4096 in let buf = Cstruct.create 4096 in
try try
while true do while true do
let got = src#read_into ?sw buf in let got = src#read_into buf in
Buffer.add_string b (Cstruct.to_string ~len:got buf) Buffer.add_string b (Cstruct.to_string ~len:got buf)
done done
with End_of_file -> () with End_of_file -> ()
@ -171,14 +171,14 @@ end
module Time = struct module Time = struct
class virtual clock = object class virtual clock = object
method virtual now : float method virtual now : float
method virtual sleep_until : ?sw:Switch.t -> float -> unit method virtual sleep_until : float -> unit
end end
let now (t : #clock) = t#now let now (t : #clock) = t#now
let sleep_until ?sw (t : #clock) time = t#sleep_until ?sw time let sleep_until (t : #clock) time = t#sleep_until time
let sleep ?sw t d = sleep_until ?sw t (now t +. d) let sleep t d = sleep_until t (now t +. d)
end end
module Dir = struct module Dir = struct
@ -203,7 +203,7 @@ module Dir = struct
append:bool -> append:bool ->
create:create -> create:create ->
path -> <rw; Flow.close> path -> <rw; Flow.close>
method virtual mkdir : ?sw:Switch.t -> perm:Unix.file_perm -> path -> unit method virtual mkdir : perm:Unix.file_perm -> path -> unit
method virtual open_dir : sw:Switch.t -> path -> t_with_close method virtual open_dir : sw:Switch.t -> path -> t_with_close
end end
and virtual t_with_close = object and virtual t_with_close = object
@ -215,7 +215,7 @@ module Dir = struct
let open_in ~sw (t:#t) = t#open_in ~sw let open_in ~sw (t:#t) = t#open_in ~sw
let open_out ~sw ?(append=false) ~create (t:#t) path = t#open_out ~sw ~append ~create path let open_out ~sw ?(append=false) ~create (t:#t) path = t#open_out ~sw ~append ~create path
let open_dir ~sw (t:#t) = t#open_dir ~sw let open_dir ~sw (t:#t) = t#open_dir ~sw
let mkdir ?sw (t:#t) = t#mkdir ?sw let mkdir (t:#t) = t#mkdir
let with_open_in ?sw (t:#t) path fn = let with_open_in ?sw (t:#t) path fn =
Switch.sub_opt sw @@ fun sw -> fn (open_in ~sw t path) Switch.sub_opt sw @@ fun sw -> fn (open_in ~sw t path)
@ -254,9 +254,12 @@ module Private = struct
type 'a enqueue = 'a Suspend.enqueue type 'a enqueue = 'a Suspend.enqueue
type _ eff += type _ eff +=
| Suspend = Suspend.Suspend | Suspend = Suspend.Suspend
| Suspend_unchecked = Suspend.Suspend_unchecked
| Fork = Fibre.Fork | Fork = Fibre.Fork
| Fork_ignore = Fibre.Fork_ignore | Fork_ignore = Fibre.Fork_ignore
| Trace = Std.Trace | Trace = Std.Trace
| Yield = Fibre.Yield
| Set_switch = Switch.Set_switch
end end
module Switch = Switch let boot_switch = Switch.boot_switch
end end

View File

@ -170,10 +170,9 @@ module Std : sig
The new fibre is attached to [sw] (which can't finish until the fibre ends). The new fibre is attached to [sw] (which can't finish until the fibre ends).
@param exn_turn_off If [true] and [fn] raises an exception, [sw] is turned off (in addition to breaking the promise). *) @param exn_turn_off If [true] and [fn] raises an exception, [sw] is turned off (in addition to breaking the promise). *)
val yield : ?sw:Switch.t -> unit -> unit val yield : unit -> unit
(** [yield ()] asks the scheduler to switch to the next runnable task. (** [yield ()] asks the scheduler to switch to the next runnable task.
The current task remains runnable, but goes to the back of the queue. The current task remains runnable, but goes to the back of the queue. *)
@param sw Ensure that the switch is still on before returning. *)
end end
val traceln : val traceln :
@ -266,7 +265,7 @@ module Flow : sig
type read_method = .. type read_method = ..
(** Sources can offer a list of ways to read them, in order of preference. *) (** Sources can offer a list of ways to read them, in order of preference. *)
type read_method += Read_source_buffer of (?sw:Switch.t -> (Cstruct.t list -> unit) -> unit) type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit)
(** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn] (** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn]
to borrow a view of the source's buffers. to borrow a view of the source's buffers.
[rb] will raise [End_of_file] if no more data will be produced. [rb] will raise [End_of_file] if no more data will be produced.
@ -282,15 +281,14 @@ module Flow : sig
class virtual read : object class virtual read : object
method virtual read_methods : read_method list method virtual read_methods : read_method list
method virtual read_into : ?sw:Switch.t -> Cstruct.t -> int method virtual read_into : Cstruct.t -> int
end end
val read_into : ?sw:Switch.t -> #read -> Cstruct.t -> int val read_into : #read -> Cstruct.t -> int
(** [read_into reader buf] reads one or more bytes into [buf]. (** [read_into reader buf] reads one or more bytes into [buf].
It returns the number of bytes written (which may be less than the It returns the number of bytes written (which may be less than the
buffer size even if there is more data to be read). buffer size even if there is more data to be read).
[buf] must not be zero-length. [buf] must not be zero-length.
@param sw Abort the read if [sw] is turned off.
@raise End_of_file if there is no more data to read *) @raise End_of_file if there is no more data to read *)
val read_methods : #read -> read_method list val read_methods : #read -> read_method list
@ -309,13 +307,13 @@ module Flow : sig
val cstruct_source : Cstruct.t list -> source val cstruct_source : Cstruct.t list -> source
class virtual write : object class virtual write : object
method virtual write : 'a. ?sw:Switch.t -> (#source as 'a) -> unit method virtual write : 'a. (#source as 'a) -> unit
end end
val copy : ?sw:Switch.t -> #source -> #write -> unit val copy : #source -> #write -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *) (** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
val copy_string : ?sw:Switch.t -> string -> #write -> unit val copy_string : string -> #write -> unit
(** Consumer base class. *) (** Consumer base class. *)
class virtual sink : object class virtual sink : object
@ -404,19 +402,17 @@ end
module Time : sig module Time : sig
class virtual clock : object class virtual clock : object
method virtual now : float method virtual now : float
method virtual sleep_until : ?sw:Switch.t -> float -> unit method virtual sleep_until : float -> unit
end end
val now : #clock -> float val now : #clock -> float
(** [now t] is the current time according to [t]. *) (** [now t] is the current time according to [t]. *)
val sleep_until : ?sw:Switch.t -> #clock -> float -> unit val sleep_until : #clock -> float -> unit
(** [sleep_until t time] waits until the given time is reached. (** [sleep_until t time] waits until the given time is reached. *)
@param sw The sleep is aborted if the switch is turned off. *)
val sleep : ?sw:Switch.t -> #clock -> float -> unit val sleep : #clock -> float -> unit
(** [sleep t d] waits for [d] seconds. (** [sleep t d] waits for [d] seconds. *)
@param sw The sleep is aborted if the switch is turned off. *)
end end
module Dir : sig module Dir : sig
@ -448,7 +444,7 @@ module Dir : sig
append:bool -> append:bool ->
create:create -> create:create ->
path -> <rw; Flow.close> path -> <rw; Flow.close>
method virtual mkdir : ?sw:Switch.t -> perm:Unix.file_perm -> path -> unit method virtual mkdir : perm:Unix.file_perm -> path -> unit
method virtual open_dir : sw:Switch.t -> path -> t_with_close method virtual open_dir : sw:Switch.t -> path -> t_with_close
end end
and virtual t_with_close : object and virtual t_with_close : object
@ -482,7 +478,7 @@ module Dir : sig
(** [with_open_out] is like [open_out], but calls [fn flow] with the new flow and closes (** [with_open_out] is like [open_out], but calls [fn flow] with the new flow and closes
it automatically when [fn] returns (if it hasn't already been closed by then). *) it automatically when [fn] returns (if it hasn't already been closed by then). *)
val mkdir : ?sw:Switch.t -> #t -> perm:Unix.file_perm -> path -> unit val mkdir : #t -> perm:Unix.file_perm -> path -> unit
(** [mkdir t ~perm path] creates a new directory [t/path] with permissions [perm]. *) (** [mkdir t ~perm path] creates a new directory [t/path] with permissions [perm]. *)
val open_dir : sw:Switch.t -> #t -> path -> <t; Flow.close> val open_dir : sw:Switch.t -> #t -> path -> <t; Flow.close>
@ -543,7 +539,11 @@ module Private : sig
(e.g. because it called {!Promise.await} on an unresolved promise). (e.g. because it called {!Promise.await} on an unresolved promise).
The effect handler runs [fn tid enqueue] in the scheduler context, The effect handler runs [fn tid enqueue] in the scheduler context,
passing it the suspended fibre's thread ID (for tracing) and a function to resume it. passing it the suspended fibre's thread ID (for tracing) and a function to resume it.
[fn] should arrange for [enqueue] to be called once the thread is ready to run again. *) [fn] should arrange for [enqueue] to be called once the thread is ready to run again.
If a cancellation is pending, this will raise it. *)
| Suspend_unchecked : (Ctf.id -> 'a enqueue -> unit) -> 'a eff
(** [Suspend_unchecked] is like [Suspend], but doesn't raise pending exceptions. *)
| Fork : (unit -> 'a) -> 'a Promise.t eff | Fork : (unit -> 'a) -> 'a Promise.t eff
(** See {!Fibre.fork} *) (** See {!Fibre.fork} *)
@ -551,12 +551,18 @@ module Private : sig
| Fork_ignore : (unit -> unit) -> unit eff | Fork_ignore : (unit -> unit) -> unit eff
(** See {!Fibre.fork_ignore} *) (** See {!Fibre.fork_ignore} *)
| Yield : unit eff
| Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff | Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff
(** [perform Trace fmt] writes trace logging to the configured trace output. (** [perform Trace fmt] writes trace logging to the configured trace output.
It must not switch fibres, as tracing must not affect scheduling. It must not switch fibres, as tracing must not affect scheduling.
If the system is not ready to receive the trace output, If the system is not ready to receive the trace output,
the whole domain must block until it is. *) the whole domain must block until it is. *)
| Set_switch : Switch.t -> Switch.t eff
end end
val boot_switch : Switch.t
end end

View File

@ -1,6 +1,7 @@
open EffectHandlers open EffectHandlers
type _ eff += Fork : (unit -> 'a) -> 'a Promise.t eff type _ eff += Fork : (unit -> 'a) -> 'a Promise.t eff
type _ eff += Yield : unit eff
let fork ~sw ~exn_turn_off f = let fork ~sw ~exn_turn_off f =
let f () = let f () =
@ -24,9 +25,8 @@ let fork_ignore ~sw f =
in in
perform (Fork_ignore f) perform (Fork_ignore f)
let yield ?sw () = let yield () =
Suspend.enter (fun _id enqueue -> enqueue (Ok ())); perform Yield
Option.iter Switch.check sw
let both ~sw f g = let both ~sw f g =
let x = fork ~sw ~exn_turn_off:true f in let x = fork ~sw ~exn_turn_off:true f in
@ -34,8 +34,12 @@ let both ~sw f g =
try g () try g ()
with ex -> Switch.turn_off sw ex with ex -> Switch.turn_off sw ex
end; end;
Promise.await x; ignore (Promise.await_result x : (unit, exn) result);
Switch.check sw match sw.state with
| On _ -> ()
| Off (ex, bt) ->
Switch.raise_with_extras sw ex bt
| Finished -> assert false
let fork_sub_ignore ?on_release ~sw ~on_error f = let fork_sub_ignore ?on_release ~sw ~on_error f =
if Switch.is_finished sw then ( if Switch.is_finished sw then (

View File

@ -2,5 +2,7 @@ open EffectHandlers
type 'a enqueue = ('a, exn) result -> unit type 'a enqueue = ('a, exn) result -> unit
type _ eff += Suspend : (Ctf.id -> 'a enqueue -> unit) -> 'a eff type _ eff += Suspend : (Ctf.id -> 'a enqueue -> unit) -> 'a eff
type _ eff += Suspend_unchecked : (Ctf.id -> 'a enqueue -> unit) -> 'a eff
let enter fn = perform (Suspend fn) let enter fn = perform (Suspend fn)
let enter_unchecked fn = perform (Suspend_unchecked fn)

View File

@ -1,3 +1,5 @@
open EffectHandlers
exception Multiple_exceptions of exn list exception Multiple_exceptions of exn list
exception Cancelled of exn exception Cancelled of exn
@ -24,6 +26,23 @@ type t = {
waiter : unit Waiters.t; (* The main [top]/[sub] function may wait here for fibres to finish. *) waiter : unit Waiters.t; (* The main [top]/[sub] function may wait here for fibres to finish. *)
} }
(* A dummy switch for bootstrapping *)
let boot_switch = {
id = Ctf.mint_id ();
state = Finished;
fibres = 0;
extra_exceptions = [];
on_release = Lwt_dllist.create ();
waiter = Waiters.create ();
}
type _ eff += Set_switch : t -> t eff
let with_switch t fn =
let old = perform (Set_switch t) in
Fun.protect fn
~finally:(fun () -> ignore (perform (Set_switch old)))
let null_hook = ignore let null_hook = ignore
let remove_hook h = h () let remove_hook h = h ()
@ -52,6 +71,7 @@ let rec turn_off t ex =
| Off _ -> | Off _ ->
begin match ex with begin match ex with
| Cancelled _ -> () (* The original exception will be reported elsewhere *) | Cancelled _ -> () (* The original exception will be reported elsewhere *)
| Multiple_exceptions exns -> List.iter (turn_off t) exns
| _ -> t.extra_exceptions <- ex :: t.extra_exceptions | _ -> t.extra_exceptions <- ex :: t.extra_exceptions
end end
| On q -> | On q ->
@ -108,7 +128,7 @@ let await_internal ?sw waiters id tid enqueue =
Queue.add resolved_waiter cleanup_hooks Queue.add resolved_waiter cleanup_hooks
let await ?sw waiters id = let await ?sw waiters id =
Suspend.enter (await_internal ?sw waiters id) Suspend.enter_unchecked (await_internal ?sw waiters id)
let rec await_idle t = let rec await_idle t =
(* Wait for fibres to finish: *) (* Wait for fibres to finish: *)
@ -149,6 +169,7 @@ let top fn =
waiter = Waiters.create (); waiter = Waiters.create ();
on_release = Lwt_dllist.create (); on_release = Lwt_dllist.create ();
} in } in
with_switch t @@ fun () ->
match fn t with match fn t with
| v -> | v ->
await_idle t; await_idle t;

View File

@ -95,7 +95,6 @@ type rw_req = {
buf : Uring.Region.chunk; buf : Uring.Region.chunk;
mutable cur_off : int; mutable cur_off : int;
action : int Suspended.t; action : int Suspended.t;
sw : Switch.t option;
} }
type cancel_hook = Switch.hook ref type cancel_hook = Switch.hook ref
@ -109,6 +108,7 @@ type io_job =
type runnable = type runnable =
| Thread : 'a Suspended.t * 'a -> runnable | Thread : 'a Suspended.t * 'a -> runnable
| Thread_checked : unit Suspended.t -> runnable
| Failed_thread : 'a Suspended.t * exn -> runnable | Failed_thread : 'a Suspended.t * exn -> runnable
type t = { type t = {
@ -127,6 +127,7 @@ let enqueue_thread st k x =
let enqueue_failed_thread st k ex = let enqueue_failed_thread st k ex =
Queue.push (Failed_thread (k, ex)) st.run_q Queue.push (Failed_thread (k, ex)) st.run_q
type _ eff += Enter_unchecked : (t -> 'a Suspended.t -> unit) -> 'a eff
type _ eff += Enter : (t -> 'a Suspended.t -> unit) -> 'a eff type _ eff += Enter : (t -> 'a Suspended.t -> unit) -> 'a eff
let enter fn = perform (Enter fn) let enter fn = perform (Enter fn)
@ -138,7 +139,7 @@ let rec enqueue_cancel job st action =
| Some _ -> () | Some _ -> ()
let cancel job = let cancel job =
let res = enter (enqueue_cancel job) in let res = perform (Enter_unchecked (enqueue_cancel job)) in
Log.debug (fun l -> l "cancel returned"); Log.debug (fun l -> l "cancel returned");
if res = -2 then ( if res = -2 then (
Log.debug (fun f -> f "Cancel returned ENOENT - operation completed before cancel took effect") Log.debug (fun f -> f "Cancel returned ENOENT - operation completed before cancel took effect")
@ -175,27 +176,25 @@ let cancel job =
When [fn cancel_hook] returns, it registers a cancellation callback with [sw] and stores its handle in [cancel_hook]. When [fn cancel_hook] returns, it registers a cancellation callback with [sw] and stores its handle in [cancel_hook].
If [sw] is already off, it schedules [action] to be discontinued. If [sw] is already off, it schedules [action] to be discontinued.
@return Whether to retry the operation later, once there is space. *) @return Whether to retry the operation later, once there is space. *)
let with_cancel_hook ?sw ~action st fn = let with_cancel_hook ~action st fn =
let release = ref Switch.null_hook in let release = ref Switch.null_hook in
match sw with let sw = action.Suspended.fibre.switch in
| None -> fn release = None match Switch.get_error sw with
| Some sw -> | Some ex -> enqueue_failed_thread st action ex; false
match Switch.get_error sw with | None ->
| Some ex -> enqueue_failed_thread st action ex; false match fn release with
| None -> | None -> true
match fn release with | Some job ->
| None -> true release := Switch.add_cancel_hook sw (fun _ -> cancel job);
| Some job -> false
release := Switch.add_cancel_hook sw (fun _ -> cancel job);
false
let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; sw; action} as req) = let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) =
let fd = FD.get "submit_rw_req" fd in let fd = FD.get "submit_rw_req" fd in
let {uring;io_q;_} = st in let {uring;io_q;_} = st in
let off = Uring.Region.to_offset buf + cur_off in let off = Uring.Region.to_offset buf + cur_off in
let len = match len with Exactly l | Upto l -> l in let len = match len with Exactly l | Upto l -> l in
let len = len - cur_off in let len = len - cur_off in
let retry = with_cancel_hook ?sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
match op with match op with
|`R -> Uring.read_fixed uring ~file_offset fd ~off ~len (Read (req, cancel)) |`R -> Uring.read_fixed uring ~file_offset fd ~off ~len (Read (req, cancel))
|`W -> Uring.write_fixed uring ~file_offset fd ~off ~len (Write (req, cancel)) |`W -> Uring.write_fixed uring ~file_offset fd ~off ~len (Write (req, cancel))
@ -210,26 +209,26 @@ let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; sw; action} a
(* TODO bind from unixsupport *) (* TODO bind from unixsupport *)
let errno_is_retry = function -62 | -11 | -4 -> true |_ -> false let errno_is_retry = function -62 | -11 | -4 -> true |_ -> false
let enqueue_read st action (sw,file_offset,fd,buf,len) = let enqueue_read st action (file_offset,fd,buf,len) =
let file_offset = let file_offset =
match file_offset with match file_offset with
| Some x -> x | Some x -> x
| None -> FD.uring_file_offset fd | None -> FD.uring_file_offset fd
in in
let req = { op=`R; file_offset; len; fd; cur_off = 0; buf; action; sw} in let req = { op=`R; file_offset; len; fd; cur_off = 0; buf; action } in
Log.debug (fun l -> l "read: submitting call"); Log.debug (fun l -> l "read: submitting call");
Ctf.label "read"; Ctf.label "read";
submit_rw_req st req submit_rw_req st req
let rec enqueue_readv args st action = let rec enqueue_readv args st action =
let (sw,file_offset,fd,bufs) = args in let (file_offset,fd,bufs) = args in
let file_offset = let file_offset =
match file_offset with match file_offset with
| Some x -> x | Some x -> x
| None -> FD.uring_file_offset fd | None -> FD.uring_file_offset fd
in in
Ctf.label "readv"; Ctf.label "readv";
let retry = with_cancel_hook ?sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
Uring.readv st.uring ~file_offset (FD.get "readv" fd) bufs (Job (action, cancel)) Uring.readv st.uring ~file_offset (FD.get "readv" fd) bufs (Job (action, cancel))
) )
in in
@ -237,29 +236,29 @@ let rec enqueue_readv args st action =
Queue.push (fun st -> enqueue_readv args st action) st.io_q Queue.push (fun st -> enqueue_readv args st action) st.io_q
let rec enqueue_writev args st action = let rec enqueue_writev args st action =
let (sw,file_offset,fd,bufs) = args in let (file_offset,fd,bufs) = args in
let file_offset = let file_offset =
match file_offset with match file_offset with
| Some x -> x | Some x -> x
| None -> FD.uring_file_offset fd | None -> FD.uring_file_offset fd
in in
Ctf.label "writev"; Ctf.label "writev";
let retry = with_cancel_hook ?sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
Uring.writev st.uring ~file_offset (FD.get "writev" fd) bufs (Job (action, cancel)) Uring.writev st.uring ~file_offset (FD.get "writev" fd) bufs (Job (action, cancel))
) )
in in
if retry then (* wait until an sqe is available *) if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_writev args st action) st.io_q Queue.push (fun st -> enqueue_writev args st action) st.io_q
let rec enqueue_poll_add ?sw fd poll_mask st action = let rec enqueue_poll_add fd poll_mask st action =
Log.debug (fun l -> l "poll_add: submitting call"); Log.debug (fun l -> l "poll_add: submitting call");
Ctf.label "poll_add"; Ctf.label "poll_add";
let retry = with_cancel_hook ?sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
Uring.poll_add st.uring (FD.get "poll_add" fd) poll_mask (Job (action, cancel)) Uring.poll_add st.uring (FD.get "poll_add" fd) poll_mask (Job (action, cancel))
) )
in in
if retry then (* wait until an sqe is available *) if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_poll_add ?sw fd poll_mask st action) st.io_q Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
let rec enqueue_close st action fd = let rec enqueue_close st action fd =
Log.debug (fun l -> l "close: submitting call"); Log.debug (fun l -> l "close: submitting call");
@ -268,57 +267,57 @@ let rec enqueue_close st action fd =
if subm = None then (* wait until an sqe is available *) if subm = None then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_close st action fd) st.io_q Queue.push (fun st -> enqueue_close st action fd) st.io_q
let enqueue_write st action (sw,file_offset,fd,buf,len) = let enqueue_write st action (file_offset,fd,buf,len) =
let file_offset = let file_offset =
match file_offset with match file_offset with
| Some x -> x | Some x -> x
| None -> FD.uring_file_offset fd | None -> FD.uring_file_offset fd
in in
let req = { op=`W; file_offset; len; fd; cur_off = 0; buf; action; sw } in let req = { op=`W; file_offset; len; fd; cur_off = 0; buf; action } in
Log.debug (fun l -> l "write: submitting call"); Log.debug (fun l -> l "write: submitting call");
Ctf.label "write"; Ctf.label "write";
submit_rw_req st req submit_rw_req st req
let rec enqueue_splice ?sw ~src ~dst ~len st action = let rec enqueue_splice ~src ~dst ~len st action =
Log.debug (fun l -> l "splice: submitting call"); Log.debug (fun l -> l "splice: submitting call");
Ctf.label "splice"; Ctf.label "splice";
let retry = with_cancel_hook ?sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
Uring.splice st.uring (Job (action, cancel)) ~src:(FD.get "splice" src) ~dst:(FD.get "splice" dst) ~len Uring.splice st.uring (Job (action, cancel)) ~src:(FD.get "splice" src) ~dst:(FD.get "splice" dst) ~len
) )
in in
if retry then (* wait until an sqe is available *) if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_splice ?sw ~src ~dst ~len st action) st.io_q Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
let rec enqueue_openat2 ((sw, access, flags, perm, resolve, dir, path) as args) st action = let rec enqueue_openat2 ((access, flags, perm, resolve, dir, path) as args) st action =
Log.debug (fun l -> l "openat2: submitting call"); Log.debug (fun l -> l "openat2: submitting call");
Ctf.label "openat2"; Ctf.label "openat2";
let fd = Option.map (FD.get "openat2") dir in let fd = Option.map (FD.get "openat2") dir in
let retry = with_cancel_hook ~sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job (action, cancel)) Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job (action, cancel))
) )
in in
if retry then (* wait until an sqe is available *) if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_openat2 args st action) st.io_q Queue.push (fun st -> enqueue_openat2 args st action) st.io_q
let rec enqueue_connect ?sw fd addr st action = let rec enqueue_connect fd addr st action =
Log.debug (fun l -> l "connect: submitting call"); Log.debug (fun l -> l "connect: submitting call");
Ctf.label "connect"; Ctf.label "connect";
let retry = with_cancel_hook ?sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
Uring.connect st.uring (FD.get "connect" fd) addr (Job (action, cancel)) Uring.connect st.uring (FD.get "connect" fd) addr (Job (action, cancel))
) )
in in
if retry then (* wait until an sqe is available *) if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_connect ?sw fd addr st action) st.io_q Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q
let rec enqueue_accept ~sw fd client_addr st action = let rec enqueue_accept fd client_addr st action =
Log.debug (fun l -> l "accept: submitting call"); Log.debug (fun l -> l "accept: submitting call");
Ctf.label "accept"; Ctf.label "accept";
let retry = with_cancel_hook ~sw ~action st (fun cancel -> let retry = with_cancel_hook ~action st (fun cancel ->
Uring.accept st.uring (FD.get "accept" fd) client_addr (Job (action, cancel)) Uring.accept st.uring (FD.get "accept" fd) client_addr (Job (action, cancel))
) in ) in
if retry then ( if retry then (
(* wait until an sqe is available *) (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_accept ~sw fd client_addr st action) st.io_q Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q
) )
let rec enqueue_noop st action = let rec enqueue_noop st action =
@ -345,6 +344,11 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
(* Wakeup any paused fibres *) (* Wakeup any paused fibres *)
match Queue.take run_q with match Queue.take run_q with
| Thread (k, v) -> Suspended.continue k v (* We already have a runnable task *) | Thread (k, v) -> Suspended.continue k v (* We already have a runnable task *)
| Thread_checked k ->
begin match Switch.get_error k.fibre.switch with
| Some e -> Suspended.discontinue k e
| None -> Suspended.continue k ()
end
| Failed_thread (k, ex) -> Suspended.discontinue k ex | Failed_thread (k, ex) -> Suspended.discontinue k ex
| exception Queue.Empty -> | exception Queue.Empty ->
let now = Unix.gettimeofday () in let now = Unix.gettimeofday () in
@ -396,19 +400,26 @@ and handle_complete st ~runnable result =
complete_rw_req st req result complete_rw_req st req result
| Job (k, cancel) -> | Job (k, cancel) ->
Switch.remove_hook !cancel; Switch.remove_hook !cancel;
Suspended.continue k result begin match Switch.get_error k.fibre.switch with
| Some e -> Suspended.discontinue k e (* If cancelled, report that instead. *)
| None -> Suspended.continue k result
end
| Job_no_cancel k -> | Job_no_cancel k ->
Suspended.continue k result Suspended.continue k result
and complete_rw_req st ({len; cur_off; action; _} as req) res = and complete_rw_req st ({len; cur_off; action; _} as req) res =
match res, len with match res, len with
| 0, _ -> Suspended.discontinue action End_of_file | 0, _ -> Suspended.discontinue action End_of_file
| e, _ when e < 0 -> | e, _ when e < 0 ->
if errno_is_retry e then ( begin match Switch.get_error action.fibre.switch with
submit_rw_req st req; | Some e -> Suspended.discontinue action e (* If cancelled, report that instead. *)
schedule st | None ->
) else ( if errno_is_retry e then (
Suspended.continue action e submit_rw_req st req;
) schedule st
) else (
Suspended.continue action e
)
end
| n, Exactly len when n < len - cur_off -> | n, Exactly len when n < len - cur_off ->
req.cur_off <- req.cur_off + n; req.cur_off <- req.cur_off + n;
submit_rw_req st req; submit_rw_req st req;
@ -434,25 +445,23 @@ let noop () =
Log.debug (fun l -> l "noop returned"); Log.debug (fun l -> l "noop returned");
if result <> 0 then raise (Unix.Unix_error (Uring.error_of_errno result, "noop", "")) if result <> 0 then raise (Unix.Unix_error (Uring.error_of_errno result, "noop", ""))
type _ eff += Sleep_until : Switch.t option * float -> unit eff type _ eff += Sleep_until : float -> unit eff
let sleep_until ?sw d = let sleep_until d =
perform (Sleep_until (sw, d)) perform (Sleep_until d)
type _ eff += ERead : (Switch.t option * Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff type _ eff += ERead : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff
let read_exactly ?sw ?file_offset fd buf len = let read_exactly ?file_offset fd buf len =
let res = perform (ERead (sw, file_offset, fd, buf, Exactly len)) in let res = perform (ERead (file_offset, fd, buf, Exactly len)) in
Log.debug (fun l -> l "read_exactly: woken up after read"); Log.debug (fun l -> l "read_exactly: woken up after read");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "read_exactly", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "read_exactly", ""))
) )
let read_upto ?sw ?file_offset fd buf len = let read_upto ?file_offset fd buf len =
let res = perform (ERead (sw, file_offset, fd, buf, Upto len)) in let res = perform (ERead (file_offset, fd, buf, Upto len)) in
Log.debug (fun l -> l "read_upto: woken up after read"); Log.debug (fun l -> l "read_upto: woken up after read");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
let err = Uring.error_of_errno res in let err = Uring.error_of_errno res in
let ex = Unix.Unix_error (err, "read_upto", "") in let ex = Unix.Unix_error (err, "read_upto", "") in
if err = Unix.ECONNRESET then raise (Eio.Net.Connection_reset ex) if err = Unix.ECONNRESET then raise (Eio.Net.Connection_reset ex)
@ -461,11 +470,10 @@ let read_upto ?sw ?file_offset fd buf len =
res res
) )
let readv ?sw ?file_offset fd bufs = let readv ?file_offset fd bufs =
let res = enter (enqueue_readv (sw, file_offset, fd, bufs)) in let res = enter (enqueue_readv (file_offset, fd, bufs)) in
Log.debug (fun l -> l "readv: woken up after read"); Log.debug (fun l -> l "readv: woken up after read");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "readv", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "readv", ""))
) else if res = 0 then ( ) else if res = 0 then (
raise End_of_file raise End_of_file
@ -473,11 +481,10 @@ let readv ?sw ?file_offset fd bufs =
res res
) )
let rec writev ?sw ?file_offset fd bufs = let rec writev ?file_offset fd bufs =
let res = enter (enqueue_writev (sw, file_offset, fd, bufs)) in let res = enter (enqueue_writev (file_offset, fd, bufs)) in
Log.debug (fun l -> l "writev: woken up after write"); Log.debug (fun l -> l "writev: woken up after write");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "writev", ""))
) else ( ) else (
match Cstruct.shiftv bufs res with match Cstruct.shiftv bufs res with
@ -490,32 +497,29 @@ let rec writev ?sw ?file_offset fd bufs =
| Some ofs when ofs = I63.minus_one -> Some I63.minus_one | Some ofs when ofs = I63.minus_one -> Some I63.minus_one
| Some ofs -> Some (I63.add ofs (I63.of_int res)) | Some ofs -> Some (I63.add ofs (I63.of_int res))
in in
writev ?sw ?file_offset fd bufs writev ?file_offset fd bufs
) )
let await_readable ?sw fd = let await_readable fd =
let res = enter (enqueue_poll_add ?sw fd (Uring.Poll_mask.(pollin + pollerr))) in let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in
Log.debug (fun l -> l "await_readable: woken up"); Log.debug (fun l -> l "await_readable: woken up");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "await_readable", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "await_readable", ""))
) )
let await_writable ?sw fd = let await_writable fd =
let res = enter (enqueue_poll_add ?sw fd (Uring.Poll_mask.(pollout + pollerr))) in let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollout + pollerr))) in
Log.debug (fun l -> l "await_writable: woken up"); Log.debug (fun l -> l "await_writable: woken up");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "await_writable", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "await_writable", ""))
) )
type _ eff += EWrite : (Switch.t option * Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff type _ eff += EWrite : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff
let write ?sw ?file_offset fd buf len = let write ?file_offset fd buf len =
let res = perform (EWrite (sw, file_offset, fd, buf, Exactly len)) in let res = perform (EWrite (file_offset, fd, buf, Exactly len)) in
Log.debug (fun l -> l "write: woken up after write"); Log.debug (fun l -> l "write: woken up after write");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "write", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "write", ""))
) )
@ -525,21 +529,17 @@ let alloc () = perform Alloc
type _ eff += Free : Uring.Region.chunk -> unit eff type _ eff += Free : Uring.Region.chunk -> unit eff
let free buf = perform (Free buf) let free buf = perform (Free buf)
let splice ?sw src ~dst ~len = let splice src ~dst ~len =
let res = enter (enqueue_splice ?sw ~src ~dst ~len) in let res = enter (enqueue_splice ~src ~dst ~len) in
Log.debug (fun l -> l "splice returned"); Log.debug (fun l -> l "splice returned");
if res > 0 then res if res > 0 then res
else if res = 0 then raise End_of_file else if res = 0 then raise End_of_file
else ( else raise (Unix.Unix_error (Uring.error_of_errno res, "splice", ""))
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "splice", ""))
)
let connect ?sw fd addr = let connect fd addr =
let res = enter (enqueue_connect ?sw fd addr) in let res = enter (enqueue_connect fd addr) in
Log.debug (fun l -> l "connect returned"); Log.debug (fun l -> l "connect returned");
if res < 0 then ( if res < 0 then (
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "connect", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "connect", ""))
) )
@ -554,7 +554,7 @@ let openfile ~sw path flags mode =
let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path = let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path =
wrap_errors path @@ fun () -> wrap_errors path @@ fun () ->
let res = enter (enqueue_openat2 (sw, access, flags, perm, resolve, dir, path)) in let res = enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in
Log.debug (fun l -> l "openat2 returned"); Log.debug (fun l -> l "openat2 returned");
if res < 0 then ( if res < 0 then (
Switch.check sw; (* If cancelled, report that instead. *) Switch.check sw; (* If cancelled, report that instead. *)
@ -574,17 +574,17 @@ let fstat fd =
external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat" external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat"
(* We ignore [sw] because this isn't a uring operation yet. *) (* We ignore [sw] because this isn't a uring operation yet. *)
let mkdirat ?sw:_ ~perm dir path = let mkdirat ~perm dir path =
wrap_errors path @@ fun () -> wrap_errors path @@ fun () ->
match dir with match dir with
| None -> Unix.mkdir path perm | None -> Unix.mkdir path perm
| Some dir -> eio_mkdirat (FD.get "mkdirat" dir) path perm | Some dir -> eio_mkdirat (FD.get "mkdirat" dir) path perm
let mkdir_beneath ?sw ~perm ?dir path = let mkdir_beneath ~perm ?dir path =
let dir_path = Filename.dirname path in let dir_path = Filename.dirname path in
let leaf = Filename.basename path in let leaf = Filename.basename path in
(* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *) (* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *)
Switch.sub_opt sw (fun sw -> Switch.top (fun sw ->
let parent = let parent =
wrap_errors path @@ fun () -> wrap_errors path @@ fun () ->
openat2 ~sw ~seekable:false ?dir dir_path openat2 ~sw ~seekable:false ?dir dir_path
@ -593,19 +593,18 @@ let mkdir_beneath ?sw ~perm ?dir path =
~perm:0 ~perm:0
~resolve:Uring.Resolve.beneath ~resolve:Uring.Resolve.beneath
in in
mkdirat ~sw ~perm (Some parent) leaf mkdirat ~perm (Some parent) leaf
) )
let shutdown socket command = let shutdown socket command =
Unix.shutdown (FD.get "shutdown" socket) command Unix.shutdown (FD.get "shutdown" socket) command
let accept_loose_fd ~sw socket = let accept_loose_fd socket =
Ctf.label "accept"; Ctf.label "accept";
let client_addr = Uring.Sockaddr.create () in let client_addr = Uring.Sockaddr.create () in
let res = enter (enqueue_accept ~sw socket client_addr) in let res = enter (enqueue_accept socket client_addr) in
Log.debug (fun l -> l "accept returned"); Log.debug (fun l -> l "accept returned");
if res < 0 then ( if res < 0 then (
Switch.check sw; (* If cancelled, report that instead. *)
raise (Unix.Unix_error (Uring.error_of_errno res, "accept", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "accept", ""))
) else ( ) else (
let unix : Unix.file_descr = Obj.magic res in let unix : Unix.file_descr = Obj.magic res in
@ -615,7 +614,7 @@ let accept_loose_fd ~sw socket =
) )
let accept ~sw fd = let accept ~sw fd =
let client, client_addr = accept_loose_fd ~sw fd in let client, client_addr = accept_loose_fd fd in
Switch.on_release sw (fun () -> FD.ensure_closed client); Switch.on_release sw (fun () -> FD.ensure_closed client);
client, client_addr client, client_addr
@ -643,46 +642,46 @@ module Objects = struct
(* When copying between a source with an FD and a sink with an FD, we can share the chunk (* When copying between a source with an FD and a sink with an FD, we can share the chunk
and avoid copying. *) and avoid copying. *)
let fast_copy ?sw src dst = let fast_copy src dst =
with_chunk @@ fun chunk -> with_chunk @@ fun chunk ->
let chunk_size = Uring.Region.length chunk in let chunk_size = Uring.Region.length chunk in
try try
while true do while true do
let got = read_upto ?sw src chunk chunk_size in let got = read_upto src chunk chunk_size in
write ?sw dst chunk got write dst chunk got
done done
with End_of_file -> () with End_of_file -> ()
(* Try a fast copy using splice. If the FDs don't support that, switch to copying. *) (* Try a fast copy using splice. If the FDs don't support that, switch to copying. *)
let fast_copy_try_splice ?sw src dst = let fast_copy_try_splice src dst =
try try
while true do while true do
let _ : int = splice ?sw src ~dst ~len:max_int in let _ : int = splice src ~dst ~len:max_int in
() ()
done done
with with
| End_of_file -> () | End_of_file -> ()
| Unix.Unix_error (Unix.EINVAL, "splice", _) -> fast_copy ?sw src dst | Unix.Unix_error (Unix.EINVAL, "splice", _) -> fast_copy src dst
(* Copy using the [Read_source_buffer] optimisation. (* Copy using the [Read_source_buffer] optimisation.
Avoids a copy if the source already has the data. *) Avoids a copy if the source already has the data. *)
let copy_with_rsb ?sw rsb dst = let copy_with_rsb rsb dst =
try try
while true do while true do
rsb ?sw (writev ?sw dst) rsb (writev dst)
done done
with End_of_file -> () with End_of_file -> ()
(* Copy by allocating a chunk from the pre-shared buffer and asking (* Copy by allocating a chunk from the pre-shared buffer and asking
the source to write into it. This used when the other methods the source to write into it. This used when the other methods
aren't available. *) aren't available. *)
let fallback_copy ?sw src dst = let fallback_copy src dst =
with_chunk @@ fun chunk -> with_chunk @@ fun chunk ->
let chunk_cs = Uring.Region.to_cstruct chunk in let chunk_cs = Uring.Region.to_cstruct chunk in
try try
while true do while true do
let got = Eio.Flow.read_into ?sw src chunk_cs in let got = Eio.Flow.read_into src chunk_cs in
write ?sw dst chunk got write dst chunk got
done done
with End_of_file -> () with End_of_file -> ()
@ -696,7 +695,7 @@ module Objects = struct
| FD -> Some fd | FD -> Some fd
| _ -> None | _ -> None
method read_into ?sw buf = method read_into buf =
(* Inefficient copying fallback *) (* Inefficient copying fallback *)
with_chunk @@ fun chunk -> with_chunk @@ fun chunk ->
let chunk_cs = Uring.Region.to_cstruct chunk in let chunk_cs = Uring.Region.to_cstruct chunk in
@ -704,22 +703,22 @@ module Objects = struct
if Lazy.force is_tty then ( if Lazy.force is_tty then (
(* Work-around for https://github.com/axboe/liburing/issues/354 (* Work-around for https://github.com/axboe/liburing/issues/354
(should be fixed in Linux 5.14) *) (should be fixed in Linux 5.14) *)
await_readable ?sw fd await_readable fd
); );
let got = read_upto ?sw fd chunk max_len in let got = read_upto fd chunk max_len in
Cstruct.blit chunk_cs 0 buf 0 got; Cstruct.blit chunk_cs 0 buf 0 got;
got got
method read_methods = [] method read_methods = []
method write ?sw src = method write src =
match get_fd_opt src with match get_fd_opt src with
| Some src -> fast_copy_try_splice ?sw src fd | Some src -> fast_copy_try_splice src fd
| None -> | None ->
let rec aux = function let rec aux = function
| Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb ?sw rsb fd | Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb rsb fd
| _ :: xs -> aux xs | _ :: xs -> aux xs
| [] -> fallback_copy ?sw src fd | [] -> fallback_copy src fd
in in
aux (Eio.Flow.read_methods src) aux (Eio.Flow.read_methods src)
@ -739,7 +738,7 @@ module Objects = struct
method close = FD.close fd method close = FD.close fd
method accept_sub ~sw ~on_error fn = method accept_sub ~sw ~on_error fn =
let client, client_addr = accept_loose_fd ~sw fd in let client, client_addr = accept_loose_fd fd in
Fibre.fork_sub_ignore ~sw ~on_error Fibre.fork_sub_ignore ~sw ~on_error
(fun sw -> (fun sw ->
let client_addr = match client_addr with let client_addr = match client_addr with
@ -790,7 +789,7 @@ module Objects = struct
in in
let sock_unix = Unix.socket socket_domain socket_type 0 in let sock_unix = Unix.socket socket_domain socket_type 0 in
let sock = FD.of_unix ~sw ~seekable:false sock_unix in let sock = FD.of_unix ~sw ~seekable:false sock_unix in
connect ~sw sock addr; connect sock addr;
(flow sock :> <Eio.Flow.two_way; Eio.Flow.close>) (flow sock :> <Eio.Flow.two_way; Eio.Flow.close>)
end end
@ -870,8 +869,8 @@ module Objects = struct
in in
(new dir (Some fd) :> <Eio.Dir.t; Eio.Flow.close>) (new dir (Some fd) :> <Eio.Dir.t; Eio.Flow.close>)
method mkdir ?sw ~perm path = method mkdir ~perm path =
mkdir_beneath ?sw ~perm ?dir:fd path mkdir_beneath ~perm ?dir:fd path
method close = method close =
FD.close (Option.get fd) FD.close (Option.get fd)
@ -883,8 +882,8 @@ module Objects = struct
val! resolve_flags = Uring.Resolve.empty val! resolve_flags = Uring.Resolve.empty
method! mkdir ?sw ~perm path = method! mkdir ~perm path =
mkdirat ?sw ~perm None path mkdirat ~perm None path
end end
let stdenv () = let stdenv () =
@ -926,8 +925,9 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
let mem_q = Queue.create () in let mem_q = Queue.create () in
let st = { mem; uring; run_q; io_q; mem_q; sleep_q; io_jobs = 0 } in let st = { mem; uring; run_q; io_q; mem_q; sleep_q; io_jobs = 0 } in
Log.debug (fun l -> l "starting main thread"); Log.debug (fun l -> l "starting main thread");
let rec fork ~tid fn = let rec fork ~tid ~switch:initial_switch fn =
Ctf.note_switch tid; Ctf.note_switch tid;
let fibre = { Suspended.tid; switch = initial_switch } in
match_with fn () match_with fn ()
{ retc = (fun () -> schedule st); { retc = (fun () -> schedule st);
exnc = (fun e -> raise e); exnc = (fun e -> raise e);
@ -935,60 +935,92 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
match e with match e with
| Enter fn -> | Enter fn ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in begin match Switch.get_error fibre.switch with
fn st k; | Some e -> discontinue k e
schedule st) | None ->
let k = { Suspended.k; fibre } in
fn st k;
schedule st
end)
| Enter_unchecked fn ->
Some (fun k ->
let k = { Suspended.k; fibre } in
fn st k;
schedule st
)
| ERead args -> | ERead args ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
enqueue_read st k args; enqueue_read st k args;
schedule st) schedule st)
| Close fd -> | Close fd ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
enqueue_close st k fd; enqueue_close st k fd;
schedule st) schedule st)
| EWrite args -> | EWrite args ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
enqueue_write st k args; enqueue_write st k args;
schedule st) schedule st)
| Sleep_until (sw, time) -> | Sleep_until time ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
let cancel_hook = ref Switch.null_hook in let cancel_hook = ref Switch.null_hook in
begin match sw with let sw = fibre.switch in
begin match Switch.get_error sw with
| Some ex -> Suspended.discontinue k ex
| None -> | None ->
ignore (Zzz.add ~cancel_hook sleep_q time k : Zzz.Key.t); let job = Zzz.add ~cancel_hook sleep_q time k in
cancel_hook := Switch.add_cancel_hook sw (fun ex ->
Zzz.remove sleep_q job;
enqueue_failed_thread st k ex
);
schedule st schedule st
| Some sw ->
match Switch.get_error sw with
| Some ex -> Suspended.discontinue k ex
| None ->
let job = Zzz.add ~cancel_hook sleep_q time k in
cancel_hook := Switch.add_cancel_hook sw (fun ex ->
Zzz.remove sleep_q job;
enqueue_failed_thread st k ex
);
schedule st
end) end)
| Eio.Private.Effects.Set_switch switch ->
Some (fun k ->
let old = fibre.switch in
fibre.switch <- switch;
continue k old
)
| Eio.Private.Effects.Yield ->
Some (fun k ->
let k = { Suspended.k; fibre } in
Queue.push (Thread_checked k) st.run_q;
schedule st
)
| Eio.Private.Effects.Suspend_unchecked f ->
Some (fun k ->
let k = { Suspended.k; fibre } in
f tid (function
| Ok v -> enqueue_thread st k v
| Error ex -> enqueue_failed_thread st k ex
);
schedule st
)
| Eio.Private.Effects.Suspend f -> | Eio.Private.Effects.Suspend f ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in match Switch.get_error fibre.switch with
f tid (function | Some e -> discontinue k e
| Ok v -> enqueue_thread st k v | None ->
| Error ex -> enqueue_failed_thread st k ex let k = { Suspended.k; fibre } in
); f tid (function
schedule st) | Ok v -> enqueue_thread st k v
| Error ex -> enqueue_failed_thread st k ex
);
schedule st
)
| Eio.Private.Effects.Fork f -> | Eio.Private.Effects.Fork f ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
let id = Ctf.mint_id () in let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Task; Ctf.note_created id Ctf.Task;
let promise, resolver = Promise.create_with_id id in let promise, resolver = Promise.create_with_id id in
enqueue_thread st k promise; enqueue_thread st k promise;
fork fork
~tid:id ~tid:id
~switch:fibre.switch
(fun () -> (fun () ->
match f () with match f () with
| x -> Promise.fulfill resolver x | x -> Promise.fulfill resolver x
@ -998,11 +1030,11 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
)) ))
| Eio.Private.Effects.Fork_ignore f -> | Eio.Private.Effects.Fork_ignore f ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
enqueue_thread st k (); enqueue_thread st k ();
let child = Ctf.note_fork () in let child = Ctf.note_fork () in
Ctf.note_switch child; Ctf.note_switch child;
fork ~tid:child (fun () -> fork ~tid:child ~switch:fibre.switch (fun () ->
match f () with match f () with
| () -> | () ->
Ctf.note_resolved child ~ex:None Ctf.note_resolved child ~ex:None
@ -1012,7 +1044,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
| Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln) | Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln)
| Alloc -> | Alloc ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
alloc_buf st k) alloc_buf st k)
| Free buf -> | Free buf ->
Some (fun k -> Some (fun k ->
@ -1022,9 +1054,10 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
} }
in in
let main_done = ref false in let main_done = ref false in
let `Exit_scheduler = fork ~tid:(Ctf.mint_id ()) (fun () -> let `Exit_scheduler =
Fun.protect (fun () -> main stdenv) fork ~tid:(Ctf.mint_id ()) ~switch:Eio.Private.boot_switch (fun () ->
~finally:(fun () -> main_done := true) Fun.protect (fun () -> Switch.top (fun _sw -> main stdenv))
~finally:(fun () -> main_done := true)
) in ) in
if not !main_done then if not !main_done then
failwith "Deadlock detected: no events scheduled but main function hasn't returned"; failwith "Deadlock detected: no events scheduled but main function hasn't returned";

View File

@ -47,7 +47,7 @@ val noop : unit -> unit
(** {1 Time functions} *) (** {1 Time functions} *)
val sleep_until : ?sw:Switch.t -> float -> unit val sleep_until : float -> unit
(** [sleep_until time] blocks until the current time is [time]. (** [sleep_until time] blocks until the current time is [time].
@param sw Cancel the sleep if [sw] is turned off. *) @param sw Cancel the sleep if [sw] is turned off. *)
@ -76,48 +76,47 @@ val openat2 :
(** [openat2 ~sw ~flags ~perm ~resolve ~dir path] opens [dir/path]. (** [openat2 ~sw ~flags ~perm ~resolve ~dir path] opens [dir/path].
See {!Uring.openat2} for details. *) See {!Uring.openat2} for details. *)
val read_upto : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> int val read_upto : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> int
(** [read_upto fd chunk len] reads at most [len] bytes from [fd], (** [read_upto fd chunk len] reads at most [len] bytes from [fd],
returning as soon as some data is available. returning as soon as some data is available.
@param sw Abort the read if [sw] is turned off.
@param file_offset Read from the given position in [fd] (default: 0). @param file_offset Read from the given position in [fd] (default: 0).
@raise End_of_file Raised if all data has already been read. *) @raise End_of_file Raised if all data has already been read. *)
val read_exactly : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit val read_exactly : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit
(** [read_exactly fd chunk len] reads exactly [len] bytes from [fd], (** [read_exactly fd chunk len] reads exactly [len] bytes from [fd],
performing multiple read operations if necessary. performing multiple read operations if necessary.
@param file_offset Read from the given position in [fd] (default: 0). @param file_offset Read from the given position in [fd] (default: 0).
@raise End_of_file Raised if the stream ends before [len] bytes have been read. *) @raise End_of_file Raised if the stream ends before [len] bytes have been read. *)
val readv : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int val readv : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int
(** [readv] is like {!read_upto} but can read into any cstruct(s), (** [readv] is like {!read_upto} but can read into any cstruct(s),
not just chunks of the pre-shared buffer. not just chunks of the pre-shared buffer.
If multiple buffers are given, they are filled in order. *) If multiple buffers are given, they are filled in order. *)
val write : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit val write : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit
(** [write fd buf len] writes exactly [len] bytes from [buf] to [fd]. (** [write fd buf len] writes exactly [len] bytes from [buf] to [fd].
It blocks until the OS confirms the write is done, It blocks until the OS confirms the write is done,
and resubmits automatically if the OS doesn't write all of it at once. *) and resubmits automatically if the OS doesn't write all of it at once. *)
val writev : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit val writev : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit
(** [writev] is like {!write} but can write from any cstruct(s), (** [writev] is like {!write} but can write from any cstruct(s),
not just chunks of the pre-shared buffer. not just chunks of the pre-shared buffer.
If multiple buffers are given, they are sent in order. If multiple buffers are given, they are sent in order.
It will make multiple OS calls if the OS doesn't write all of it at once. *) It will make multiple OS calls if the OS doesn't write all of it at once. *)
val splice : ?sw:Switch.t -> FD.t -> dst:FD.t -> len:int -> int val splice : FD.t -> dst:FD.t -> len:int -> int
(** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. (** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst].
@return The number of bytes copied. @return The number of bytes copied.
@raise End_of_file [src] is at the end of the file. @raise End_of_file [src] is at the end of the file.
@raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *) @raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *)
val connect : ?sw:Switch.t -> FD.t -> Unix.sockaddr -> unit val connect : FD.t -> Unix.sockaddr -> unit
(** [connect fd addr] attempts to connect socket [fd] to [addr]. *) (** [connect fd addr] attempts to connect socket [fd] to [addr]. *)
val await_readable : ?sw:Switch.t -> FD.t -> unit val await_readable : FD.t -> unit
(** [await_readable fd] blocks until [fd] is readable (or has an error). *) (** [await_readable fd] blocks until [fd] is readable (or has an error). *)
val await_writable : ?sw:Switch.t -> FD.t -> unit val await_writable : FD.t -> unit
(** [await_writable fd] blocks until [fd] is writable (or has an error). *) (** [await_writable fd] blocks until [fd] is writable (or has an error). *)
val fstat : FD.t -> Unix.stats val fstat : FD.t -> Unix.stats

View File

@ -90,14 +90,14 @@ let test_iovec () =
let rec recv = function let rec recv = function
| [] -> () | [] -> ()
| cs -> | cs ->
let got = Eio_linux.readv ~sw from_pipe cs in let got = Eio_linux.readv from_pipe cs in
recv (Cstruct.shiftv cs got) recv (Cstruct.shiftv cs got)
in in
Fibre.both ~sw Fibre.both ~sw
(fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3]) (fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3])
(fun () -> (fun () ->
let b = Cstruct.of_string "barfoo" in let b = Cstruct.of_string "barfoo" in
Eio_linux.writev ~sw to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3]; Eio_linux.writev to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3];
Eio_linux.FD.close to_pipe Eio_linux.FD.close to_pipe
); );
Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message) Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message)

View File

@ -50,16 +50,16 @@ let or_raise_path path = function
module Suspended = struct module Suspended = struct
type 'a t = { type 'a t = {
tid : Ctf.id; fibre : Eunix.Suspended.state;
k : ('a, unit) continuation; k : ('a, unit) continuation;
} }
let continue t v = let continue t v =
Ctf.note_switch t.tid; Ctf.note_switch t.fibre.tid;
continue t.k v continue t.k v
let discontinue t ex = let discontinue t ex =
Ctf.note_switch t.tid; Ctf.note_switch t.fibre.tid;
discontinue t.k ex discontinue t.k ex
let continue_result t = function let continue_result t = function
@ -67,11 +67,14 @@ module Suspended = struct
| Error x -> discontinue t x | Error x -> discontinue t x
end end
type _ eff += Await : (('a -> unit) -> unit) -> 'a eff type _ eff += Await : (Eunix.Suspended.state -> ('a -> unit) -> unit) -> 'a eff
let await fn = perform (Await fn) let await fn = perform (Await fn)
type _ eff += Enter : ('a Suspended.t -> unit) -> 'a eff type _ eff += Enter : ('a Suspended.t -> unit) -> 'a eff
type _ eff += Enter_unchecked : ('a Suspended.t -> unit) -> 'a eff
let enter fn = perform (Enter fn) let enter fn = perform (Enter fn)
let enter_unchecked fn = perform (Enter_unchecked fn)
let await_exn fn = let await_exn fn =
perform (Await fn) |> or_raise perform (Await fn) |> or_raise
@ -88,13 +91,8 @@ let enqueue_failed_thread k ex =
let yield = Luv.Timer.init () |> or_raise in let yield = Luv.Timer.init () |> or_raise in
Luv.Timer.start yield 0 (fun () -> Suspended.discontinue k ex) |> or_raise Luv.Timer.start yield 0 (fun () -> Suspended.discontinue k ex) |> or_raise
let yield ?sw () = let with_cancel fibre ~request fn =
Option.iter Switch.check sw; let cancel = Switch.add_cancel_hook fibre.Eunix.Suspended.switch (fun _ ->
enter @@ fun k ->
enqueue_thread k ()
let with_cancel ?sw ~request fn =
let cancel = Switch.add_cancel_hook_opt sw (fun _ ->
match Luv.Request.cancel request with match Luv.Request.cancel request with
| Ok () -> () | Ok () -> ()
| Error e -> Log.debug (fun f -> f "Cancel failed: %s" (Luv.Error.strerror e)) | Error e -> Log.debug (fun f -> f "Cancel failed: %s" (Luv.Error.strerror e))
@ -120,7 +118,7 @@ module Handle = struct
let fd = get "close" t in let fd = get "close" t in
t.fd <- `Closed; t.fd <- `Closed;
Switch.remove_hook t.release_hook; Switch.remove_hook t.release_hook;
enter @@ fun k -> enter_unchecked @@ fun k ->
Luv.Handle.close fd (Suspended.continue k) Luv.Handle.close fd (Suspended.continue k)
let ensure_closed t = let ensure_closed t =
@ -156,7 +154,7 @@ module File = struct
let fd = get "close" t in let fd = get "close" t in
t.fd <- `Closed; t.fd <- `Closed;
Switch.remove_hook t.release_hook; Switch.remove_hook t.release_hook;
await_exn (Luv.File.close fd) await_exn (fun _fibre -> Luv.File.close fd)
let ensure_closed t = let ensure_closed t =
if is_open t then close t if is_open t then close t
@ -171,45 +169,45 @@ module File = struct
t.release_hook <- Switch.on_release_cancellable sw (fun () -> ensure_closed t); t.release_hook <- Switch.on_release_cancellable sw (fun () -> ensure_closed t);
t t
let await_with_cancel ~request fn =
await (fun fibre k ->
with_cancel fibre ~request (fun () -> fn k)
)
let open_ ~sw ?mode path flags = let open_ ~sw ?mode path flags =
let request = Luv.File.Request.make () in let request = Luv.File.Request.make () in
with_cancel ~sw ~request @@ fun () -> await_with_cancel ~request (Luv.File.open_ ?mode ~request path flags)
await (Luv.File.open_ ?mode ~request path flags) |> Result.map (of_luv ~sw) |> Result.map (of_luv ~sw)
let read ?sw fd bufs = let read fd bufs =
let request = Luv.File.Request.make () in let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () -> await_with_cancel ~request (Luv.File.read ~request (get "read" fd) bufs)
await (Luv.File.read ~request (get "read" fd) bufs)
let rec write ?sw fd bufs = let rec write fd bufs =
let request = Luv.File.Request.make () in let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () -> let sent = await_with_cancel ~request (Luv.File.write ~request (get "write" fd) bufs) |> or_raise in
let sent = await_exn (Luv.File.write ~request (get "write" fd) bufs) in
let rec aux = function let rec aux = function
| [] -> () | [] -> ()
| x :: xs when Luv.Buffer.size x = 0 -> aux xs | x :: xs when Luv.Buffer.size x = 0 -> aux xs
| bufs -> write ?sw fd bufs | bufs -> write fd bufs
in in
aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent) aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent)
let realpath ?sw path = let realpath path =
let request = Luv.File.Request.make () in let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () -> await_with_cancel ~request (Luv.File.realpath ~request path)
await (Luv.File.realpath ~request path)
let mkdir ?sw ~mode path = let mkdir ~mode path =
let request = Luv.File.Request.make () in let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () -> await_with_cancel ~request (Luv.File.mkdir ~request ~mode path)
await (Luv.File.mkdir ~request ~mode path)
end end
module Stream = struct module Stream = struct
type 'a t = [`Stream of 'a] Handle.t type 'a t = [`Stream of 'a] Handle.t
let rec read_into ?sw (sock:'a t) buf = let rec read_into (sock:'a t) buf =
Option.iter Switch.check sw;
let r = enter (fun k -> let r = enter (fun k ->
let cancel = Switch.add_cancel_hook_opt sw (fun ex -> let cancel = Switch.add_cancel_hook k.fibre.switch (fun ex ->
Luv.Stream.read_stop (Handle.get "read_into:cancel" sock) |> or_raise; Luv.Stream.read_stop (Handle.get "read_into:cancel" sock) |> or_raise;
enqueue_failed_thread k (Switch.Cancelled ex) enqueue_failed_thread k (Switch.Cancelled ex)
) in ) in
@ -223,7 +221,7 @@ module Stream = struct
| Ok buf' -> | Ok buf' ->
let len = Luv.Buffer.size buf' in let len = Luv.Buffer.size buf' in
if len > 0 then len if len > 0 then len
else read_into ?sw sock buf (* Luv uses a zero-length read to mean EINTR! *) else read_into sock buf (* Luv uses a zero-length read to mean EINTR! *)
| Error `EOF -> raise End_of_file | Error `EOF -> raise End_of_file
| Error (`ECONNRESET as e) -> raise (Eio.Net.Connection_reset (Luv_error e)) | Error (`ECONNRESET as e) -> raise (Eio.Net.Connection_reset (Luv_error e))
| Error x -> raise (Luv_error x) | Error x -> raise (Luv_error x)
@ -232,7 +230,7 @@ module Stream = struct
| empty :: xs when Luv.Buffer.size empty = 0 -> skip_empty xs | empty :: xs when Luv.Buffer.size empty = 0 -> skip_empty xs
| xs -> xs | xs -> xs
let rec write ?sw t bufs = let rec write t bufs =
let err, n = let err, n =
(* note: libuv doesn't seem to allow cancelling stream writes *) (* note: libuv doesn't seem to allow cancelling stream writes *)
enter (fun k -> enter (fun k ->
@ -243,15 +241,14 @@ module Stream = struct
or_raise err; or_raise err;
match Luv.Buffer.drop bufs n |> skip_empty with match Luv.Buffer.drop bufs n |> skip_empty with
| [] -> () | [] -> ()
| bufs -> write ?sw t bufs | bufs -> write t bufs
end end
let sleep_until ?sw due = let sleep_until due =
Option.iter Switch.check sw;
let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in
let timer = Luv.Timer.init () |> or_raise in let timer = Luv.Timer.init () |> or_raise in
enter @@ fun k -> enter @@ fun k ->
let cancel = Switch.add_cancel_hook_opt sw (fun ex -> let cancel = Switch.add_cancel_hook k.fibre.switch (fun ex ->
Luv.Timer.stop timer |> or_raise; Luv.Timer.stop timer |> or_raise;
Luv.Handle.close timer (fun () -> ()); Luv.Handle.close timer (fun () -> ());
enqueue_failed_thread k ex enqueue_failed_thread k ex
@ -291,21 +288,21 @@ module Objects = struct
| FD -> Some fd | FD -> Some fd
| _ -> None | _ -> None
method read_into ?sw buf = method read_into buf =
let buf = Cstruct.to_bigarray buf in let buf = Cstruct.to_bigarray buf in
match File.read ?sw fd [buf] |> or_raise |> Unsigned.Size_t.to_int with match File.read fd [buf] |> or_raise |> Unsigned.Size_t.to_int with
| 0 -> raise End_of_file | 0 -> raise End_of_file
| got -> got | got -> got
method read_methods = [] method read_methods = []
method write ?sw src = method write src =
let buf = Luv.Buffer.create 4096 in let buf = Luv.Buffer.create 4096 in
try try
while true do while true do
let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in
let sub = Luv.Buffer.sub buf ~offset:0 ~length:got in let sub = Luv.Buffer.sub buf ~offset:0 ~length:got in
File.write ?sw fd [sub] File.write fd [sub]
done done
with End_of_file -> () with End_of_file -> ()
end end
@ -316,19 +313,19 @@ module Objects = struct
let socket sock = object let socket sock = object
inherit Eio.Flow.two_way inherit Eio.Flow.two_way
method read_into ?sw buf = method read_into buf =
let buf = Cstruct.to_bigarray buf in let buf = Cstruct.to_bigarray buf in
Stream.read_into ?sw sock buf Stream.read_into sock buf
method read_methods = [] method read_methods = []
method write ?sw src = method write src =
let buf = Luv.Buffer.create 4096 in let buf = Luv.Buffer.create 4096 in
try try
while true do while true do
let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in
let buf' = Luv.Buffer.sub buf ~offset:0 ~length:got in let buf' = Luv.Buffer.sub buf ~offset:0 ~length:got in
Stream.write ?sw sock [buf'] Stream.write sock [buf']
done done
with End_of_file -> () with End_of_file -> ()
@ -336,11 +333,11 @@ module Objects = struct
Handle.close sock Handle.close sock
method shutdown = function method shutdown = function
| `Send -> await_exn @@ Luv.Stream.shutdown (Handle.get "shutdown" sock) | `Send -> await_exn (fun _fibre -> Luv.Stream.shutdown (Handle.get "shutdown" sock))
| `Receive -> failwith "shutdown receive not supported" | `Receive -> failwith "shutdown receive not supported"
| `All -> | `All ->
Log.warn (fun f -> f "shutdown receive not supported"); Log.warn (fun f -> f "shutdown receive not supported");
await_exn @@ Luv.Stream.shutdown (Handle.get "shutdown" sock) await_exn (fun _fibre -> Luv.Stream.shutdown (Handle.get "shutdown" sock))
end end
class virtual ['a] listening_socket ~backlog sock = object (self) class virtual ['a] listening_socket ~backlog sock = object (self)
@ -438,11 +435,11 @@ module Objects = struct
| `Tcp (host, port) -> | `Tcp (host, port) ->
let sock = Luv.TCP.init () |> or_raise |> Handle.of_luv ~sw in let sock = Luv.TCP.init () |> or_raise |> Handle.of_luv ~sw in
let addr = luv_addr_of_unix host port in let addr = luv_addr_of_unix host port in
await_exn (Luv.TCP.connect (Handle.get "connect" sock) addr); await_exn (fun _fibre -> Luv.TCP.connect (Handle.get "connect" sock) addr);
socket sock socket sock
| `Unix path -> | `Unix path ->
let sock = Luv.Pipe.init () |> or_raise |> Handle.of_luv ~sw in let sock = Luv.Pipe.init () |> or_raise |> Handle.of_luv ~sw in
await_exn (Luv.Pipe.connect (Handle.get "connect" sock) path); await_exn (fun _fibre -> Luv.Pipe.connect (Handle.get "connect" sock) path);
socket sock socket sock
end end
@ -498,10 +495,10 @@ module Objects = struct
(* Resolve a relative path to an absolute one, with no symlinks. (* Resolve a relative path to an absolute one, with no symlinks.
@raise Eio.Dir.Permission_denied if it's outside of [dir_path]. *) @raise Eio.Dir.Permission_denied if it's outside of [dir_path]. *)
method private resolve ?sw path = method private resolve path =
if Filename.is_relative path then ( if Filename.is_relative path then (
let dir_path = File.realpath ?sw dir_path |> or_raise_path dir_path in let dir_path = File.realpath dir_path |> or_raise_path dir_path in
let full = File.realpath ?sw (Filename.concat dir_path path) |> or_raise_path path in let full = File.realpath (Filename.concat dir_path path) |> or_raise_path path in
let prefix_len = String.length dir_path + 1 in let prefix_len = String.length dir_path + 1 in
if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then
full full
@ -514,16 +511,16 @@ module Objects = struct
) )
(* We want to create [path]. Check that the parent is in the sandbox. *) (* We want to create [path]. Check that the parent is in the sandbox. *)
method private resolve_new ?sw path = method private resolve_new path =
let dir, leaf = Filename.dirname path, Filename.basename path in let dir, leaf = Filename.dirname path, Filename.basename path in
if leaf = ".." then Fmt.failwith "New path %S ends in '..'!" path if leaf = ".." then Fmt.failwith "New path %S ends in '..'!" path
else match self#resolve ?sw dir with else match self#resolve dir with
| dir -> Filename.concat dir leaf | dir -> Filename.concat dir leaf
| exception Eio.Dir.Permission_denied (dir, ex) -> | exception Eio.Dir.Permission_denied (dir, ex) ->
raise (Eio.Dir.Permission_denied (Filename.concat dir leaf, ex)) raise (Eio.Dir.Permission_denied (Filename.concat dir leaf, ex))
method open_in ~sw path = method open_in ~sw path =
let fd = File.open_ ~sw (self#resolve ~sw path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in let fd = File.open_ ~sw (self#resolve path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in
(flow fd :> <Eio.Flow.source; Eio.Flow.close>) (flow fd :> <Eio.Flow.source; Eio.Flow.close>)
method open_out ~sw ~append ~create path = method open_out ~sw ~append ~create path =
@ -545,11 +542,11 @@ module Objects = struct
method open_dir ~sw path = method open_dir ~sw path =
Switch.check sw; Switch.check sw;
new dir (self#resolve ~sw path) new dir (self#resolve path)
(* libuv doesn't seem to provide a race-free way to do this. *) (* libuv doesn't seem to provide a race-free way to do this. *)
method mkdir ?sw ~perm path = method mkdir ~perm path =
let real_path = self#resolve_new ?sw path in let real_path = self#resolve_new path in
File.mkdir ~mode:[`NUMERIC perm] real_path |> or_raise_path path File.mkdir ~mode:[`NUMERIC perm] real_path |> or_raise_path path
method close = () method close = ()
@ -560,7 +557,7 @@ module Objects = struct
inherit dir "/" inherit dir "/"
(* No checks *) (* No checks *)
method! private resolve ?sw:_ path = path method! private resolve path = path
end end
let cwd = object let cwd = object
@ -586,8 +583,9 @@ end
let run main = let run main =
Log.debug (fun l -> l "starting run"); Log.debug (fun l -> l "starting run");
let stdenv = Objects.stdenv () in let stdenv = Objects.stdenv () in
let rec fork ~tid fn = let rec fork ~tid ~switch:initial_switch fn =
Ctf.note_switch tid; Ctf.note_switch tid;
let fibre = { Eunix.Suspended.tid; switch = initial_switch } in
match_with fn () match_with fn ()
{ retc = (fun () -> ()); { retc = (fun () -> ());
exnc = (fun e -> raise e); exnc = (fun e -> raise e);
@ -595,19 +593,20 @@ let run main =
match e with match e with
| Await fn -> | Await fn ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
fn (Suspended.continue k)) fn fibre (Suspended.continue k))
| Eio.Private.Effects.Trace -> | Eio.Private.Effects.Trace ->
Some (fun k -> continue k Eunix.Trace.default_traceln) Some (fun k -> continue k Eunix.Trace.default_traceln)
| Eio.Private.Effects.Fork f -> | Eio.Private.Effects.Fork f ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
let id = Ctf.mint_id () in let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Task; Ctf.note_created id Ctf.Task;
let promise, resolver = Promise.create_with_id id in let promise, resolver = Promise.create_with_id id in
enqueue_thread k promise; enqueue_thread k promise;
fork fork
~tid:id ~tid:id
~switch:fibre.switch
(fun () -> (fun () ->
match f () with match f () with
| x -> Promise.fulfill resolver x | x -> Promise.fulfill resolver x
@ -617,28 +616,60 @@ let run main =
)) ))
| Eio.Private.Effects.Fork_ignore f -> | Eio.Private.Effects.Fork_ignore f ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in let k = { Suspended.k; fibre } in
enqueue_thread k (); enqueue_thread k ();
let child = Ctf.note_fork () in let child = Ctf.note_fork () in
Ctf.note_switch child; Ctf.note_switch child;
fork ~tid:child (fun () -> fork ~tid:child ~switch:fibre.switch (fun () ->
match f () with match f () with
| () -> | () ->
Ctf.note_resolved child ~ex:None Ctf.note_resolved child ~ex:None
| exception ex -> | exception ex ->
Ctf.note_resolved child ~ex:(Some ex) Ctf.note_resolved child ~ex:(Some ex)
)) ))
| Enter fn -> Some (fun k -> fn { Suspended.k; tid }) | Eio.Private.Effects.Set_switch switch ->
Some (fun k ->
let old = fibre.switch in
fibre.switch <- switch;
continue k old
)
| Enter_unchecked fn -> Some (fun k ->
fn { Suspended.k; fibre }
)
| Enter fn -> Some (fun k ->
match Switch.get_error fibre.switch with
| Some e -> discontinue k e
| None -> fn { Suspended.k; fibre }
)
| Eio.Private.Effects.Yield ->
Some (fun k ->
let yield = Luv.Timer.init () |> or_raise in
Luv.Timer.start yield 0 (fun () ->
match Switch.get_error fibre.switch with
| Some e -> discontinue k e
| None -> continue k ()
)
|> or_raise
)
| Eio.Private.Effects.Suspend_unchecked fn ->
Some (fun k ->
let k = { Suspended.k; fibre } in
fn tid (enqueue_result_thread k)
)
| Eio.Private.Effects.Suspend fn -> | Eio.Private.Effects.Suspend fn ->
Some (fun k -> Some (fun k ->
let k = { Suspended.k; tid } in begin match Switch.get_error fibre.switch with
fn tid (enqueue_result_thread k)) | Some e -> discontinue k e
| None ->
let k = { Suspended.k; fibre } in
fn tid (enqueue_result_thread k)
end)
| _ -> None | _ -> None
} }
in in
let main_status = ref `Running in let main_status = ref `Running in
fork ~tid:(Ctf.mint_id ()) (fun () -> fork ~tid:(Ctf.mint_id ()) ~switch:Eio.Private.boot_switch (fun () ->
match main stdenv with match Switch.top (fun _sw -> main stdenv) with
| () -> main_status := `Done | () -> main_status := `Done
| exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ()) | exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ())
); );

View File

@ -23,17 +23,15 @@ exception Luv_error of Luv.Error.t
val or_raise : 'a or_error -> 'a val or_raise : 'a or_error -> 'a
(** [or_error (Error e)] raises [Luv_error e]. *) (** [or_error (Error e)] raises [Luv_error e]. *)
val await : (('a -> unit) -> unit) -> 'a val await : (Eunix.Suspended.state -> ('a -> unit) -> unit) -> 'a
(** [await fn] converts a function using a luv-style callback to one using effects. (** [await fn] converts a function using a luv-style callback to one using effects.
Use it as e.g. [await (Luv.File.realpath path)]. *) Use it as e.g. [await (fun fibre -> Luv.File.realpath path)].
Use [fibre] to implement cancellation. *)
(** {1 Time functions} *) (** {1 Time functions} *)
val sleep_until : ?sw:Switch.t -> float -> unit val sleep_until : float -> unit
(** [sleep_until time] blocks until the current time is [time]. (** [sleep_until time] blocks until the current time is [time]. *)
@param sw Cancel the sleep if [sw] is turned off. *)
val yield : ?sw:Switch.t -> unit -> unit
(** {1 Low-level wrappers for Luv functions} *) (** {1 Low-level wrappers for Luv functions} *)
@ -63,16 +61,16 @@ module File : sig
string -> Luv.File.Open_flag.t list -> t or_error string -> Luv.File.Open_flag.t list -> t or_error
(** Wraps {!Luv.File.open_} *) (** Wraps {!Luv.File.open_} *)
val read : ?sw:Switch.t -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error val read : t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error
(** Wraps {!Luv.File.read} *) (** Wraps {!Luv.File.read} *)
val write : ?sw:Switch.t -> t -> Luv.Buffer.t list -> unit val write : t -> Luv.Buffer.t list -> unit
(** [write t bufs] writes all the data in [bufs] (which may take several calls to {!Luv.File.write}). *) (** [write t bufs] writes all the data in [bufs] (which may take several calls to {!Luv.File.write}). *)
val realpath : ?sw:Switch.t -> string -> string or_error val realpath : string -> string or_error
(** Wraps {!Luv.File.realpath} *) (** Wraps {!Luv.File.realpath} *)
val mkdir : ?sw:Switch.t -> mode:Luv.File.Mode.t list -> string -> unit or_error val mkdir : mode:Luv.File.Mode.t list -> string -> unit or_error
(** Wraps {!Luv.File.mkdir} *) (** Wraps {!Luv.File.mkdir} *)
end end

View File

@ -6,12 +6,12 @@
``` ```
```ocaml ```ocaml
let rec read_exactly ~sw fd buf = let rec read_exactly fd buf =
let size = Luv.Buffer.size buf in let size = Luv.Buffer.size buf in
if size > 0 then ( if size > 0 then (
let got = Eio_luv.File.read ~sw fd [buf] |> Eio_luv.or_raise |> Unsigned.Size_t.to_int in let got = Eio_luv.File.read fd [buf] |> Eio_luv.or_raise |> Unsigned.Size_t.to_int in
let next = Luv.Buffer.sub buf ~offset:got ~length:(size - got) in let next = Luv.Buffer.sub buf ~offset:got ~length:(size - got) in
read_exactly ~sw fd next read_exactly fd next
) )
let () = let () =
@ -35,7 +35,7 @@ let main _stdenv =
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
let fd = Eio_luv.File.open_ ~sw "/dev/zero" [] |> Eio_luv.or_raise in let fd = Eio_luv.File.open_ ~sw "/dev/zero" [] |> Eio_luv.or_raise in
let buf = Luv.Buffer.create 4 in let buf = Luv.Buffer.create 4 in
read_exactly ~sw fd buf; read_exactly fd buf;
traceln "Read %S" (Luv.Buffer.to_string buf); traceln "Read %S" (Luv.Buffer.to_string buf);
Eio_luv.File.close fd Eio_luv.File.close fd
``` ```

View File

@ -1,14 +1,19 @@
open EffectHandlers.Deep open EffectHandlers.Deep
type 'a t = { type state = {
tid : Ctf.id; tid : Ctf.id;
mutable switch : Eio.Std.Switch.t;
}
type 'a t = {
fibre : state;
k : ('a, [`Exit_scheduler]) continuation; k : ('a, [`Exit_scheduler]) continuation;
} }
let continue t v = let continue t v =
Ctf.note_switch t.tid; Ctf.note_switch t.fibre.tid;
continue t.k v continue t.k v
let discontinue t ex = let discontinue t ex =
Ctf.note_switch t.tid; Ctf.note_switch t.fibre.tid;
discontinue t.k ex discontinue t.k ex

View File

@ -22,22 +22,22 @@ let run (fn : sw:Switch.t -> Eio.Stdenv.t -> unit) =
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
fn ~sw env fn ~sw env
let read_all ?sw flow = let read_all flow =
let b = Buffer.create 100 in let b = Buffer.create 100 in
Eio.Flow.copy ?sw flow (Eio.Flow.buffer_sink b); Eio.Flow.copy flow (Eio.Flow.buffer_sink b);
Buffer.contents b Buffer.contents b
let write_file ?sw ~create ?append dir path content = let write_file ~create ?append dir path content =
Eio.Dir.with_open_out ?sw ~create ?append dir path @@ fun flow -> Eio.Dir.with_open_out ~create ?append dir path @@ fun flow ->
Eio.Flow.copy_string content flow Eio.Flow.copy_string content flow
let try_write_file ~sw ~create ?append dir path content = let try_write_file ~create ?append dir path content =
match write_file ~sw ~create ?append dir path content with match write_file ~create ?append dir path content with
| () -> traceln "write %S -> ok" path | () -> traceln "write %S -> ok" path
| exception ex -> traceln "write %S -> %a" path Fmt.exn ex | exception ex -> traceln "write %S -> %a" path Fmt.exn ex
let read_file ?sw dir path = let read_file dir path =
Eio.Dir.with_open_in ?sw dir path read_all Eio.Dir.with_open_in dir path read_all
let try_mkdir dir path = let try_mkdir dir path =
match Eio.Dir.mkdir dir path ~perm:0o700 with match Eio.Dir.mkdir dir path ~perm:0o700 with
@ -55,8 +55,8 @@ Creating a file and reading it back:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:(`Exclusive 0o666) cwd "test-file" "my-data"; write_file ~create:(`Exclusive 0o666) cwd "test-file" "my-data";
traceln "Got %S" @@ read_file ~sw cwd "test-file";; traceln "Got %S" @@ read_file cwd "test-file";;
+Got "my-data" +Got "my-data"
- : unit = () - : unit = ()
``` ```
@ -74,7 +74,7 @@ Trying to use cwd to access a file outside of that subtree fails:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:(`Exclusive 0o666) cwd "../test-file" "my-data"; write_file ~create:(`Exclusive 0o666) cwd "../test-file" "my-data";
failwith "Should have failed";; failwith "Should have failed";;
Exception: Eio.Dir.Permission_denied ("../test-file", _) Exception: Eio.Dir.Permission_denied ("../test-file", _)
``` ```
@ -83,7 +83,7 @@ Trying to use cwd to access an absolute path fails:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:(`Exclusive 0o666) cwd "/tmp/test-file" "my-data"; write_file ~create:(`Exclusive 0o666) cwd "/tmp/test-file" "my-data";
failwith "Should have failed";; failwith "Should have failed";;
Exception: Eio.Dir.Permission_denied ("/tmp/test-file", _) Exception: Eio.Dir.Permission_denied ("/tmp/test-file", _)
``` ```
@ -94,8 +94,8 @@ Exclusive create fails if already exists:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:(`Exclusive 0o666) cwd "test-file" "first-write"; write_file ~create:(`Exclusive 0o666) cwd "test-file" "first-write";
write_file ~sw ~create:(`Exclusive 0o666) cwd "test-file" "first-write"; write_file ~create:(`Exclusive 0o666) cwd "test-file" "first-write";
failwith "Should have failed";; failwith "Should have failed";;
Exception: Eio.Dir.Already_exists ("test-file", _) Exception: Eio.Dir.Already_exists ("test-file", _)
``` ```
@ -104,9 +104,9 @@ If-missing create succeeds if already exists:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:(`If_missing 0o666) cwd "test-file" "1st-write-original"; write_file ~create:(`If_missing 0o666) cwd "test-file" "1st-write-original";
write_file ~sw ~create:(`If_missing 0o666) cwd "test-file" "2nd-write"; write_file ~create:(`If_missing 0o666) cwd "test-file" "2nd-write";
traceln "Got %S" @@ read_file ~sw cwd "test-file";; traceln "Got %S" @@ read_file cwd "test-file";;
+Got "2nd-write-original" +Got "2nd-write-original"
- : unit = () - : unit = ()
``` ```
@ -115,9 +115,9 @@ Truncate create succeeds if already exists, and truncates:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original"; write_file ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original";
write_file ~sw ~create:(`Or_truncate 0o666) cwd "test-file" "2nd-write"; write_file ~create:(`Or_truncate 0o666) cwd "test-file" "2nd-write";
traceln "Got %S" @@ read_file ~sw cwd "test-file";; traceln "Got %S" @@ read_file cwd "test-file";;
+Got "2nd-write" +Got "2nd-write"
- : unit = () - : unit = ()
# Unix.unlink "test-file";; # Unix.unlink "test-file";;
@ -128,8 +128,8 @@ Error if no create and doesn't exist:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:`Never cwd "test-file" "1st-write-original"; write_file ~create:`Never cwd "test-file" "1st-write-original";
traceln "Got %S" @@ read_file ~sw cwd "test-file";; traceln "Got %S" @@ read_file cwd "test-file";;
Exception: Eio.Dir.Not_found ("test-file", _) Exception: Eio.Dir.Not_found ("test-file", _)
``` ```
@ -137,9 +137,9 @@ Appending to an existing file:
```ocaml ```ocaml
# run @@ fun ~sw env -> # run @@ fun ~sw env ->
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
write_file ~sw ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original"; write_file ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original";
write_file ~sw ~create:`Never ~append:true cwd "test-file" "2nd-write"; write_file ~create:`Never ~append:true cwd "test-file" "2nd-write";
traceln "Got %S" @@ read_file ~sw cwd "test-file";; traceln "Got %S" @@ read_file cwd "test-file";;
+Got "1st-write-original2nd-write" +Got "1st-write-original2nd-write"
- : unit = () - : unit = ()
# Unix.unlink "test-file";; # Unix.unlink "test-file";;
@ -153,7 +153,7 @@ Appending to an existing file:
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
try_mkdir cwd "subdir"; try_mkdir cwd "subdir";
try_mkdir cwd "subdir/nested"; try_mkdir cwd "subdir/nested";
write_file ~sw ~create:(`Exclusive 0o600) cwd "subdir/nested/test-file" "data"; write_file ~create:(`Exclusive 0o600) cwd "subdir/nested/test-file" "data";
();; ();;
+mkdir "subdir" -> ok +mkdir "subdir" -> ok
+mkdir "subdir/nested" -> ok +mkdir "subdir/nested" -> ok
@ -196,9 +196,9 @@ Create a sandbox, write a file with it, then read it from outside:
let cwd = Eio.Stdenv.cwd env in let cwd = Eio.Stdenv.cwd env in
try_mkdir cwd "sandbox"; try_mkdir cwd "sandbox";
let subdir = Eio.Dir.open_dir ~sw cwd "sandbox" in let subdir = Eio.Dir.open_dir ~sw cwd "sandbox" in
write_file ~sw ~create:(`Exclusive 0o600) subdir "test-file" "data"; write_file ~create:(`Exclusive 0o600) subdir "test-file" "data";
try_mkdir subdir "../new-sandbox"; try_mkdir subdir "../new-sandbox";
traceln "Got %S" @@ read_file ~sw cwd "sandbox/test-file";; traceln "Got %S" @@ read_file cwd "sandbox/test-file";;
+mkdir "sandbox" -> ok +mkdir "sandbox" -> ok
+mkdir "../new-sandbox" -> Eio.Dir.Permission_denied ("../new-sandbox", _) +mkdir "../new-sandbox" -> Eio.Dir.Permission_denied ("../new-sandbox", _)
+Got "data" +Got "data"
@ -217,9 +217,9 @@ Using `cwd` we can't access the parent, but using `fs` we can:
chdir "fs-test"; chdir "fs-test";
Fun.protect ~finally:(fun () -> chdir "..") (fun () -> Fun.protect ~finally:(fun () -> chdir "..") (fun () ->
try_mkdir cwd "../outside-cwd"; try_mkdir cwd "../outside-cwd";
try_write_file ~sw ~create:(`Exclusive 0o600) cwd "../test-file" "data"; try_write_file ~create:(`Exclusive 0o600) cwd "../test-file" "data";
try_mkdir fs "../outside-cwd"; try_mkdir fs "../outside-cwd";
try_write_file ~sw ~create:(`Exclusive 0o600) fs "../test-file" "data"; try_write_file ~create:(`Exclusive 0o600) fs "../test-file" "data";
); );
Unix.unlink "test-file"; Unix.unlink "test-file";
Unix.rmdir "outside-cwd";; Unix.rmdir "outside-cwd";;

View File

@ -14,9 +14,9 @@ let run (fn : net:Eio.Net.t -> Switch.t -> unit) =
let addr = `Tcp (Unix.inet_addr_loopback, 8081) let addr = `Tcp (Unix.inet_addr_loopback, 8081)
let read_all ?sw flow = let read_all flow =
let b = Buffer.create 100 in let b = Buffer.create 100 in
Eio.Flow.copy ?sw flow (Eio.Flow.buffer_sink b); Eio.Flow.copy flow (Eio.Flow.buffer_sink b);
Buffer.contents b Buffer.contents b
exception Graceful_shutdown exception Graceful_shutdown
@ -32,7 +32,7 @@ let run_client ~sw ~net ~addr =
let flow = Eio.Net.connect ~sw net addr in let flow = Eio.Net.connect ~sw net addr in
Eio.Flow.copy_string "Hello from client" flow; Eio.Flow.copy_string "Hello from client" flow;
Eio.Flow.shutdown flow `Send; Eio.Flow.shutdown flow `Send;
let msg = read_all ~sw flow in let msg = read_all flow in
traceln "Client received: %S" msg traceln "Client received: %S" msg
``` ```
@ -44,7 +44,7 @@ let run_server ~sw socket =
Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr -> Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr ->
traceln "Server accepted connection from client"; traceln "Server accepted connection from client";
Fun.protect (fun () -> Fun.protect (fun () ->
let msg = read_all ~sw flow in let msg = read_all flow in
traceln "Server received: %S" msg traceln "Server received: %S" msg
) ~finally:(fun () -> Eio.Flow.copy_string "Bye" flow) ) ~finally:(fun () -> Eio.Flow.copy_string "Bye" flow)
) )
@ -61,7 +61,7 @@ let test_address addr ~net sw =
(fun () -> (fun () ->
run_client ~sw ~net ~addr; run_client ~sw ~net ~addr;
traceln "Client finished - cancelling server"; traceln "Client finished - cancelling server";
Switch.turn_off sw Graceful_shutdown raise Graceful_shutdown
) )
``` ```
@ -106,24 +106,29 @@ Cancelling the read:
```ocaml ```ocaml
# run @@ fun ~net sw -> # run @@ fun ~net sw ->
Switch.top @@ fun read_switch -> let shutdown, set_shutdown = Promise.create () in
let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
Fibre.both ~sw Fibre.both ~sw
(fun () -> (fun () ->
Eio.Net.accept_sub server ~sw (fun ~sw flow _addr -> Eio.Net.accept_sub server ~sw (fun ~sw flow _addr ->
try try
let msg = read_all ~sw:read_switch flow in Fibre.both ~sw
traceln "Server received: %S" msg (fun () -> raise (Promise.await shutdown))
with Switch.Cancelled Graceful_shutdown -> (fun () ->
Eio.Flow.copy_string "Request cancelled" flow let msg = read_all flow in
) ~on_error:raise traceln "Server received: %S" msg
)
with Graceful_shutdown ->
Switch.top @@ fun _sw ->
Eio.Flow.copy_string "Request cancelled" flow;
) ~on_error:raise
) )
(fun () -> (fun () ->
traceln "Connecting to server..."; traceln "Connecting to server...";
let flow = Eio.Net.connect ~sw net addr in let flow = Eio.Net.connect ~sw net addr in
traceln "Connection opened - cancelling server's read"; traceln "Connection opened - cancelling server's read";
Fibre.yield (); Fibre.yield ();
Switch.turn_off read_switch Graceful_shutdown; Promise.fulfill set_shutdown Graceful_shutdown;
let msg = read_all flow in let msg = read_all flow in
traceln "Client received: %S" msg traceln "Client received: %S" msg
);; );;

View File

@ -42,8 +42,8 @@ Exception: Failure "Cancel".
```ocaml ```ocaml
# run (fun sw -> # run (fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> for i = 1 to 2 do traceln "i = %d" i; Fibre.yield ~sw () done) (fun () -> for i = 1 to 2 do traceln "i = %d" i; Fibre.yield () done)
(fun () -> for j = 1 to 2 do traceln "j = %d" j; Fibre.yield ~sw () done) (fun () -> for j = 1 to 2 do traceln "j = %d" j; Fibre.yield () done)
);; );;
+i = 1 +i = 1
+j = 1 +j = 1
@ -57,7 +57,7 @@ Exception: Failure "Cancel".
```ocaml ```ocaml
# run (fun sw -> # run (fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield ~sw () done) (fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield () done)
(fun () -> failwith "Failed") (fun () -> failwith "Failed")
);; );;
+i = 1 +i = 1
@ -69,8 +69,8 @@ Exception: Failure "Failed".
```ocaml ```ocaml
# run (fun sw -> # run (fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> Fibre.yield ~sw (); failwith "Failed") (fun () -> Fibre.yield (); failwith "Failed")
(fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield ~sw () done) (fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield () done)
);; );;
+i = 1 +i = 1
Exception: Failure "Failed". Exception: Failure "Failed".
@ -239,7 +239,7 @@ A child can fail independently of the parent:
Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> traceln "Child 2"; Promise.await ~sw p2); Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> traceln "Child 2"; Promise.await ~sw p2);
Promise.break r1 (Failure "Child error"); Promise.break r1 (Failure "Child error");
Promise.fulfill r2 (); Promise.fulfill r2 ();
Fibre.yield ~sw (); Fibre.yield ();
traceln "Parent fibre is still running" traceln "Parent fibre is still running"
);; );;
+Child 1 +Child 1
@ -262,7 +262,7 @@ A child can be cancelled independently of the parent:
Promise.await ~sw p Promise.await ~sw p
); );
Switch.turn_off (Option.get !child) (Failure "Cancel child"); Switch.turn_off (Option.get !child) (Failure "Cancel child");
Fibre.yield ~sw (); Fibre.yield ();
traceln "Parent fibre is still running" traceln "Parent fibre is still running"
);; );;
+Child 1 +Child 1
@ -279,7 +279,7 @@ A child error handle raises:
let on_error = raise in let on_error = raise in
Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> traceln "Child"; Promise.await ~sw p); Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> traceln "Child"; Promise.await ~sw p);
Promise.break r (Failure "Child error escapes"); Promise.break r (Failure "Child error escapes");
Fibre.yield ~sw (); Fibre.yield ();
traceln "Not reached" traceln "Not reached"
);; );;
+Child +Child

View File

@ -30,9 +30,8 @@ Check sleep works with a switch:
```ocaml ```ocaml
# run @@ fun ~clock -> # run @@ fun ~clock ->
Switch.top @@ fun sw ->
let t0 = Unix.gettimeofday () in let t0 = Unix.gettimeofday () in
Eio.Time.sleep ~sw clock 0.01; Eio.Time.sleep clock 0.01;
let t1 = Unix.gettimeofday () in let t1 = Unix.gettimeofday () in
assert (t1 -. t0 >= 0.01);; assert (t1 -. t0 >= 0.01);;
- : unit = () - : unit = ()
@ -44,8 +43,8 @@ Cancelling sleep:
# run @@ fun ~clock -> # run @@ fun ~clock ->
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> Eio.Time.sleep ~sw clock 1200.; assert false) (fun () -> Eio.Time.sleep clock 1200.; assert false)
(fun () -> Switch.turn_off sw (Failure "Simulated cancel"));; (fun () -> failwith "Simulated cancel");;
Exception: Failure "Simulated cancel". Exception: Failure "Simulated cancel".
``` ```
@ -55,7 +54,7 @@ Switch is already off:
# run @@ fun ~clock -> # run @@ fun ~clock ->
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
Switch.turn_off sw (Failure "Simulated failure"); Switch.turn_off sw (Failure "Simulated failure");
Eio.Time.sleep ~sw clock 1200.0; Eio.Time.sleep clock 1200.0;
assert false;; assert false;;
Exception: Failure "Simulated failure". Exception: Failure "Simulated failure".
``` ```
@ -66,7 +65,7 @@ Scheduling a timer that's already due:
# run @@ fun ~clock -> # run @@ fun ~clock ->
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> traceln "First fibre runs"; Eio.Time.sleep ~sw clock (-1.0); traceln "Sleep done") (fun () -> traceln "First fibre runs"; Eio.Time.sleep clock (-1.0); traceln "Sleep done")
(fun () -> traceln "Second fibre runs");; (fun () -> traceln "Second fibre runs");;
+First fibre runs +First fibre runs
+Second fibre runs +Second fibre runs
@ -81,13 +80,13 @@ Check ordering works:
Switch.top @@ fun sw -> Switch.top @@ fun sw ->
Fibre.both ~sw Fibre.both ~sw
(fun () -> (fun () ->
Eio.Time.sleep ~sw clock 1200.0; Eio.Time.sleep clock 1200.0;
assert false assert false
) )
(fun () -> (fun () ->
Eio.Time.sleep clock 0.1; Eio.Time.sleep clock 0.1;
traceln "Short timer finished"; traceln "Short timer finished";
Switch.turn_off sw (Failure "Simulated cancel") failwith "Simulated cancel"
);; );;
+Short timer finished +Short timer finished
Exception: Failure "Simulated cancel". Exception: Failure "Simulated cancel".