diff --git a/README.md b/README.md index 534928b..c30a525 100644 --- a/README.md +++ b/README.md @@ -155,8 +155,8 @@ Here's an example running two threads of execution (fibres) concurrently: let main _env = Switch.top @@ fun sw -> Fibre.both ~sw - (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done) - (fun () -> for y = 1 to 3 do traceln "y = %d" y; Fibre.yield ~sw () done);; + (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield () done) + (fun () -> for y = 1 to 3 do traceln "y = %d" y; Fibre.yield () done);; ``` ```ocaml @@ -170,12 +170,8 @@ let main _env = - : unit = () ``` -Notes: - -- The two fibres run on a single core, so only one can be running at a time. - Calling an operation that performs an effect (such as `yield`) can switch to a different thread. - -- The `sw` argument is used to handle exceptions (described later). +The two fibres run on a single core, so only one can be running at a time. +Calling an operation that performs an effect (such as `yield`) can switch to a different thread. ## Tracing @@ -219,7 +215,7 @@ Here's what happens if one of the two threads above fails: # Eio_main.run @@ fun _env -> Switch.top @@ fun sw -> Fibre.both ~sw - (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done) + (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield () done) (fun () -> failwith "Simulated error");; +x = 1 Exception: Failure "Simulated error". @@ -230,24 +226,19 @@ What happened here was: 1. The first fibre ran, printed `x = 1` and yielded. 2. The second fibre raised an exception. 3. `Fibre.both` caught the exception and turned off the switch. -4. The first thread's `yield` saw the switch was off and raised the exception there too. +4. The first thread's `yield` saw the switch was off and raised a `Cancelled` exception there. 5. Once both threads had finished, `Fibre.both` re-raised the exception. -Please note: turning off a switch only asks the other thread(s) to cancel. -A thread is free to ignore the switch and continue (perhaps to clean up some resources). - -Any operation that can be cancelled should take a `~sw` argument. - Switches can also be used to wait for threads even when there isn't an error. e.g. ```ocaml # Eio_main.run @@ fun _env -> Switch.top (fun sw -> Fibre.fork_ignore ~sw - (fun () -> for i = 1 to 3 do traceln "i = %d" i; Fibre.yield ~sw () done); + (fun () -> for i = 1 to 3 do traceln "i = %d" i; Fibre.yield () done); traceln "First thread forked"; Fibre.fork_ignore ~sw - (fun () -> for j = 1 to 3 do traceln "j = %d" j; Fibre.yield ~sw () done); + (fun () -> for j = 1 to 3 do traceln "j = %d" j; Fibre.yield () done); traceln "Second thread forked; top-level code is finished" ); traceln "Switch is finished";; @@ -270,6 +261,8 @@ For example, a web-server might use one switch for the whole server and then cre This allows you to end all fibres handling a single connection by turning off that connection's switch, or to exit the whole application using the top-level switch. +If you want to make an operation non-cancellable, wrap it in a `Switch.top` to create a fresh switch. + ## Design Note: Results vs Exceptions The OCaml standard library uses exceptions to report errors in most cases. diff --git a/doc/prelude.ml b/doc/prelude.ml index f690a62..cca8863 100644 --- a/doc/prelude.ml +++ b/doc/prelude.ml @@ -7,14 +7,14 @@ module Eio_main = struct let fake_clock real_clock = object (_ : #Eio.Time.clock) method now = !now - method sleep_until ?sw time = + method sleep_until time = (* The fake times are all in the past, so we just ask to wait until the fake time is due and it will happen immediately. If we wait for multiple times, they'll get woken in the right order. At the moment, the scheduler only checks for expired timers when the run-queue is empty, so this is a convenient way to wait for the system to be idle. Will need revising if we make the scheduler fair at some point. *) - Eio.Time.sleep_until ?sw real_clock time; + Eio.Time.sleep_until real_clock time; now := max !now time end diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml index c7dc2cd..698ad8d 100644 --- a/lib_eio/eio.ml +++ b/lib_eio/eio.ml @@ -30,7 +30,7 @@ module Flow = struct type shutdown_command = [ `Receive | `Send | `All ] type read_method = .. - type read_method += Read_source_buffer of (?sw:Switch.t -> (Cstruct.t list -> unit) -> unit) + type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) class type close = object method close : unit @@ -40,11 +40,11 @@ module Flow = struct class virtual read = object method virtual read_methods : read_method list - method virtual read_into : ?sw:Switch.t -> Cstruct.t -> int + method virtual read_into : Cstruct.t -> int end - let read_into ?sw (t : #read) buf = - let got = t#read_into ?sw buf in + let read_into (t : #read) buf = + let got = t#read_into buf in assert (got > 0); got @@ -61,8 +61,8 @@ module Flow = struct inherit source - method private read_source_buffer ?sw fn = - Option.iter Switch.check sw; + method private read_source_buffer fn = + Fibre.yield (); let rec aux () = match data with | [] -> raise End_of_file @@ -74,8 +74,8 @@ module Flow = struct method read_methods = [ Read_source_buffer self#read_source_buffer ] - method read_into ?sw dst = - Option.iter Switch.check sw; + method read_into dst = + Fibre.yield (); let avail, src = Cstruct.fillv ~dst ~src:data in if avail = 0 then raise End_of_file; data <- src; @@ -85,12 +85,12 @@ module Flow = struct let string_source s = cstruct_source [Cstruct.of_string s] class virtual write = object - method virtual write : 'a. ?sw:Switch.t -> (#source as 'a) -> unit + method virtual write : 'a. (#source as 'a) -> unit end - let copy ?sw (src : #source) (dst : #write) = dst#write ?sw src + let copy (src : #source) (dst : #write) = dst#write src - let copy_string ?sw s = copy ?sw (string_source s) + let copy_string s = copy (string_source s) class virtual sink = object (_ : #Generic.t) method probe _ = None @@ -101,11 +101,11 @@ module Flow = struct object inherit sink - method write ?sw src = + method write src = let buf = Cstruct.create 4096 in try while true do - let got = src#read_into ?sw buf in + let got = src#read_into buf in Buffer.add_string b (Cstruct.to_string ~len:got buf) done with End_of_file -> () @@ -171,14 +171,14 @@ end module Time = struct class virtual clock = object method virtual now : float - method virtual sleep_until : ?sw:Switch.t -> float -> unit + method virtual sleep_until : float -> unit end let now (t : #clock) = t#now - let sleep_until ?sw (t : #clock) time = t#sleep_until ?sw time + let sleep_until (t : #clock) time = t#sleep_until time - let sleep ?sw t d = sleep_until ?sw t (now t +. d) + let sleep t d = sleep_until t (now t +. d) end module Dir = struct @@ -203,7 +203,7 @@ module Dir = struct append:bool -> create:create -> path -> - method virtual mkdir : ?sw:Switch.t -> perm:Unix.file_perm -> path -> unit + method virtual mkdir : perm:Unix.file_perm -> path -> unit method virtual open_dir : sw:Switch.t -> path -> t_with_close end and virtual t_with_close = object @@ -215,7 +215,7 @@ module Dir = struct let open_in ~sw (t:#t) = t#open_in ~sw let open_out ~sw ?(append=false) ~create (t:#t) path = t#open_out ~sw ~append ~create path let open_dir ~sw (t:#t) = t#open_dir ~sw - let mkdir ?sw (t:#t) = t#mkdir ?sw + let mkdir (t:#t) = t#mkdir let with_open_in ?sw (t:#t) path fn = Switch.sub_opt sw @@ fun sw -> fn (open_in ~sw t path) @@ -254,9 +254,12 @@ module Private = struct type 'a enqueue = 'a Suspend.enqueue type _ eff += | Suspend = Suspend.Suspend + | Suspend_unchecked = Suspend.Suspend_unchecked | Fork = Fibre.Fork | Fork_ignore = Fibre.Fork_ignore | Trace = Std.Trace + | Yield = Fibre.Yield + | Set_switch = Switch.Set_switch end - module Switch = Switch + let boot_switch = Switch.boot_switch end diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index 2b9821d..120f5ce 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -170,10 +170,9 @@ module Std : sig The new fibre is attached to [sw] (which can't finish until the fibre ends). @param exn_turn_off If [true] and [fn] raises an exception, [sw] is turned off (in addition to breaking the promise). *) - val yield : ?sw:Switch.t -> unit -> unit + val yield : unit -> unit (** [yield ()] asks the scheduler to switch to the next runnable task. - The current task remains runnable, but goes to the back of the queue. - @param sw Ensure that the switch is still on before returning. *) + The current task remains runnable, but goes to the back of the queue. *) end val traceln : @@ -266,7 +265,7 @@ module Flow : sig type read_method = .. (** Sources can offer a list of ways to read them, in order of preference. *) - type read_method += Read_source_buffer of (?sw:Switch.t -> (Cstruct.t list -> unit) -> unit) + type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) (** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn] to borrow a view of the source's buffers. [rb] will raise [End_of_file] if no more data will be produced. @@ -282,15 +281,14 @@ module Flow : sig class virtual read : object method virtual read_methods : read_method list - method virtual read_into : ?sw:Switch.t -> Cstruct.t -> int + method virtual read_into : Cstruct.t -> int end - val read_into : ?sw:Switch.t -> #read -> Cstruct.t -> int + val read_into : #read -> Cstruct.t -> int (** [read_into reader buf] reads one or more bytes into [buf]. It returns the number of bytes written (which may be less than the buffer size even if there is more data to be read). [buf] must not be zero-length. - @param sw Abort the read if [sw] is turned off. @raise End_of_file if there is no more data to read *) val read_methods : #read -> read_method list @@ -309,13 +307,13 @@ module Flow : sig val cstruct_source : Cstruct.t list -> source class virtual write : object - method virtual write : 'a. ?sw:Switch.t -> (#source as 'a) -> unit + method virtual write : 'a. (#source as 'a) -> unit end - val copy : ?sw:Switch.t -> #source -> #write -> unit + val copy : #source -> #write -> unit (** [copy src dst] copies data from [src] to [dst] until end-of-file. *) - val copy_string : ?sw:Switch.t -> string -> #write -> unit + val copy_string : string -> #write -> unit (** Consumer base class. *) class virtual sink : object @@ -404,19 +402,17 @@ end module Time : sig class virtual clock : object method virtual now : float - method virtual sleep_until : ?sw:Switch.t -> float -> unit + method virtual sleep_until : float -> unit end val now : #clock -> float (** [now t] is the current time according to [t]. *) - val sleep_until : ?sw:Switch.t -> #clock -> float -> unit - (** [sleep_until t time] waits until the given time is reached. - @param sw The sleep is aborted if the switch is turned off. *) + val sleep_until : #clock -> float -> unit + (** [sleep_until t time] waits until the given time is reached. *) - val sleep : ?sw:Switch.t -> #clock -> float -> unit - (** [sleep t d] waits for [d] seconds. - @param sw The sleep is aborted if the switch is turned off. *) + val sleep : #clock -> float -> unit + (** [sleep t d] waits for [d] seconds. *) end module Dir : sig @@ -448,7 +444,7 @@ module Dir : sig append:bool -> create:create -> path -> - method virtual mkdir : ?sw:Switch.t -> perm:Unix.file_perm -> path -> unit + method virtual mkdir : perm:Unix.file_perm -> path -> unit method virtual open_dir : sw:Switch.t -> path -> t_with_close end and virtual t_with_close : object @@ -482,7 +478,7 @@ module Dir : sig (** [with_open_out] is like [open_out], but calls [fn flow] with the new flow and closes it automatically when [fn] returns (if it hasn't already been closed by then). *) - val mkdir : ?sw:Switch.t -> #t -> perm:Unix.file_perm -> path -> unit + val mkdir : #t -> perm:Unix.file_perm -> path -> unit (** [mkdir t ~perm path] creates a new directory [t/path] with permissions [perm]. *) val open_dir : sw:Switch.t -> #t -> path -> @@ -543,7 +539,11 @@ module Private : sig (e.g. because it called {!Promise.await} on an unresolved promise). The effect handler runs [fn tid enqueue] in the scheduler context, passing it the suspended fibre's thread ID (for tracing) and a function to resume it. - [fn] should arrange for [enqueue] to be called once the thread is ready to run again. *) + [fn] should arrange for [enqueue] to be called once the thread is ready to run again. + If a cancellation is pending, this will raise it. *) + + | Suspend_unchecked : (Ctf.id -> 'a enqueue -> unit) -> 'a eff + (** [Suspend_unchecked] is like [Suspend], but doesn't raise pending exceptions. *) | Fork : (unit -> 'a) -> 'a Promise.t eff (** See {!Fibre.fork} *) @@ -551,12 +551,18 @@ module Private : sig | Fork_ignore : (unit -> unit) -> unit eff (** See {!Fibre.fork_ignore} *) + | Yield : unit eff + | Trace : (?__POS__:(string * int * int * int) -> ('a, Format.formatter, unit, unit) format4 -> 'a) eff (** [perform Trace fmt] writes trace logging to the configured trace output. It must not switch fibres, as tracing must not affect scheduling. If the system is not ready to receive the trace output, the whole domain must block until it is. *) + + | Set_switch : Switch.t -> Switch.t eff end + + val boot_switch : Switch.t end diff --git a/lib_eio/fibre.ml b/lib_eio/fibre.ml index b4e8b76..577a412 100644 --- a/lib_eio/fibre.ml +++ b/lib_eio/fibre.ml @@ -1,6 +1,7 @@ open EffectHandlers type _ eff += Fork : (unit -> 'a) -> 'a Promise.t eff +type _ eff += Yield : unit eff let fork ~sw ~exn_turn_off f = let f () = @@ -24,9 +25,8 @@ let fork_ignore ~sw f = in perform (Fork_ignore f) -let yield ?sw () = - Suspend.enter (fun _id enqueue -> enqueue (Ok ())); - Option.iter Switch.check sw +let yield () = + perform Yield let both ~sw f g = let x = fork ~sw ~exn_turn_off:true f in @@ -34,8 +34,12 @@ let both ~sw f g = try g () with ex -> Switch.turn_off sw ex end; - Promise.await x; - Switch.check sw + ignore (Promise.await_result x : (unit, exn) result); + match sw.state with + | On _ -> () + | Off (ex, bt) -> + Switch.raise_with_extras sw ex bt + | Finished -> assert false let fork_sub_ignore ?on_release ~sw ~on_error f = if Switch.is_finished sw then ( diff --git a/lib_eio/suspend.ml b/lib_eio/suspend.ml index 2138f57..360991f 100644 --- a/lib_eio/suspend.ml +++ b/lib_eio/suspend.ml @@ -2,5 +2,7 @@ open EffectHandlers type 'a enqueue = ('a, exn) result -> unit type _ eff += Suspend : (Ctf.id -> 'a enqueue -> unit) -> 'a eff +type _ eff += Suspend_unchecked : (Ctf.id -> 'a enqueue -> unit) -> 'a eff let enter fn = perform (Suspend fn) +let enter_unchecked fn = perform (Suspend_unchecked fn) diff --git a/lib_eio/switch.ml b/lib_eio/switch.ml index f7e3c5f..d880010 100644 --- a/lib_eio/switch.ml +++ b/lib_eio/switch.ml @@ -1,3 +1,5 @@ +open EffectHandlers + exception Multiple_exceptions of exn list exception Cancelled of exn @@ -24,6 +26,23 @@ type t = { waiter : unit Waiters.t; (* The main [top]/[sub] function may wait here for fibres to finish. *) } +(* A dummy switch for bootstrapping *) +let boot_switch = { + id = Ctf.mint_id (); + state = Finished; + fibres = 0; + extra_exceptions = []; + on_release = Lwt_dllist.create (); + waiter = Waiters.create (); +} + +type _ eff += Set_switch : t -> t eff + +let with_switch t fn = + let old = perform (Set_switch t) in + Fun.protect fn + ~finally:(fun () -> ignore (perform (Set_switch old))) + let null_hook = ignore let remove_hook h = h () @@ -52,6 +71,7 @@ let rec turn_off t ex = | Off _ -> begin match ex with | Cancelled _ -> () (* The original exception will be reported elsewhere *) + | Multiple_exceptions exns -> List.iter (turn_off t) exns | _ -> t.extra_exceptions <- ex :: t.extra_exceptions end | On q -> @@ -108,7 +128,7 @@ let await_internal ?sw waiters id tid enqueue = Queue.add resolved_waiter cleanup_hooks let await ?sw waiters id = - Suspend.enter (await_internal ?sw waiters id) + Suspend.enter_unchecked (await_internal ?sw waiters id) let rec await_idle t = (* Wait for fibres to finish: *) @@ -149,6 +169,7 @@ let top fn = waiter = Waiters.create (); on_release = Lwt_dllist.create (); } in + with_switch t @@ fun () -> match fn t with | v -> await_idle t; diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 542cad1..b11c02b 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -95,7 +95,6 @@ type rw_req = { buf : Uring.Region.chunk; mutable cur_off : int; action : int Suspended.t; - sw : Switch.t option; } type cancel_hook = Switch.hook ref @@ -109,6 +108,7 @@ type io_job = type runnable = | Thread : 'a Suspended.t * 'a -> runnable + | Thread_checked : unit Suspended.t -> runnable | Failed_thread : 'a Suspended.t * exn -> runnable type t = { @@ -127,6 +127,7 @@ let enqueue_thread st k x = let enqueue_failed_thread st k ex = Queue.push (Failed_thread (k, ex)) st.run_q +type _ eff += Enter_unchecked : (t -> 'a Suspended.t -> unit) -> 'a eff type _ eff += Enter : (t -> 'a Suspended.t -> unit) -> 'a eff let enter fn = perform (Enter fn) @@ -138,7 +139,7 @@ let rec enqueue_cancel job st action = | Some _ -> () let cancel job = - let res = enter (enqueue_cancel job) in + let res = perform (Enter_unchecked (enqueue_cancel job)) in Log.debug (fun l -> l "cancel returned"); if res = -2 then ( Log.debug (fun f -> f "Cancel returned ENOENT - operation completed before cancel took effect") @@ -175,27 +176,25 @@ let cancel job = When [fn cancel_hook] returns, it registers a cancellation callback with [sw] and stores its handle in [cancel_hook]. If [sw] is already off, it schedules [action] to be discontinued. @return Whether to retry the operation later, once there is space. *) -let with_cancel_hook ?sw ~action st fn = +let with_cancel_hook ~action st fn = let release = ref Switch.null_hook in - match sw with - | None -> fn release = None - | Some sw -> - match Switch.get_error sw with - | Some ex -> enqueue_failed_thread st action ex; false - | None -> - match fn release with - | None -> true - | Some job -> - release := Switch.add_cancel_hook sw (fun _ -> cancel job); - false + let sw = action.Suspended.fibre.switch in + match Switch.get_error sw with + | Some ex -> enqueue_failed_thread st action ex; false + | None -> + match fn release with + | None -> true + | Some job -> + release := Switch.add_cancel_hook sw (fun _ -> cancel job); + false -let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; sw; action} as req) = +let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) = let fd = FD.get "submit_rw_req" fd in let {uring;io_q;_} = st in let off = Uring.Region.to_offset buf + cur_off in let len = match len with Exactly l | Upto l -> l in let len = len - cur_off in - let retry = with_cancel_hook ?sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> match op with |`R -> Uring.read_fixed uring ~file_offset fd ~off ~len (Read (req, cancel)) |`W -> Uring.write_fixed uring ~file_offset fd ~off ~len (Write (req, cancel)) @@ -210,26 +209,26 @@ let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; sw; action} a (* TODO bind from unixsupport *) let errno_is_retry = function -62 | -11 | -4 -> true |_ -> false -let enqueue_read st action (sw,file_offset,fd,buf,len) = +let enqueue_read st action (file_offset,fd,buf,len) = let file_offset = match file_offset with | Some x -> x | None -> FD.uring_file_offset fd in - let req = { op=`R; file_offset; len; fd; cur_off = 0; buf; action; sw} in + let req = { op=`R; file_offset; len; fd; cur_off = 0; buf; action } in Log.debug (fun l -> l "read: submitting call"); Ctf.label "read"; submit_rw_req st req let rec enqueue_readv args st action = - let (sw,file_offset,fd,bufs) = args in + let (file_offset,fd,bufs) = args in let file_offset = match file_offset with | Some x -> x | None -> FD.uring_file_offset fd in Ctf.label "readv"; - let retry = with_cancel_hook ?sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> Uring.readv st.uring ~file_offset (FD.get "readv" fd) bufs (Job (action, cancel)) ) in @@ -237,29 +236,29 @@ let rec enqueue_readv args st action = Queue.push (fun st -> enqueue_readv args st action) st.io_q let rec enqueue_writev args st action = - let (sw,file_offset,fd,bufs) = args in + let (file_offset,fd,bufs) = args in let file_offset = match file_offset with | Some x -> x | None -> FD.uring_file_offset fd in Ctf.label "writev"; - let retry = with_cancel_hook ?sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> Uring.writev st.uring ~file_offset (FD.get "writev" fd) bufs (Job (action, cancel)) ) in if retry then (* wait until an sqe is available *) Queue.push (fun st -> enqueue_writev args st action) st.io_q -let rec enqueue_poll_add ?sw fd poll_mask st action = +let rec enqueue_poll_add fd poll_mask st action = Log.debug (fun l -> l "poll_add: submitting call"); Ctf.label "poll_add"; - let retry = with_cancel_hook ?sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> Uring.poll_add st.uring (FD.get "poll_add" fd) poll_mask (Job (action, cancel)) ) in if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_poll_add ?sw fd poll_mask st action) st.io_q + Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q let rec enqueue_close st action fd = Log.debug (fun l -> l "close: submitting call"); @@ -268,57 +267,57 @@ let rec enqueue_close st action fd = if subm = None then (* wait until an sqe is available *) Queue.push (fun st -> enqueue_close st action fd) st.io_q -let enqueue_write st action (sw,file_offset,fd,buf,len) = +let enqueue_write st action (file_offset,fd,buf,len) = let file_offset = match file_offset with | Some x -> x | None -> FD.uring_file_offset fd in - let req = { op=`W; file_offset; len; fd; cur_off = 0; buf; action; sw } in + let req = { op=`W; file_offset; len; fd; cur_off = 0; buf; action } in Log.debug (fun l -> l "write: submitting call"); Ctf.label "write"; submit_rw_req st req -let rec enqueue_splice ?sw ~src ~dst ~len st action = +let rec enqueue_splice ~src ~dst ~len st action = Log.debug (fun l -> l "splice: submitting call"); Ctf.label "splice"; - let retry = with_cancel_hook ?sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> Uring.splice st.uring (Job (action, cancel)) ~src:(FD.get "splice" src) ~dst:(FD.get "splice" dst) ~len ) in if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_splice ?sw ~src ~dst ~len st action) st.io_q + Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q -let rec enqueue_openat2 ((sw, access, flags, perm, resolve, dir, path) as args) st action = +let rec enqueue_openat2 ((access, flags, perm, resolve, dir, path) as args) st action = Log.debug (fun l -> l "openat2: submitting call"); Ctf.label "openat2"; let fd = Option.map (FD.get "openat2") dir in - let retry = with_cancel_hook ~sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job (action, cancel)) ) in if retry then (* wait until an sqe is available *) Queue.push (fun st -> enqueue_openat2 args st action) st.io_q -let rec enqueue_connect ?sw fd addr st action = +let rec enqueue_connect fd addr st action = Log.debug (fun l -> l "connect: submitting call"); Ctf.label "connect"; - let retry = with_cancel_hook ?sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> Uring.connect st.uring (FD.get "connect" fd) addr (Job (action, cancel)) ) in if retry then (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_connect ?sw fd addr st action) st.io_q + Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q -let rec enqueue_accept ~sw fd client_addr st action = +let rec enqueue_accept fd client_addr st action = Log.debug (fun l -> l "accept: submitting call"); Ctf.label "accept"; - let retry = with_cancel_hook ~sw ~action st (fun cancel -> + let retry = with_cancel_hook ~action st (fun cancel -> Uring.accept st.uring (FD.get "accept" fd) client_addr (Job (action, cancel)) ) in if retry then ( (* wait until an sqe is available *) - Queue.push (fun st -> enqueue_accept ~sw fd client_addr st action) st.io_q + Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q ) let rec enqueue_noop st action = @@ -345,6 +344,11 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = (* Wakeup any paused fibres *) match Queue.take run_q with | Thread (k, v) -> Suspended.continue k v (* We already have a runnable task *) + | Thread_checked k -> + begin match Switch.get_error k.fibre.switch with + | Some e -> Suspended.discontinue k e + | None -> Suspended.continue k () + end | Failed_thread (k, ex) -> Suspended.discontinue k ex | exception Queue.Empty -> let now = Unix.gettimeofday () in @@ -396,19 +400,26 @@ and handle_complete st ~runnable result = complete_rw_req st req result | Job (k, cancel) -> Switch.remove_hook !cancel; - Suspended.continue k result + begin match Switch.get_error k.fibre.switch with + | Some e -> Suspended.discontinue k e (* If cancelled, report that instead. *) + | None -> Suspended.continue k result + end | Job_no_cancel k -> Suspended.continue k result and complete_rw_req st ({len; cur_off; action; _} as req) res = match res, len with | 0, _ -> Suspended.discontinue action End_of_file | e, _ when e < 0 -> - if errno_is_retry e then ( - submit_rw_req st req; - schedule st - ) else ( - Suspended.continue action e - ) + begin match Switch.get_error action.fibre.switch with + | Some e -> Suspended.discontinue action e (* If cancelled, report that instead. *) + | None -> + if errno_is_retry e then ( + submit_rw_req st req; + schedule st + ) else ( + Suspended.continue action e + ) + end | n, Exactly len when n < len - cur_off -> req.cur_off <- req.cur_off + n; submit_rw_req st req; @@ -434,25 +445,23 @@ let noop () = Log.debug (fun l -> l "noop returned"); if result <> 0 then raise (Unix.Unix_error (Uring.error_of_errno result, "noop", "")) -type _ eff += Sleep_until : Switch.t option * float -> unit eff -let sleep_until ?sw d = - perform (Sleep_until (sw, d)) +type _ eff += Sleep_until : float -> unit eff +let sleep_until d = + perform (Sleep_until d) -type _ eff += ERead : (Switch.t option * Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff +type _ eff += ERead : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff -let read_exactly ?sw ?file_offset fd buf len = - let res = perform (ERead (sw, file_offset, fd, buf, Exactly len)) in +let read_exactly ?file_offset fd buf len = + let res = perform (ERead (file_offset, fd, buf, Exactly len)) in Log.debug (fun l -> l "read_exactly: woken up after read"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "read_exactly", "")) ) -let read_upto ?sw ?file_offset fd buf len = - let res = perform (ERead (sw, file_offset, fd, buf, Upto len)) in +let read_upto ?file_offset fd buf len = + let res = perform (ERead (file_offset, fd, buf, Upto len)) in Log.debug (fun l -> l "read_upto: woken up after read"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) let err = Uring.error_of_errno res in let ex = Unix.Unix_error (err, "read_upto", "") in if err = Unix.ECONNRESET then raise (Eio.Net.Connection_reset ex) @@ -461,11 +470,10 @@ let read_upto ?sw ?file_offset fd buf len = res ) -let readv ?sw ?file_offset fd bufs = - let res = enter (enqueue_readv (sw, file_offset, fd, bufs)) in +let readv ?file_offset fd bufs = + let res = enter (enqueue_readv (file_offset, fd, bufs)) in Log.debug (fun l -> l "readv: woken up after read"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "readv", "")) ) else if res = 0 then ( raise End_of_file @@ -473,11 +481,10 @@ let readv ?sw ?file_offset fd bufs = res ) -let rec writev ?sw ?file_offset fd bufs = - let res = enter (enqueue_writev (sw, file_offset, fd, bufs)) in +let rec writev ?file_offset fd bufs = + let res = enter (enqueue_writev (file_offset, fd, bufs)) in Log.debug (fun l -> l "writev: woken up after write"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) ) else ( match Cstruct.shiftv bufs res with @@ -490,32 +497,29 @@ let rec writev ?sw ?file_offset fd bufs = | Some ofs when ofs = I63.minus_one -> Some I63.minus_one | Some ofs -> Some (I63.add ofs (I63.of_int res)) in - writev ?sw ?file_offset fd bufs + writev ?file_offset fd bufs ) -let await_readable ?sw fd = - let res = enter (enqueue_poll_add ?sw fd (Uring.Poll_mask.(pollin + pollerr))) in +let await_readable fd = + let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in Log.debug (fun l -> l "await_readable: woken up"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "await_readable", "")) ) -let await_writable ?sw fd = - let res = enter (enqueue_poll_add ?sw fd (Uring.Poll_mask.(pollout + pollerr))) in +let await_writable fd = + let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollout + pollerr))) in Log.debug (fun l -> l "await_writable: woken up"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "await_writable", "")) ) -type _ eff += EWrite : (Switch.t option * Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff +type _ eff += EWrite : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff -let write ?sw ?file_offset fd buf len = - let res = perform (EWrite (sw, file_offset, fd, buf, Exactly len)) in +let write ?file_offset fd buf len = + let res = perform (EWrite (file_offset, fd, buf, Exactly len)) in Log.debug (fun l -> l "write: woken up after write"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "write", "")) ) @@ -525,21 +529,17 @@ let alloc () = perform Alloc type _ eff += Free : Uring.Region.chunk -> unit eff let free buf = perform (Free buf) -let splice ?sw src ~dst ~len = - let res = enter (enqueue_splice ?sw ~src ~dst ~len) in +let splice src ~dst ~len = + let res = enter (enqueue_splice ~src ~dst ~len) in Log.debug (fun l -> l "splice returned"); if res > 0 then res else if res = 0 then raise End_of_file - else ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) - raise (Unix.Unix_error (Uring.error_of_errno res, "splice", "")) - ) + else raise (Unix.Unix_error (Uring.error_of_errno res, "splice", "")) -let connect ?sw fd addr = - let res = enter (enqueue_connect ?sw fd addr) in +let connect fd addr = + let res = enter (enqueue_connect fd addr) in Log.debug (fun l -> l "connect returned"); if res < 0 then ( - Option.iter Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "connect", "")) ) @@ -554,7 +554,7 @@ let openfile ~sw path flags mode = let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path = wrap_errors path @@ fun () -> - let res = enter (enqueue_openat2 (sw, access, flags, perm, resolve, dir, path)) in + let res = enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in Log.debug (fun l -> l "openat2 returned"); if res < 0 then ( Switch.check sw; (* If cancelled, report that instead. *) @@ -574,17 +574,17 @@ let fstat fd = external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat" (* We ignore [sw] because this isn't a uring operation yet. *) -let mkdirat ?sw:_ ~perm dir path = +let mkdirat ~perm dir path = wrap_errors path @@ fun () -> match dir with | None -> Unix.mkdir path perm | Some dir -> eio_mkdirat (FD.get "mkdirat" dir) path perm -let mkdir_beneath ?sw ~perm ?dir path = +let mkdir_beneath ~perm ?dir path = let dir_path = Filename.dirname path in let leaf = Filename.basename path in (* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *) - Switch.sub_opt sw (fun sw -> + Switch.top (fun sw -> let parent = wrap_errors path @@ fun () -> openat2 ~sw ~seekable:false ?dir dir_path @@ -593,19 +593,18 @@ let mkdir_beneath ?sw ~perm ?dir path = ~perm:0 ~resolve:Uring.Resolve.beneath in - mkdirat ~sw ~perm (Some parent) leaf + mkdirat ~perm (Some parent) leaf ) let shutdown socket command = Unix.shutdown (FD.get "shutdown" socket) command -let accept_loose_fd ~sw socket = +let accept_loose_fd socket = Ctf.label "accept"; let client_addr = Uring.Sockaddr.create () in - let res = enter (enqueue_accept ~sw socket client_addr) in + let res = enter (enqueue_accept socket client_addr) in Log.debug (fun l -> l "accept returned"); if res < 0 then ( - Switch.check sw; (* If cancelled, report that instead. *) raise (Unix.Unix_error (Uring.error_of_errno res, "accept", "")) ) else ( let unix : Unix.file_descr = Obj.magic res in @@ -615,7 +614,7 @@ let accept_loose_fd ~sw socket = ) let accept ~sw fd = - let client, client_addr = accept_loose_fd ~sw fd in + let client, client_addr = accept_loose_fd fd in Switch.on_release sw (fun () -> FD.ensure_closed client); client, client_addr @@ -643,46 +642,46 @@ module Objects = struct (* When copying between a source with an FD and a sink with an FD, we can share the chunk and avoid copying. *) - let fast_copy ?sw src dst = + let fast_copy src dst = with_chunk @@ fun chunk -> let chunk_size = Uring.Region.length chunk in try while true do - let got = read_upto ?sw src chunk chunk_size in - write ?sw dst chunk got + let got = read_upto src chunk chunk_size in + write dst chunk got done with End_of_file -> () (* Try a fast copy using splice. If the FDs don't support that, switch to copying. *) - let fast_copy_try_splice ?sw src dst = + let fast_copy_try_splice src dst = try while true do - let _ : int = splice ?sw src ~dst ~len:max_int in + let _ : int = splice src ~dst ~len:max_int in () done with | End_of_file -> () - | Unix.Unix_error (Unix.EINVAL, "splice", _) -> fast_copy ?sw src dst + | Unix.Unix_error (Unix.EINVAL, "splice", _) -> fast_copy src dst (* Copy using the [Read_source_buffer] optimisation. Avoids a copy if the source already has the data. *) - let copy_with_rsb ?sw rsb dst = + let copy_with_rsb rsb dst = try while true do - rsb ?sw (writev ?sw dst) + rsb (writev dst) done with End_of_file -> () (* Copy by allocating a chunk from the pre-shared buffer and asking the source to write into it. This used when the other methods aren't available. *) - let fallback_copy ?sw src dst = + let fallback_copy src dst = with_chunk @@ fun chunk -> let chunk_cs = Uring.Region.to_cstruct chunk in try while true do - let got = Eio.Flow.read_into ?sw src chunk_cs in - write ?sw dst chunk got + let got = Eio.Flow.read_into src chunk_cs in + write dst chunk got done with End_of_file -> () @@ -696,7 +695,7 @@ module Objects = struct | FD -> Some fd | _ -> None - method read_into ?sw buf = + method read_into buf = (* Inefficient copying fallback *) with_chunk @@ fun chunk -> let chunk_cs = Uring.Region.to_cstruct chunk in @@ -704,22 +703,22 @@ module Objects = struct if Lazy.force is_tty then ( (* Work-around for https://github.com/axboe/liburing/issues/354 (should be fixed in Linux 5.14) *) - await_readable ?sw fd + await_readable fd ); - let got = read_upto ?sw fd chunk max_len in + let got = read_upto fd chunk max_len in Cstruct.blit chunk_cs 0 buf 0 got; got method read_methods = [] - method write ?sw src = + method write src = match get_fd_opt src with - | Some src -> fast_copy_try_splice ?sw src fd + | Some src -> fast_copy_try_splice src fd | None -> let rec aux = function - | Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb ?sw rsb fd + | Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb rsb fd | _ :: xs -> aux xs - | [] -> fallback_copy ?sw src fd + | [] -> fallback_copy src fd in aux (Eio.Flow.read_methods src) @@ -739,7 +738,7 @@ module Objects = struct method close = FD.close fd method accept_sub ~sw ~on_error fn = - let client, client_addr = accept_loose_fd ~sw fd in + let client, client_addr = accept_loose_fd fd in Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> let client_addr = match client_addr with @@ -790,7 +789,7 @@ module Objects = struct in let sock_unix = Unix.socket socket_domain socket_type 0 in let sock = FD.of_unix ~sw ~seekable:false sock_unix in - connect ~sw sock addr; + connect sock addr; (flow sock :> ) end @@ -870,8 +869,8 @@ module Objects = struct in (new dir (Some fd) :> ) - method mkdir ?sw ~perm path = - mkdir_beneath ?sw ~perm ?dir:fd path + method mkdir ~perm path = + mkdir_beneath ~perm ?dir:fd path method close = FD.close (Option.get fd) @@ -883,8 +882,8 @@ module Objects = struct val! resolve_flags = Uring.Resolve.empty - method! mkdir ?sw ~perm path = - mkdirat ?sw ~perm None path + method! mkdir ~perm path = + mkdirat ~perm None path end let stdenv () = @@ -926,8 +925,9 @@ let run ?(queue_depth=64) ?(block_size=4096) main = let mem_q = Queue.create () in let st = { mem; uring; run_q; io_q; mem_q; sleep_q; io_jobs = 0 } in Log.debug (fun l -> l "starting main thread"); - let rec fork ~tid fn = + let rec fork ~tid ~switch:initial_switch fn = Ctf.note_switch tid; + let fibre = { Suspended.tid; switch = initial_switch } in match_with fn () { retc = (fun () -> schedule st); exnc = (fun e -> raise e); @@ -935,60 +935,92 @@ let run ?(queue_depth=64) ?(block_size=4096) main = match e with | Enter fn -> Some (fun k -> - let k = { Suspended.k; tid } in - fn st k; - schedule st) + begin match Switch.get_error fibre.switch with + | Some e -> discontinue k e + | None -> + let k = { Suspended.k; fibre } in + fn st k; + schedule st + end) + | Enter_unchecked fn -> + Some (fun k -> + let k = { Suspended.k; fibre } in + fn st k; + schedule st + ) | ERead args -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in enqueue_read st k args; schedule st) | Close fd -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in enqueue_close st k fd; schedule st) | EWrite args -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in enqueue_write st k args; schedule st) - | Sleep_until (sw, time) -> + | Sleep_until time -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in let cancel_hook = ref Switch.null_hook in - begin match sw with + let sw = fibre.switch in + begin match Switch.get_error sw with + | Some ex -> Suspended.discontinue k ex | None -> - ignore (Zzz.add ~cancel_hook sleep_q time k : Zzz.Key.t); + let job = Zzz.add ~cancel_hook sleep_q time k in + cancel_hook := Switch.add_cancel_hook sw (fun ex -> + Zzz.remove sleep_q job; + enqueue_failed_thread st k ex + ); schedule st - | Some sw -> - match Switch.get_error sw with - | Some ex -> Suspended.discontinue k ex - | None -> - let job = Zzz.add ~cancel_hook sleep_q time k in - cancel_hook := Switch.add_cancel_hook sw (fun ex -> - Zzz.remove sleep_q job; - enqueue_failed_thread st k ex - ); - schedule st end) + | Eio.Private.Effects.Set_switch switch -> + Some (fun k -> + let old = fibre.switch in + fibre.switch <- switch; + continue k old + ) + | Eio.Private.Effects.Yield -> + Some (fun k -> + let k = { Suspended.k; fibre } in + Queue.push (Thread_checked k) st.run_q; + schedule st + ) + | Eio.Private.Effects.Suspend_unchecked f -> + Some (fun k -> + let k = { Suspended.k; fibre } in + f tid (function + | Ok v -> enqueue_thread st k v + | Error ex -> enqueue_failed_thread st k ex + ); + schedule st + ) | Eio.Private.Effects.Suspend f -> Some (fun k -> - let k = { Suspended.k; tid } in - f tid (function - | Ok v -> enqueue_thread st k v - | Error ex -> enqueue_failed_thread st k ex - ); - schedule st) + match Switch.get_error fibre.switch with + | Some e -> discontinue k e + | None -> + let k = { Suspended.k; fibre } in + f tid (function + | Ok v -> enqueue_thread st k v + | Error ex -> enqueue_failed_thread st k ex + ); + schedule st + ) | Eio.Private.Effects.Fork f -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in let id = Ctf.mint_id () in Ctf.note_created id Ctf.Task; let promise, resolver = Promise.create_with_id id in enqueue_thread st k promise; fork ~tid:id + ~switch:fibre.switch (fun () -> match f () with | x -> Promise.fulfill resolver x @@ -998,11 +1030,11 @@ let run ?(queue_depth=64) ?(block_size=4096) main = )) | Eio.Private.Effects.Fork_ignore f -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in enqueue_thread st k (); let child = Ctf.note_fork () in Ctf.note_switch child; - fork ~tid:child (fun () -> + fork ~tid:child ~switch:fibre.switch (fun () -> match f () with | () -> Ctf.note_resolved child ~ex:None @@ -1012,7 +1044,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = | Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln) | Alloc -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in alloc_buf st k) | Free buf -> Some (fun k -> @@ -1022,9 +1054,10 @@ let run ?(queue_depth=64) ?(block_size=4096) main = } in let main_done = ref false in - let `Exit_scheduler = fork ~tid:(Ctf.mint_id ()) (fun () -> - Fun.protect (fun () -> main stdenv) - ~finally:(fun () -> main_done := true) + let `Exit_scheduler = + fork ~tid:(Ctf.mint_id ()) ~switch:Eio.Private.boot_switch (fun () -> + Fun.protect (fun () -> Switch.top (fun _sw -> main stdenv)) + ~finally:(fun () -> main_done := true) ) in if not !main_done then failwith "Deadlock detected: no events scheduled but main function hasn't returned"; diff --git a/lib_eio_linux/eio_linux.mli b/lib_eio_linux/eio_linux.mli index 718570f..40b82ab 100644 --- a/lib_eio_linux/eio_linux.mli +++ b/lib_eio_linux/eio_linux.mli @@ -47,7 +47,7 @@ val noop : unit -> unit (** {1 Time functions} *) -val sleep_until : ?sw:Switch.t -> float -> unit +val sleep_until : float -> unit (** [sleep_until time] blocks until the current time is [time]. @param sw Cancel the sleep if [sw] is turned off. *) @@ -76,48 +76,47 @@ val openat2 : (** [openat2 ~sw ~flags ~perm ~resolve ~dir path] opens [dir/path]. See {!Uring.openat2} for details. *) -val read_upto : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> int +val read_upto : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> int (** [read_upto fd chunk len] reads at most [len] bytes from [fd], returning as soon as some data is available. - @param sw Abort the read if [sw] is turned off. @param file_offset Read from the given position in [fd] (default: 0). @raise End_of_file Raised if all data has already been read. *) -val read_exactly : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit +val read_exactly : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit (** [read_exactly fd chunk len] reads exactly [len] bytes from [fd], performing multiple read operations if necessary. @param file_offset Read from the given position in [fd] (default: 0). @raise End_of_file Raised if the stream ends before [len] bytes have been read. *) -val readv : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int +val readv : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int (** [readv] is like {!read_upto} but can read into any cstruct(s), not just chunks of the pre-shared buffer. If multiple buffers are given, they are filled in order. *) -val write : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit +val write : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit (** [write fd buf len] writes exactly [len] bytes from [buf] to [fd]. It blocks until the OS confirms the write is done, and resubmits automatically if the OS doesn't write all of it at once. *) -val writev : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit +val writev : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit (** [writev] is like {!write} but can write from any cstruct(s), not just chunks of the pre-shared buffer. If multiple buffers are given, they are sent in order. It will make multiple OS calls if the OS doesn't write all of it at once. *) -val splice : ?sw:Switch.t -> FD.t -> dst:FD.t -> len:int -> int +val splice : FD.t -> dst:FD.t -> len:int -> int (** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. @return The number of bytes copied. @raise End_of_file [src] is at the end of the file. @raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *) -val connect : ?sw:Switch.t -> FD.t -> Unix.sockaddr -> unit +val connect : FD.t -> Unix.sockaddr -> unit (** [connect fd addr] attempts to connect socket [fd] to [addr]. *) -val await_readable : ?sw:Switch.t -> FD.t -> unit +val await_readable : FD.t -> unit (** [await_readable fd] blocks until [fd] is readable (or has an error). *) -val await_writable : ?sw:Switch.t -> FD.t -> unit +val await_writable : FD.t -> unit (** [await_writable fd] blocks until [fd] is writable (or has an error). *) val fstat : FD.t -> Unix.stats diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index 8d2afbd..3c34c1e 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -90,14 +90,14 @@ let test_iovec () = let rec recv = function | [] -> () | cs -> - let got = Eio_linux.readv ~sw from_pipe cs in + let got = Eio_linux.readv from_pipe cs in recv (Cstruct.shiftv cs got) in Fibre.both ~sw (fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3]) (fun () -> let b = Cstruct.of_string "barfoo" in - Eio_linux.writev ~sw to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3]; + Eio_linux.writev to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3]; Eio_linux.FD.close to_pipe ); Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message) diff --git a/lib_eio_luv/eio_luv.ml b/lib_eio_luv/eio_luv.ml index c4d8faa..ad1970f 100644 --- a/lib_eio_luv/eio_luv.ml +++ b/lib_eio_luv/eio_luv.ml @@ -50,16 +50,16 @@ let or_raise_path path = function module Suspended = struct type 'a t = { - tid : Ctf.id; + fibre : Eunix.Suspended.state; k : ('a, unit) continuation; } let continue t v = - Ctf.note_switch t.tid; + Ctf.note_switch t.fibre.tid; continue t.k v let discontinue t ex = - Ctf.note_switch t.tid; + Ctf.note_switch t.fibre.tid; discontinue t.k ex let continue_result t = function @@ -67,11 +67,14 @@ module Suspended = struct | Error x -> discontinue t x end -type _ eff += Await : (('a -> unit) -> unit) -> 'a eff +type _ eff += Await : (Eunix.Suspended.state -> ('a -> unit) -> unit) -> 'a eff let await fn = perform (Await fn) type _ eff += Enter : ('a Suspended.t -> unit) -> 'a eff +type _ eff += Enter_unchecked : ('a Suspended.t -> unit) -> 'a eff + let enter fn = perform (Enter fn) +let enter_unchecked fn = perform (Enter_unchecked fn) let await_exn fn = perform (Await fn) |> or_raise @@ -88,13 +91,8 @@ let enqueue_failed_thread k ex = let yield = Luv.Timer.init () |> or_raise in Luv.Timer.start yield 0 (fun () -> Suspended.discontinue k ex) |> or_raise -let yield ?sw () = - Option.iter Switch.check sw; - enter @@ fun k -> - enqueue_thread k () - -let with_cancel ?sw ~request fn = - let cancel = Switch.add_cancel_hook_opt sw (fun _ -> +let with_cancel fibre ~request fn = + let cancel = Switch.add_cancel_hook fibre.Eunix.Suspended.switch (fun _ -> match Luv.Request.cancel request with | Ok () -> () | Error e -> Log.debug (fun f -> f "Cancel failed: %s" (Luv.Error.strerror e)) @@ -120,7 +118,7 @@ module Handle = struct let fd = get "close" t in t.fd <- `Closed; Switch.remove_hook t.release_hook; - enter @@ fun k -> + enter_unchecked @@ fun k -> Luv.Handle.close fd (Suspended.continue k) let ensure_closed t = @@ -156,7 +154,7 @@ module File = struct let fd = get "close" t in t.fd <- `Closed; Switch.remove_hook t.release_hook; - await_exn (Luv.File.close fd) + await_exn (fun _fibre -> Luv.File.close fd) let ensure_closed t = if is_open t then close t @@ -171,45 +169,45 @@ module File = struct t.release_hook <- Switch.on_release_cancellable sw (fun () -> ensure_closed t); t + let await_with_cancel ~request fn = + await (fun fibre k -> + with_cancel fibre ~request (fun () -> fn k) + ) + let open_ ~sw ?mode path flags = let request = Luv.File.Request.make () in - with_cancel ~sw ~request @@ fun () -> - await (Luv.File.open_ ?mode ~request path flags) |> Result.map (of_luv ~sw) + await_with_cancel ~request (Luv.File.open_ ?mode ~request path flags) + |> Result.map (of_luv ~sw) - let read ?sw fd bufs = + let read fd bufs = let request = Luv.File.Request.make () in - with_cancel ?sw ~request @@ fun () -> - await (Luv.File.read ~request (get "read" fd) bufs) + await_with_cancel ~request (Luv.File.read ~request (get "read" fd) bufs) - let rec write ?sw fd bufs = + let rec write fd bufs = let request = Luv.File.Request.make () in - with_cancel ?sw ~request @@ fun () -> - let sent = await_exn (Luv.File.write ~request (get "write" fd) bufs) in + let sent = await_with_cancel ~request (Luv.File.write ~request (get "write" fd) bufs) |> or_raise in let rec aux = function | [] -> () | x :: xs when Luv.Buffer.size x = 0 -> aux xs - | bufs -> write ?sw fd bufs + | bufs -> write fd bufs in aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent) - let realpath ?sw path = + let realpath path = let request = Luv.File.Request.make () in - with_cancel ?sw ~request @@ fun () -> - await (Luv.File.realpath ~request path) + await_with_cancel ~request (Luv.File.realpath ~request path) - let mkdir ?sw ~mode path = + let mkdir ~mode path = let request = Luv.File.Request.make () in - with_cancel ?sw ~request @@ fun () -> - await (Luv.File.mkdir ~request ~mode path) + await_with_cancel ~request (Luv.File.mkdir ~request ~mode path) end module Stream = struct type 'a t = [`Stream of 'a] Handle.t - let rec read_into ?sw (sock:'a t) buf = - Option.iter Switch.check sw; + let rec read_into (sock:'a t) buf = let r = enter (fun k -> - let cancel = Switch.add_cancel_hook_opt sw (fun ex -> + let cancel = Switch.add_cancel_hook k.fibre.switch (fun ex -> Luv.Stream.read_stop (Handle.get "read_into:cancel" sock) |> or_raise; enqueue_failed_thread k (Switch.Cancelled ex) ) in @@ -223,7 +221,7 @@ module Stream = struct | Ok buf' -> let len = Luv.Buffer.size buf' in if len > 0 then len - else read_into ?sw sock buf (* Luv uses a zero-length read to mean EINTR! *) + else read_into sock buf (* Luv uses a zero-length read to mean EINTR! *) | Error `EOF -> raise End_of_file | Error (`ECONNRESET as e) -> raise (Eio.Net.Connection_reset (Luv_error e)) | Error x -> raise (Luv_error x) @@ -232,7 +230,7 @@ module Stream = struct | empty :: xs when Luv.Buffer.size empty = 0 -> skip_empty xs | xs -> xs - let rec write ?sw t bufs = + let rec write t bufs = let err, n = (* note: libuv doesn't seem to allow cancelling stream writes *) enter (fun k -> @@ -243,15 +241,14 @@ module Stream = struct or_raise err; match Luv.Buffer.drop bufs n |> skip_empty with | [] -> () - | bufs -> write ?sw t bufs + | bufs -> write t bufs end -let sleep_until ?sw due = - Option.iter Switch.check sw; +let sleep_until due = let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in let timer = Luv.Timer.init () |> or_raise in enter @@ fun k -> - let cancel = Switch.add_cancel_hook_opt sw (fun ex -> + let cancel = Switch.add_cancel_hook k.fibre.switch (fun ex -> Luv.Timer.stop timer |> or_raise; Luv.Handle.close timer (fun () -> ()); enqueue_failed_thread k ex @@ -291,21 +288,21 @@ module Objects = struct | FD -> Some fd | _ -> None - method read_into ?sw buf = + method read_into buf = let buf = Cstruct.to_bigarray buf in - match File.read ?sw fd [buf] |> or_raise |> Unsigned.Size_t.to_int with + match File.read fd [buf] |> or_raise |> Unsigned.Size_t.to_int with | 0 -> raise End_of_file | got -> got method read_methods = [] - method write ?sw src = + method write src = let buf = Luv.Buffer.create 4096 in try while true do let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in let sub = Luv.Buffer.sub buf ~offset:0 ~length:got in - File.write ?sw fd [sub] + File.write fd [sub] done with End_of_file -> () end @@ -316,19 +313,19 @@ module Objects = struct let socket sock = object inherit Eio.Flow.two_way - method read_into ?sw buf = + method read_into buf = let buf = Cstruct.to_bigarray buf in - Stream.read_into ?sw sock buf + Stream.read_into sock buf method read_methods = [] - method write ?sw src = + method write src = let buf = Luv.Buffer.create 4096 in try while true do let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in let buf' = Luv.Buffer.sub buf ~offset:0 ~length:got in - Stream.write ?sw sock [buf'] + Stream.write sock [buf'] done with End_of_file -> () @@ -336,11 +333,11 @@ module Objects = struct Handle.close sock method shutdown = function - | `Send -> await_exn @@ Luv.Stream.shutdown (Handle.get "shutdown" sock) + | `Send -> await_exn (fun _fibre -> Luv.Stream.shutdown (Handle.get "shutdown" sock)) | `Receive -> failwith "shutdown receive not supported" | `All -> Log.warn (fun f -> f "shutdown receive not supported"); - await_exn @@ Luv.Stream.shutdown (Handle.get "shutdown" sock) + await_exn (fun _fibre -> Luv.Stream.shutdown (Handle.get "shutdown" sock)) end class virtual ['a] listening_socket ~backlog sock = object (self) @@ -438,11 +435,11 @@ module Objects = struct | `Tcp (host, port) -> let sock = Luv.TCP.init () |> or_raise |> Handle.of_luv ~sw in let addr = luv_addr_of_unix host port in - await_exn (Luv.TCP.connect (Handle.get "connect" sock) addr); + await_exn (fun _fibre -> Luv.TCP.connect (Handle.get "connect" sock) addr); socket sock | `Unix path -> let sock = Luv.Pipe.init () |> or_raise |> Handle.of_luv ~sw in - await_exn (Luv.Pipe.connect (Handle.get "connect" sock) path); + await_exn (fun _fibre -> Luv.Pipe.connect (Handle.get "connect" sock) path); socket sock end @@ -498,10 +495,10 @@ module Objects = struct (* Resolve a relative path to an absolute one, with no symlinks. @raise Eio.Dir.Permission_denied if it's outside of [dir_path]. *) - method private resolve ?sw path = + method private resolve path = if Filename.is_relative path then ( - let dir_path = File.realpath ?sw dir_path |> or_raise_path dir_path in - let full = File.realpath ?sw (Filename.concat dir_path path) |> or_raise_path path in + let dir_path = File.realpath dir_path |> or_raise_path dir_path in + let full = File.realpath (Filename.concat dir_path path) |> or_raise_path path in let prefix_len = String.length dir_path + 1 in if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then full @@ -514,16 +511,16 @@ module Objects = struct ) (* We want to create [path]. Check that the parent is in the sandbox. *) - method private resolve_new ?sw path = + method private resolve_new path = let dir, leaf = Filename.dirname path, Filename.basename path in if leaf = ".." then Fmt.failwith "New path %S ends in '..'!" path - else match self#resolve ?sw dir with + else match self#resolve dir with | dir -> Filename.concat dir leaf | exception Eio.Dir.Permission_denied (dir, ex) -> raise (Eio.Dir.Permission_denied (Filename.concat dir leaf, ex)) method open_in ~sw path = - let fd = File.open_ ~sw (self#resolve ~sw path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in + let fd = File.open_ ~sw (self#resolve path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in (flow fd :> ) method open_out ~sw ~append ~create path = @@ -545,11 +542,11 @@ module Objects = struct method open_dir ~sw path = Switch.check sw; - new dir (self#resolve ~sw path) + new dir (self#resolve path) (* libuv doesn't seem to provide a race-free way to do this. *) - method mkdir ?sw ~perm path = - let real_path = self#resolve_new ?sw path in + method mkdir ~perm path = + let real_path = self#resolve_new path in File.mkdir ~mode:[`NUMERIC perm] real_path |> or_raise_path path method close = () @@ -560,7 +557,7 @@ module Objects = struct inherit dir "/" (* No checks *) - method! private resolve ?sw:_ path = path + method! private resolve path = path end let cwd = object @@ -586,8 +583,9 @@ end let run main = Log.debug (fun l -> l "starting run"); let stdenv = Objects.stdenv () in - let rec fork ~tid fn = + let rec fork ~tid ~switch:initial_switch fn = Ctf.note_switch tid; + let fibre = { Eunix.Suspended.tid; switch = initial_switch } in match_with fn () { retc = (fun () -> ()); exnc = (fun e -> raise e); @@ -595,19 +593,20 @@ let run main = match e with | Await fn -> Some (fun k -> - let k = { Suspended.k; tid } in - fn (Suspended.continue k)) + let k = { Suspended.k; fibre } in + fn fibre (Suspended.continue k)) | Eio.Private.Effects.Trace -> Some (fun k -> continue k Eunix.Trace.default_traceln) | Eio.Private.Effects.Fork f -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in let id = Ctf.mint_id () in Ctf.note_created id Ctf.Task; let promise, resolver = Promise.create_with_id id in enqueue_thread k promise; fork ~tid:id + ~switch:fibre.switch (fun () -> match f () with | x -> Promise.fulfill resolver x @@ -617,28 +616,60 @@ let run main = )) | Eio.Private.Effects.Fork_ignore f -> Some (fun k -> - let k = { Suspended.k; tid } in + let k = { Suspended.k; fibre } in enqueue_thread k (); let child = Ctf.note_fork () in Ctf.note_switch child; - fork ~tid:child (fun () -> + fork ~tid:child ~switch:fibre.switch (fun () -> match f () with | () -> Ctf.note_resolved child ~ex:None | exception ex -> Ctf.note_resolved child ~ex:(Some ex) )) - | Enter fn -> Some (fun k -> fn { Suspended.k; tid }) + | Eio.Private.Effects.Set_switch switch -> + Some (fun k -> + let old = fibre.switch in + fibre.switch <- switch; + continue k old + ) + | Enter_unchecked fn -> Some (fun k -> + fn { Suspended.k; fibre } + ) + | Enter fn -> Some (fun k -> + match Switch.get_error fibre.switch with + | Some e -> discontinue k e + | None -> fn { Suspended.k; fibre } + ) + | Eio.Private.Effects.Yield -> + Some (fun k -> + let yield = Luv.Timer.init () |> or_raise in + Luv.Timer.start yield 0 (fun () -> + match Switch.get_error fibre.switch with + | Some e -> discontinue k e + | None -> continue k () + ) + |> or_raise + ) + | Eio.Private.Effects.Suspend_unchecked fn -> + Some (fun k -> + let k = { Suspended.k; fibre } in + fn tid (enqueue_result_thread k) + ) | Eio.Private.Effects.Suspend fn -> Some (fun k -> - let k = { Suspended.k; tid } in - fn tid (enqueue_result_thread k)) + begin match Switch.get_error fibre.switch with + | Some e -> discontinue k e + | None -> + let k = { Suspended.k; fibre } in + fn tid (enqueue_result_thread k) + end) | _ -> None } in let main_status = ref `Running in - fork ~tid:(Ctf.mint_id ()) (fun () -> - match main stdenv with + fork ~tid:(Ctf.mint_id ()) ~switch:Eio.Private.boot_switch (fun () -> + match Switch.top (fun _sw -> main stdenv) with | () -> main_status := `Done | exception ex -> main_status := `Ex (ex, Printexc.get_raw_backtrace ()) ); diff --git a/lib_eio_luv/eio_luv.mli b/lib_eio_luv/eio_luv.mli index de9ea0d..90e69e8 100644 --- a/lib_eio_luv/eio_luv.mli +++ b/lib_eio_luv/eio_luv.mli @@ -23,17 +23,15 @@ exception Luv_error of Luv.Error.t val or_raise : 'a or_error -> 'a (** [or_error (Error e)] raises [Luv_error e]. *) -val await : (('a -> unit) -> unit) -> 'a +val await : (Eunix.Suspended.state -> ('a -> unit) -> unit) -> 'a (** [await fn] converts a function using a luv-style callback to one using effects. - Use it as e.g. [await (Luv.File.realpath path)]. *) + Use it as e.g. [await (fun fibre -> Luv.File.realpath path)]. + Use [fibre] to implement cancellation. *) (** {1 Time functions} *) -val sleep_until : ?sw:Switch.t -> float -> unit -(** [sleep_until time] blocks until the current time is [time]. - @param sw Cancel the sleep if [sw] is turned off. *) - -val yield : ?sw:Switch.t -> unit -> unit +val sleep_until : float -> unit +(** [sleep_until time] blocks until the current time is [time]. *) (** {1 Low-level wrappers for Luv functions} *) @@ -63,16 +61,16 @@ module File : sig string -> Luv.File.Open_flag.t list -> t or_error (** Wraps {!Luv.File.open_} *) - val read : ?sw:Switch.t -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error + val read : t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error (** Wraps {!Luv.File.read} *) - val write : ?sw:Switch.t -> t -> Luv.Buffer.t list -> unit + val write : t -> Luv.Buffer.t list -> unit (** [write t bufs] writes all the data in [bufs] (which may take several calls to {!Luv.File.write}). *) - val realpath : ?sw:Switch.t -> string -> string or_error + val realpath : string -> string or_error (** Wraps {!Luv.File.realpath} *) - val mkdir : ?sw:Switch.t -> mode:Luv.File.Mode.t list -> string -> unit or_error + val mkdir : mode:Luv.File.Mode.t list -> string -> unit or_error (** Wraps {!Luv.File.mkdir} *) end diff --git a/lib_eio_luv/tests/files.md b/lib_eio_luv/tests/files.md index 12d8f66..9521013 100644 --- a/lib_eio_luv/tests/files.md +++ b/lib_eio_luv/tests/files.md @@ -6,12 +6,12 @@ ``` ```ocaml -let rec read_exactly ~sw fd buf = +let rec read_exactly fd buf = let size = Luv.Buffer.size buf in if size > 0 then ( - let got = Eio_luv.File.read ~sw fd [buf] |> Eio_luv.or_raise |> Unsigned.Size_t.to_int in + let got = Eio_luv.File.read fd [buf] |> Eio_luv.or_raise |> Unsigned.Size_t.to_int in let next = Luv.Buffer.sub buf ~offset:got ~length:(size - got) in - read_exactly ~sw fd next + read_exactly fd next ) let () = @@ -35,7 +35,7 @@ let main _stdenv = Switch.top @@ fun sw -> let fd = Eio_luv.File.open_ ~sw "/dev/zero" [] |> Eio_luv.or_raise in let buf = Luv.Buffer.create 4 in - read_exactly ~sw fd buf; + read_exactly fd buf; traceln "Read %S" (Luv.Buffer.to_string buf); Eio_luv.File.close fd ``` diff --git a/lib_eunix/suspended.ml b/lib_eunix/suspended.ml index 47bb0a8..b7b35f2 100644 --- a/lib_eunix/suspended.ml +++ b/lib_eunix/suspended.ml @@ -1,14 +1,19 @@ open EffectHandlers.Deep -type 'a t = { +type state = { tid : Ctf.id; + mutable switch : Eio.Std.Switch.t; +} + +type 'a t = { + fibre : state; k : ('a, [`Exit_scheduler]) continuation; } let continue t v = - Ctf.note_switch t.tid; + Ctf.note_switch t.fibre.tid; continue t.k v let discontinue t ex = - Ctf.note_switch t.tid; + Ctf.note_switch t.fibre.tid; discontinue t.k ex diff --git a/tests/test_fs.md b/tests/test_fs.md index 3f2e653..9a7f448 100644 --- a/tests/test_fs.md +++ b/tests/test_fs.md @@ -22,22 +22,22 @@ let run (fn : sw:Switch.t -> Eio.Stdenv.t -> unit) = Switch.top @@ fun sw -> fn ~sw env -let read_all ?sw flow = +let read_all flow = let b = Buffer.create 100 in - Eio.Flow.copy ?sw flow (Eio.Flow.buffer_sink b); + Eio.Flow.copy flow (Eio.Flow.buffer_sink b); Buffer.contents b -let write_file ?sw ~create ?append dir path content = - Eio.Dir.with_open_out ?sw ~create ?append dir path @@ fun flow -> +let write_file ~create ?append dir path content = + Eio.Dir.with_open_out ~create ?append dir path @@ fun flow -> Eio.Flow.copy_string content flow -let try_write_file ~sw ~create ?append dir path content = - match write_file ~sw ~create ?append dir path content with +let try_write_file ~create ?append dir path content = + match write_file ~create ?append dir path content with | () -> traceln "write %S -> ok" path | exception ex -> traceln "write %S -> %a" path Fmt.exn ex -let read_file ?sw dir path = - Eio.Dir.with_open_in ?sw dir path read_all +let read_file dir path = + Eio.Dir.with_open_in dir path read_all let try_mkdir dir path = match Eio.Dir.mkdir dir path ~perm:0o700 with @@ -55,8 +55,8 @@ Creating a file and reading it back: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:(`Exclusive 0o666) cwd "test-file" "my-data"; - traceln "Got %S" @@ read_file ~sw cwd "test-file";; + write_file ~create:(`Exclusive 0o666) cwd "test-file" "my-data"; + traceln "Got %S" @@ read_file cwd "test-file";; +Got "my-data" - : unit = () ``` @@ -74,7 +74,7 @@ Trying to use cwd to access a file outside of that subtree fails: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:(`Exclusive 0o666) cwd "../test-file" "my-data"; + write_file ~create:(`Exclusive 0o666) cwd "../test-file" "my-data"; failwith "Should have failed";; Exception: Eio.Dir.Permission_denied ("../test-file", _) ``` @@ -83,7 +83,7 @@ Trying to use cwd to access an absolute path fails: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:(`Exclusive 0o666) cwd "/tmp/test-file" "my-data"; + write_file ~create:(`Exclusive 0o666) cwd "/tmp/test-file" "my-data"; failwith "Should have failed";; Exception: Eio.Dir.Permission_denied ("/tmp/test-file", _) ``` @@ -94,8 +94,8 @@ Exclusive create fails if already exists: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:(`Exclusive 0o666) cwd "test-file" "first-write"; - write_file ~sw ~create:(`Exclusive 0o666) cwd "test-file" "first-write"; + write_file ~create:(`Exclusive 0o666) cwd "test-file" "first-write"; + write_file ~create:(`Exclusive 0o666) cwd "test-file" "first-write"; failwith "Should have failed";; Exception: Eio.Dir.Already_exists ("test-file", _) ``` @@ -104,9 +104,9 @@ If-missing create succeeds if already exists: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:(`If_missing 0o666) cwd "test-file" "1st-write-original"; - write_file ~sw ~create:(`If_missing 0o666) cwd "test-file" "2nd-write"; - traceln "Got %S" @@ read_file ~sw cwd "test-file";; + write_file ~create:(`If_missing 0o666) cwd "test-file" "1st-write-original"; + write_file ~create:(`If_missing 0o666) cwd "test-file" "2nd-write"; + traceln "Got %S" @@ read_file cwd "test-file";; +Got "2nd-write-original" - : unit = () ``` @@ -115,9 +115,9 @@ Truncate create succeeds if already exists, and truncates: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original"; - write_file ~sw ~create:(`Or_truncate 0o666) cwd "test-file" "2nd-write"; - traceln "Got %S" @@ read_file ~sw cwd "test-file";; + write_file ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original"; + write_file ~create:(`Or_truncate 0o666) cwd "test-file" "2nd-write"; + traceln "Got %S" @@ read_file cwd "test-file";; +Got "2nd-write" - : unit = () # Unix.unlink "test-file";; @@ -128,8 +128,8 @@ Error if no create and doesn't exist: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:`Never cwd "test-file" "1st-write-original"; - traceln "Got %S" @@ read_file ~sw cwd "test-file";; + write_file ~create:`Never cwd "test-file" "1st-write-original"; + traceln "Got %S" @@ read_file cwd "test-file";; Exception: Eio.Dir.Not_found ("test-file", _) ``` @@ -137,9 +137,9 @@ Appending to an existing file: ```ocaml # run @@ fun ~sw env -> let cwd = Eio.Stdenv.cwd env in - write_file ~sw ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original"; - write_file ~sw ~create:`Never ~append:true cwd "test-file" "2nd-write"; - traceln "Got %S" @@ read_file ~sw cwd "test-file";; + write_file ~create:(`Or_truncate 0o666) cwd "test-file" "1st-write-original"; + write_file ~create:`Never ~append:true cwd "test-file" "2nd-write"; + traceln "Got %S" @@ read_file cwd "test-file";; +Got "1st-write-original2nd-write" - : unit = () # Unix.unlink "test-file";; @@ -153,7 +153,7 @@ Appending to an existing file: let cwd = Eio.Stdenv.cwd env in try_mkdir cwd "subdir"; try_mkdir cwd "subdir/nested"; - write_file ~sw ~create:(`Exclusive 0o600) cwd "subdir/nested/test-file" "data"; + write_file ~create:(`Exclusive 0o600) cwd "subdir/nested/test-file" "data"; ();; +mkdir "subdir" -> ok +mkdir "subdir/nested" -> ok @@ -196,9 +196,9 @@ Create a sandbox, write a file with it, then read it from outside: let cwd = Eio.Stdenv.cwd env in try_mkdir cwd "sandbox"; let subdir = Eio.Dir.open_dir ~sw cwd "sandbox" in - write_file ~sw ~create:(`Exclusive 0o600) subdir "test-file" "data"; + write_file ~create:(`Exclusive 0o600) subdir "test-file" "data"; try_mkdir subdir "../new-sandbox"; - traceln "Got %S" @@ read_file ~sw cwd "sandbox/test-file";; + traceln "Got %S" @@ read_file cwd "sandbox/test-file";; +mkdir "sandbox" -> ok +mkdir "../new-sandbox" -> Eio.Dir.Permission_denied ("../new-sandbox", _) +Got "data" @@ -217,9 +217,9 @@ Using `cwd` we can't access the parent, but using `fs` we can: chdir "fs-test"; Fun.protect ~finally:(fun () -> chdir "..") (fun () -> try_mkdir cwd "../outside-cwd"; - try_write_file ~sw ~create:(`Exclusive 0o600) cwd "../test-file" "data"; + try_write_file ~create:(`Exclusive 0o600) cwd "../test-file" "data"; try_mkdir fs "../outside-cwd"; - try_write_file ~sw ~create:(`Exclusive 0o600) fs "../test-file" "data"; + try_write_file ~create:(`Exclusive 0o600) fs "../test-file" "data"; ); Unix.unlink "test-file"; Unix.rmdir "outside-cwd";; diff --git a/tests/test_network.md b/tests/test_network.md index 5301d77..ede27c2 100644 --- a/tests/test_network.md +++ b/tests/test_network.md @@ -14,9 +14,9 @@ let run (fn : net:Eio.Net.t -> Switch.t -> unit) = let addr = `Tcp (Unix.inet_addr_loopback, 8081) -let read_all ?sw flow = +let read_all flow = let b = Buffer.create 100 in - Eio.Flow.copy ?sw flow (Eio.Flow.buffer_sink b); + Eio.Flow.copy flow (Eio.Flow.buffer_sink b); Buffer.contents b exception Graceful_shutdown @@ -32,7 +32,7 @@ let run_client ~sw ~net ~addr = let flow = Eio.Net.connect ~sw net addr in Eio.Flow.copy_string "Hello from client" flow; Eio.Flow.shutdown flow `Send; - let msg = read_all ~sw flow in + let msg = read_all flow in traceln "Client received: %S" msg ``` @@ -44,7 +44,7 @@ let run_server ~sw socket = Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr -> traceln "Server accepted connection from client"; Fun.protect (fun () -> - let msg = read_all ~sw flow in + let msg = read_all flow in traceln "Server received: %S" msg ) ~finally:(fun () -> Eio.Flow.copy_string "Bye" flow) ) @@ -61,7 +61,7 @@ let test_address addr ~net sw = (fun () -> run_client ~sw ~net ~addr; traceln "Client finished - cancelling server"; - Switch.turn_off sw Graceful_shutdown + raise Graceful_shutdown ) ``` @@ -106,24 +106,29 @@ Cancelling the read: ```ocaml # run @@ fun ~net sw -> - Switch.top @@ fun read_switch -> + let shutdown, set_shutdown = Promise.create () in let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in Fibre.both ~sw (fun () -> - Eio.Net.accept_sub server ~sw (fun ~sw flow _addr -> - try - let msg = read_all ~sw:read_switch flow in - traceln "Server received: %S" msg - with Switch.Cancelled Graceful_shutdown -> - Eio.Flow.copy_string "Request cancelled" flow - ) ~on_error:raise + Eio.Net.accept_sub server ~sw (fun ~sw flow _addr -> + try + Fibre.both ~sw + (fun () -> raise (Promise.await shutdown)) + (fun () -> + let msg = read_all flow in + traceln "Server received: %S" msg + ) + with Graceful_shutdown -> + Switch.top @@ fun _sw -> + Eio.Flow.copy_string "Request cancelled" flow; + ) ~on_error:raise ) (fun () -> traceln "Connecting to server..."; let flow = Eio.Net.connect ~sw net addr in traceln "Connection opened - cancelling server's read"; Fibre.yield (); - Switch.turn_off read_switch Graceful_shutdown; + Promise.fulfill set_shutdown Graceful_shutdown; let msg = read_all flow in traceln "Client received: %S" msg );; diff --git a/tests/test_switch.md b/tests/test_switch.md index 59b98e2..365ff89 100644 --- a/tests/test_switch.md +++ b/tests/test_switch.md @@ -42,8 +42,8 @@ Exception: Failure "Cancel". ```ocaml # run (fun sw -> Fibre.both ~sw - (fun () -> for i = 1 to 2 do traceln "i = %d" i; Fibre.yield ~sw () done) - (fun () -> for j = 1 to 2 do traceln "j = %d" j; Fibre.yield ~sw () done) + (fun () -> for i = 1 to 2 do traceln "i = %d" i; Fibre.yield () done) + (fun () -> for j = 1 to 2 do traceln "j = %d" j; Fibre.yield () done) );; +i = 1 +j = 1 @@ -57,7 +57,7 @@ Exception: Failure "Cancel". ```ocaml # run (fun sw -> Fibre.both ~sw - (fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield ~sw () done) + (fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield () done) (fun () -> failwith "Failed") );; +i = 1 @@ -69,8 +69,8 @@ Exception: Failure "Failed". ```ocaml # run (fun sw -> Fibre.both ~sw - (fun () -> Fibre.yield ~sw (); failwith "Failed") - (fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield ~sw () done) + (fun () -> Fibre.yield (); failwith "Failed") + (fun () -> for i = 1 to 5 do traceln "i = %d" i; Fibre.yield () done) );; +i = 1 Exception: Failure "Failed". @@ -239,7 +239,7 @@ A child can fail independently of the parent: Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> traceln "Child 2"; Promise.await ~sw p2); Promise.break r1 (Failure "Child error"); Promise.fulfill r2 (); - Fibre.yield ~sw (); + Fibre.yield (); traceln "Parent fibre is still running" );; +Child 1 @@ -262,7 +262,7 @@ A child can be cancelled independently of the parent: Promise.await ~sw p ); Switch.turn_off (Option.get !child) (Failure "Cancel child"); - Fibre.yield ~sw (); + Fibre.yield (); traceln "Parent fibre is still running" );; +Child 1 @@ -279,7 +279,7 @@ A child error handle raises: let on_error = raise in Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> traceln "Child"; Promise.await ~sw p); Promise.break r (Failure "Child error escapes"); - Fibre.yield ~sw (); + Fibre.yield (); traceln "Not reached" );; +Child diff --git a/tests/test_time.md b/tests/test_time.md index 4faf2eb..662690c 100644 --- a/tests/test_time.md +++ b/tests/test_time.md @@ -30,9 +30,8 @@ Check sleep works with a switch: ```ocaml # run @@ fun ~clock -> - Switch.top @@ fun sw -> let t0 = Unix.gettimeofday () in - Eio.Time.sleep ~sw clock 0.01; + Eio.Time.sleep clock 0.01; let t1 = Unix.gettimeofday () in assert (t1 -. t0 >= 0.01);; - : unit = () @@ -44,8 +43,8 @@ Cancelling sleep: # run @@ fun ~clock -> Switch.top @@ fun sw -> Fibre.both ~sw - (fun () -> Eio.Time.sleep ~sw clock 1200.; assert false) - (fun () -> Switch.turn_off sw (Failure "Simulated cancel"));; + (fun () -> Eio.Time.sleep clock 1200.; assert false) + (fun () -> failwith "Simulated cancel");; Exception: Failure "Simulated cancel". ``` @@ -55,7 +54,7 @@ Switch is already off: # run @@ fun ~clock -> Switch.top @@ fun sw -> Switch.turn_off sw (Failure "Simulated failure"); - Eio.Time.sleep ~sw clock 1200.0; + Eio.Time.sleep clock 1200.0; assert false;; Exception: Failure "Simulated failure". ``` @@ -66,7 +65,7 @@ Scheduling a timer that's already due: # run @@ fun ~clock -> Switch.top @@ fun sw -> Fibre.both ~sw - (fun () -> traceln "First fibre runs"; Eio.Time.sleep ~sw clock (-1.0); traceln "Sleep done") + (fun () -> traceln "First fibre runs"; Eio.Time.sleep clock (-1.0); traceln "Sleep done") (fun () -> traceln "Second fibre runs");; +First fibre runs +Second fibre runs @@ -81,13 +80,13 @@ Check ordering works: Switch.top @@ fun sw -> Fibre.both ~sw (fun () -> - Eio.Time.sleep ~sw clock 1200.0; + Eio.Time.sleep clock 1200.0; assert false ) (fun () -> Eio.Time.sleep clock 0.1; traceln "Short timer finished"; - Switch.turn_off sw (Failure "Simulated cancel") + failwith "Simulated cancel" );; +Short timer finished Exception: Failure "Simulated cancel".