Add eio_luv backend

This commit is contained in:
Thomas Leonard 2021-07-21 10:59:30 +01:00
parent de9d965455
commit b77176e212
24 changed files with 949 additions and 30 deletions

View File

@ -65,10 +65,11 @@ It is able to run a web-server with [good performance][http-bench], but many fea
## Structure of the code
- `eio` provides concurrency primitives (promises, etc), and a high-level, cross-platform OS API.
- `eio_luv` provides a cross-platform backend for these APIs using [luv](https://github.com/aantron/luv) (libuv),
- `eio_linux` provides a Linux io-uring backend for these APIs,
plus a low-level API that can be used directly (in non-portable code).
- `eunix` provides some common code shared by multiple backends.
- `eio_main` selects an appropriate backend (e.g. `eio_linux`), depending on your platform.
- `eio_main` selects an appropriate backend (e.g. `eio_linux` or `eio_luv`), depending on your platform.
- `ctf` provides tracing support.
## Getting started
@ -465,8 +466,8 @@ let try_mkdir dir path =
try_mkdir cwd "../dir2";
try_mkdir cwd "/tmp/dir3";
+mkdir "dir1" -> ok
+mkdir "../dir2" -> Eio.Dir.Permission_denied("..", _)
+mkdir "/tmp/dir3" -> Eio.Dir.Permission_denied("/tmp", _)
+mkdir "../dir2" -> Eio.Dir.Permission_denied("../dir2", _)
+mkdir "/tmp/dir3" -> Eio.Dir.Permission_denied("/tmp/dir3", _)
- : unit = ()
```
@ -508,6 +509,8 @@ A program that operates on the current directory will probably want to use `cwd`
whereas a program that accepts a path from the user will probably want to use `fs`,
perhaps with `open_dir` to constrain all access to be within that directory.
Note: the `eio_luv` backend doesn't have the `openat`, `mkdirat`, etc calls that are necessary to implement these checks without races.
## Time
The standard environment provides a clock with the usual POSIX time:

View File

@ -1,3 +1,4 @@
(executable
(name bench)
(enabled_if (= %{system} "linux"))
(libraries eio_main))

View File

@ -8,8 +8,7 @@
(package
(name eio)
(synopsis "Effect-based direct-style IO API for OCaml")
(description
"An effect-based IO API for multicore OCaml with fibres.")
(description "An effect-based IO API for multicore OCaml with fibres.")
(depends
(ctf (= :version))
(cstruct (>= 6.0.0))
@ -18,9 +17,9 @@
(package
(name eunix)
(synopsis "Eio implementation for Unix-compatible systems")
(description
"An eio implementation for Unix-compatible systems.")
(description "An eio implementation for Unix-compatible systems.")
(depends
(optint (>= 0.1.0))
(ocaml-variants (= "4.12.0+domains+effects"))
(ctf (= :version))
(eio (= :version))
@ -32,9 +31,9 @@
(package
(name eio_linux)
(synopsis "Eio implementation for Linux using io-uring")
(description
"An eio implementation for Linux using io-uring.")
(description "An eio implementation for Linux using io-uring.")
(depends
(alcotest (and (>= 1.4.0) :with-test))
(ocaml-variants (= "4.12.0+domains+effects"))
(ctf (= :version))
(eio (= :version))
@ -44,6 +43,20 @@
(fmt (>= 0.8.9))
(bigstringaf (>= 0.7.0))
uring))
(package
(name eio_luv)
(synopsis "Eio implementation using luv (libuv)")
(description "An eio implementation for most platforms, using luv.")
(depends
(ocaml-variants (= "4.12.0+domains+effects"))
(ctf (= :version))
(eio (= :version))
(eunix (= :version))
(luv (>= 0.5.8))
(mdx (and (>= 1.10.0) :with-test))
(logs (>= 0.7.0))
(fmt (>= 0.8.9))
(bigstringaf (>= 0.7.0))))
(package
(name ctf)
(synopsis "CTF tracing")
@ -57,8 +70,8 @@
(package
(name eio_main)
(synopsis "Effect-based direct-style IO mainloop for OCaml")
(description
"Selects an appropriate Eio backend for the current platform.")
(description "Selects an appropriate Eio backend for the current platform.")
(depends
(eio_linux (= :version))))
(eio_linux (= :version))
(eio_luv (= :version))))
(using mdx 0.1)

View File

@ -9,6 +9,7 @@ homepage: "https://github.com/ocaml-multicore/eio"
bug-reports: "https://github.com/ocaml-multicore/eio/issues"
depends: [
"dune" {>= "2.9"}
"alcotest" {>= "1.4.0" & with-test}
"ocaml-variants" {= "4.12.0+domains+effects"}
"ctf" {= version}
"eio" {= version}

40
eio_luv.opam Normal file
View File

@ -0,0 +1,40 @@
# This file is generated by dune, edit dune-project instead
opam-version: "2.0"
synopsis: "Eio implementation using luv (libuv)"
description: "An eio implementation for most platforms, using luv."
maintainer: ["anil@recoil.org"]
authors: ["Anil Madhavapeddy" "Thomas Leonard"]
license: "ISC"
homepage: "https://github.com/ocaml-multicore/eio"
bug-reports: "https://github.com/ocaml-multicore/eio/issues"
depends: [
"dune" {>= "2.9"}
"ocaml-variants" {= "4.12.0+domains+effects"}
"ctf" {= version}
"eio" {= version}
"eunix" {= version}
"luv" {>= "0.5.8"}
"mdx" {>= "1.10.0" & with-test}
"logs" {>= "0.7.0"}
"fmt" {>= "0.8.9"}
"bigstringaf" {>= "0.7.0"}
"odoc" {with-doc}
]
build: [
["dune" "subst" "--root" "."] {dev}
[
"dune"
"build"
"-p"
name
"-j"
jobs
"--promote-install-files"
"false"
"@install"
"@runtest" {with-test}
"@doc" {with-doc}
]
["dune" "install" "-p" name "--create-install-files" name]
]
dev-repo: "git+https://github.com/ocaml-multicore/eio.git"

View File

@ -10,6 +10,7 @@ bug-reports: "https://github.com/ocaml-multicore/eio/issues"
depends: [
"dune" {>= "2.9"}
"eio_linux" {= version}
"eio_luv" {= version}
"odoc" {with-doc}
]
build: [

View File

@ -9,6 +9,7 @@ homepage: "https://github.com/ocaml-multicore/eio"
bug-reports: "https://github.com/ocaml-multicore/eio/issues"
depends: [
"dune" {>= "2.9"}
"optint" {>= "0.1.0"}
"ocaml-variants" {= "4.12.0+domains+effects"}
"ctf" {= version}
"eio" {= version}

View File

@ -182,6 +182,12 @@ module Dir = struct
exception Not_found of path * exn
exception Permission_denied of path * exn
class virtual rw = object (_ : #Generic.t)
method probe _ = None
inherit Flow.read
inherit Flow.write
end
type create = [`Never | `If_missing of Unix.file_perm | `Or_truncate of Unix.file_perm | `Exclusive of Unix.file_perm]
class virtual t = object
@ -190,7 +196,7 @@ module Dir = struct
sw:Switch.t ->
append:bool ->
create:create ->
path -> <Flow.two_way; Flow.close>
path -> <rw; Flow.close>
method virtual mkdir : ?sw:Switch.t -> perm:Unix.file_perm -> path -> unit
method virtual open_dir : sw:Switch.t -> path -> t_with_close
end

View File

@ -404,6 +404,12 @@ module Dir : sig
exception Not_found of path * exn
exception Permission_denied of path * exn
class virtual rw : object
inherit Generic.t
inherit Flow.read
inherit Flow.write
end
type create = [`Never | `If_missing of Unix.file_perm | `Or_truncate of Unix.file_perm | `Exclusive of Unix.file_perm]
(** When to create a new file:
If [`Never] then it's an error if the named file doesn't exist.
@ -419,7 +425,7 @@ module Dir : sig
sw:Switch.t ->
append:bool ->
create:create ->
path -> <Flow.two_way; Flow.close>
path -> <rw; Flow.close>
method virtual mkdir : ?sw:Switch.t -> perm:Unix.file_perm -> path -> unit
method virtual open_dir : sw:Switch.t -> path -> t_with_close
end
@ -440,7 +446,7 @@ module Dir : sig
sw:Switch.t ->
?append:bool ->
create:create ->
#t -> path -> <Flow.two_way; Flow.close>
#t -> path -> <rw; Flow.close>
(** [open_out ~sw t path] opens [t/path] for reading and writing.
Note: files are always opened in binary mode.
@param append Open for appending: always write at end of file.
@ -450,7 +456,7 @@ module Dir : sig
?sw:Switch.t ->
?append:bool ->
create:create ->
#t -> path -> (<Flow.two_way; Flow.close> -> 'a) -> 'a
#t -> path -> (<rw; Flow.close> -> 'a) -> 'a
(** [with_open_out] is like [open_out], but calls [fn flow] with the new flow and closes
it automatically when [fn] returns (if it hasn't already been closed by then). *)

View File

@ -1,6 +1,7 @@
(library
(name eio_linux)
(public_name eio_linux)
(enabled_if (= %{system} "linux"))
(foreign_stubs
(language c)
(names eio_stubs))

View File

@ -34,6 +34,7 @@ let wrap_errors path fn =
| Unix.Unix_error(Unix.EEXIST, _, _) as ex -> raise @@ Eio.Dir.Already_exists (path, ex)
| Unix.Unix_error(Unix.ENOENT, _, _) as ex -> raise @@ Eio.Dir.Not_found (path, ex)
| Unix.Unix_error(Unix.EXDEV, _, _) as ex -> raise @@ Eio.Dir.Permission_denied (path, ex)
| Eio.Dir.Permission_denied _ as ex -> raise @@ Eio.Dir.Permission_denied (path, ex)
let rec skip_empty = function
| c :: cs when Cstruct.length c = 0 -> skip_empty cs
@ -593,7 +594,7 @@ let mkdir_beneath ?sw ~perm ?dir path =
(* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *)
Switch.sub_opt sw (fun sw ->
let parent =
wrap_errors dir_path @@ fun () ->
wrap_errors path @@ fun () ->
openat2 ~sw ~seekable:false ?dir dir_path
~access:`R
~flags:Uring.Open_flags.(cloexec + path + directory)
@ -853,7 +854,7 @@ module Objects = struct
~perm
~resolve:resolve_flags
in
(flow fd :> <Eio.Flow.two_way; Eio.Flow.close>)
(flow fd :> <Eio.Dir.rw; Eio.Flow.close>)
method open_dir ~sw path =
let fd = openat2 ~sw ~seekable:false ?dir:fd path

View File

@ -1,19 +1,24 @@
(library
(name eurcp_lib)
(modules eurcp_lib)
(libraries eio_linux))
(name eurcp_lib)
(enabled_if (= %{system} "linux"))
(modules eurcp_lib)
(libraries eio_linux))
(executable
(name eurcp)
(enabled_if (= %{system} "linux"))
(modules eurcp)
(libraries cmdliner logs.cli logs.fmt fmt.tty fmt.cli eurcp_lib))
(executable
(name basic_eio_linux)
(enabled_if (= %{system} "linux"))
(modules basic_eio_linux)
(libraries logs.fmt fmt.tty eurcp_lib))
(test
(name test)
(package eio_linux)
(enabled_if (= %{system} "linux"))
(modules test)
(libraries alcotest eio_linux))

4
lib_eio_luv/dune Normal file
View File

@ -0,0 +1,4 @@
(library
(name eio_luv)
(public_name eio_luv)
(libraries eio luv eunix unix logs fmt bigstringaf ctf))

625
lib_eio_luv/eio_luv.ml Normal file
View File

@ -0,0 +1,625 @@
(*
* Copyright (C) 2021 Thomas Leonard
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)
let src = Logs.Src.create "eio_luv" ~doc:"Eio backend using luv"
module Log = (val Logs.src_log src : Logs.LOG)
open Eio.Std
(* SIGPIPE makes no sense in a modern application. *)
let () = Sys.(set_signal sigpipe Signal_ignore)
type 'a or_error = ('a, Luv.Error.t) result
exception Luv_error of Luv.Error.t
let wrap_error ~path e =
let ex = Luv_error e in
match e with
| `EEXIST -> Eio.Dir.Already_exists (path, ex)
| `ENOENT -> Eio.Dir.Not_found (path, ex)
| _ -> ex
let or_raise = function
| Ok x -> x
| Error e -> raise (Luv_error e)
let or_raise_path path = function
| Ok x -> x
| Error e -> raise (wrap_error ~path e)
module Suspended = struct
type 'a t = {
tid : Ctf.id;
k : ('a, unit) continuation;
}
let continue t v =
Ctf.note_switch t.tid;
continue t.k v
let discontinue t ex =
Ctf.note_switch t.tid;
discontinue t.k ex
let continue_result t = function
| Ok x -> continue t x
| Error x -> discontinue t x
end
effect Await : (('a -> unit) -> unit) -> 'a
let await fn = perform (Await fn)
effect Enter : ('a Suspended.t -> unit) -> 'a
let enter fn = perform (Enter fn)
let await_exn fn =
perform (Await fn) |> or_raise
let enqueue_thread k v =
let yield = Luv.Timer.init () |> or_raise in
Luv.Timer.start yield 0 (fun () -> Suspended.continue k v) |> or_raise
let enqueue_result_thread k r =
let yield = Luv.Timer.init () |> or_raise in
Luv.Timer.start yield 0 (fun () -> Suspended.continue_result k r) |> or_raise
let enqueue_failed_thread k ex =
let yield = Luv.Timer.init () |> or_raise in
Luv.Timer.start yield 0 (fun () -> Suspended.discontinue k ex) |> or_raise
let yield ?sw () =
Option.iter Switch.check sw;
enter @@ fun k ->
enqueue_thread k ()
let with_cancel ?sw ~request fn =
let cancel = Switch.add_cancel_hook_opt sw (fun _ ->
match Luv.Request.cancel request with
| Ok () -> ()
| Error e -> Log.debug (fun f -> f "Cancel failed: %s" (Luv.Error.strerror e))
) in
Fun.protect fn ~finally:(fun () -> Switch.remove_hook cancel)
module Handle = struct
type 'a t = {
mutable release_hook : Switch.hook; (* Use this on close to remove switch's [on_release] hook. *)
mutable fd : [`Open of 'a Luv.Handle.t | `Closed]
}
let get op = function
| { fd = `Open fd; _ } -> fd
| { fd = `Closed ; _ } -> invalid_arg (op ^ ": handle used after calling close!")
let is_open = function
| { fd = `Open _; _ } -> true
| { fd = `Closed; _ } -> false
let close t =
Ctf.label "close";
let fd = get "close" t in
t.fd <- `Closed;
Switch.remove_hook t.release_hook;
enter @@ fun k ->
Luv.Handle.close fd (Suspended.continue k)
let ensure_closed t =
if is_open t then close t
let to_luv x = get "to_luv" x
let of_luv_no_hook fd =
{ fd = `Open fd; release_hook = Switch.null_hook }
let of_luv ~sw fd =
let t = of_luv_no_hook fd in
t.release_hook <- Switch.on_release_cancellable sw (fun () -> ensure_closed t);
t
end
module File = struct
type t = {
mutable release_hook : Switch.hook; (* Use this on close to remove switch's [on_release] hook. *)
mutable fd : [`Open of Luv.File.t | `Closed]
}
let get op = function
| { fd = `Open fd; _ } -> fd
| { fd = `Closed ; _ } -> invalid_arg (op ^ ": file descriptor used after calling close!")
let is_open = function
| { fd = `Open _; _ } -> true
| { fd = `Closed; _ } -> false
let close t =
Ctf.label "close";
let fd = get "close" t in
t.fd <- `Closed;
Switch.remove_hook t.release_hook;
await_exn (Luv.File.close fd)
let ensure_closed t =
if is_open t then close t
let to_luv = get "to_luv"
let of_luv_no_hook fd =
{ fd = `Open fd; release_hook = Switch.null_hook }
let of_luv ~sw fd =
let t = of_luv_no_hook fd in
t.release_hook <- Switch.on_release_cancellable sw (fun () -> ensure_closed t);
t
let open_ ~sw ?mode path flags =
let request = Luv.File.Request.make () in
with_cancel ~sw ~request @@ fun () ->
await (Luv.File.open_ ?mode ~request path flags) |> Result.map (of_luv ~sw)
let read ?sw fd bufs =
let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () ->
await (Luv.File.read ~request (get "read" fd) bufs)
let rec write ?sw fd bufs =
let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () ->
let sent = await_exn (Luv.File.write ~request (get "write" fd) bufs) in
let rec aux = function
| [] -> ()
| x :: xs when Luv.Buffer.size x = 0 -> aux xs
| bufs -> write ?sw fd bufs
in
aux @@ Luv.Buffer.drop bufs (Unsigned.Size_t.to_int sent)
let realpath ?sw path =
let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () ->
await (Luv.File.realpath ~request path)
let mkdir ?sw ~mode path =
let request = Luv.File.Request.make () in
with_cancel ?sw ~request @@ fun () ->
await (Luv.File.mkdir ~request ~mode path)
end
module Stream = struct
type 'a t = [`Stream of 'a] Handle.t
let rec read_into ?sw (sock:'a t) buf =
Option.iter Switch.check sw;
let r = enter (fun k ->
let cancel = Switch.add_cancel_hook_opt sw (fun ex ->
Luv.Stream.read_stop (Handle.get "read_into:cancel" sock) |> or_raise;
enqueue_failed_thread k (Switch.Cancelled ex)
) in
Luv.Stream.read_start (Handle.get "read_start" sock) ~allocate:(fun _ -> buf) (fun r ->
Switch.remove_hook cancel;
Luv.Stream.read_stop (Handle.get "read_stop" sock) |> or_raise;
Suspended.continue k r
)
) in
match r with
| Ok buf' ->
let len = Luv.Buffer.size buf' in
if len > 0 then len
else read_into ?sw sock buf (* Luv uses a zero-length read to mean EINTR! *)
| Error `EOF -> raise End_of_file
| Error x -> raise (Luv_error x)
let rec skip_empty = function
| empty :: xs when Luv.Buffer.size empty = 0 -> skip_empty xs
| xs -> xs
let rec write ?sw t bufs =
let err, n =
(* note: libuv doesn't seem to allow cancelling stream writes *)
enter (fun k ->
Luv.Stream.write (Handle.get "write_stream" t) bufs @@ fun err n ->
Suspended.continue k (err, n)
)
in
or_raise err;
match Luv.Buffer.drop bufs n |> skip_empty with
| [] -> ()
| bufs -> write ?sw t bufs
end
let sleep_until ?sw due =
Option.iter Switch.check sw;
let delay = 1000. *. (due -. Unix.gettimeofday ()) |> ceil |> truncate |> max 0 in
let timer = Luv.Timer.init () |> or_raise in
enter @@ fun k ->
let cancel = Switch.add_cancel_hook_opt sw (fun ex ->
Luv.Timer.stop timer |> or_raise;
Luv.Handle.close timer (fun () -> ());
enqueue_failed_thread k ex
) in
Luv.Timer.start timer delay (fun () ->
Switch.remove_hook cancel;
Suspended.continue k ()
) |> or_raise
let run_compute fn =
match fn () with
| x -> x
| effect Eio.Private.Effects.Trace k -> continue k Eunix.Trace.default_traceln
module Objects = struct
type _ Eio.Generic.ty += FD : File.t Eio.Generic.ty
type has_fd = < fd : File.t >
type source = < Eio.Flow.source; Eio.Flow.close; has_fd >
type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd >
let get_fd (t : <has_fd; ..>) = t#fd
let get_fd_opt t = Eio.Generic.probe t FD
let flow fd = object (_ : <source; sink; ..>)
method fd = fd
method close = File.close fd
method probe : type a. a Eio.Generic.ty -> a option = function
| FD -> Some fd
| _ -> None
method read_into ?sw buf =
let buf = Cstruct.to_bigarray buf in
match File.read ?sw fd [buf] |> or_raise |> Unsigned.Size_t.to_int with
| 0 -> raise End_of_file
| got -> got
method read_methods = []
method write ?sw src =
let buf = Luv.Buffer.create 4096 in
try
while true do
let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in
let sub = Luv.Buffer.sub buf ~offset:0 ~length:got in
File.write ?sw fd [sub]
done
with End_of_file -> ()
end
let source fd = (flow fd :> source)
let sink fd = (flow fd :> sink)
let socket sock = object
inherit Eio.Flow.two_way
method read_into ?sw buf =
let buf = Cstruct.to_bigarray buf in
Stream.read_into ?sw sock buf
method read_methods = []
method write ?sw src =
let buf = Luv.Buffer.create 4096 in
try
while true do
let got = Eio.Flow.read_into src (Cstruct.of_bigarray buf) in
let buf' = Luv.Buffer.sub buf ~offset:0 ~length:got in
Stream.write ?sw sock [buf']
done
with End_of_file -> ()
method close =
Handle.close sock
method shutdown = function
| `Send -> await_exn @@ Luv.Stream.shutdown (Handle.get "shutdown" sock)
| `Receive -> failwith "shutdown receive not supported"
| `All ->
Log.warn (fun f -> f "shutdown receive not supported");
await_exn @@ Luv.Stream.shutdown (Handle.get "shutdown" sock)
end
class virtual ['a] listening_socket ~backlog sock = object (self)
inherit Eio.Net.listening_socket
val ready = Eio.Semaphore.make 0
method private virtual make_client : 'a Luv.Stream.t
method private virtual get_client_addr : 'a Stream.t -> Eio.Net.Sockaddr.t
method close = Handle.close sock
method accept_sub ~sw ~on_error fn =
Eio.Semaphore.acquire ~sw ready;
let client = self#make_client |> Handle.of_luv_no_hook in
match Luv.Stream.accept ~server:(Handle.get "accept" sock) ~client:(Handle.get "accept" client) with
| Error e ->
Handle.close client;
raise (Luv_error e)
| Ok () ->
Fibre.fork_sub_ignore ~sw ~on_error
(fun sw ->
let client_addr = self#get_client_addr client in
fn ~sw (socket client :> <Eio.Flow.two_way; Eio.Flow.close>) client_addr
)
~on_release:(fun () -> Handle.ensure_closed client)
initializer
Luv.Stream.listen ~backlog (Handle.get "listen" sock) (fun x ->
or_raise x;
Eio.Semaphore.release ready
)
end
(* TODO: implement, or maybe remove from the Eio API.
Luv makes TCP sockets reuse_addr by default, and maybe that's fine everywhere.
Extracting the FD will require https://github.com/aantron/luv/issues/120 *)
let luv_reuse_addr _sock _v = ()
(* This is messy. Should make a generic sockaddr type for eio. *)
let luv_addr_of_unix host port =
let host = Unix.string_of_inet_addr host in
match Luv.Sockaddr.ipv6 host port with
| Ok addr -> addr
| Error _ -> Luv.Sockaddr.ipv4 host port |> or_raise
let luv_ip_addr_to_unix addr =
let host = Luv.Sockaddr.to_string addr |> Option.get in
let port = Luv.Sockaddr.port addr |> Option.get in
(Unix.inet_addr_of_string host, port)
let listening_ip_socket ~backlog sock = object
inherit [[ `TCP ]] listening_socket ~backlog sock
method private make_client = Luv.TCP.init () |> or_raise
method private get_client_addr c =
`Tcp (Luv.TCP.getpeername (Handle.get "get_client_addr" c) |> or_raise |> luv_ip_addr_to_unix)
end
let listening_unix_socket ~backlog sock = object
inherit [[ `Pipe ]] listening_socket ~backlog sock
method private make_client = Luv.Pipe.init () |> or_raise
method private get_client_addr c =
`Unix (Luv.Pipe.getpeername (Handle.get "get_client_addr" c) |> or_raise)
end
let net = object
inherit Eio.Net.t
method listen ~reuse_addr ~backlog ~sw = function
| `Tcp (host, port) ->
let sock = Luv.TCP.init () |> or_raise |> Handle.of_luv ~sw in
luv_reuse_addr sock reuse_addr;
let addr = luv_addr_of_unix host port in
Luv.TCP.bind (Handle.get "bind" sock) addr |> or_raise;
listening_ip_socket ~backlog sock
| `Unix path ->
let sock = Luv.Pipe.init () |> or_raise |> Handle.of_luv ~sw in
luv_reuse_addr sock reuse_addr;
if reuse_addr then (
match Unix.lstat path with
| Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path
| _ -> ()
| exception Unix.Unix_error (Unix.ENOENT, _, _) -> ()
);
Luv.Pipe.bind (Handle.get "bind" sock) path |> or_raise;
(* Remove the path when done (except for abstract sockets). *)
if String.length path > 0 && path.[0] <> Char.chr 0 then
Switch.on_release sw (fun () -> Unix.unlink path);
listening_unix_socket ~backlog sock
(* todo: how do you cancel connect operations with luv? *)
method connect ~sw = function
| `Tcp (host, port) ->
let sock = Luv.TCP.init () |> or_raise |> Handle.of_luv ~sw in
let addr = luv_addr_of_unix host port in
await_exn (Luv.TCP.connect (Handle.get "connect" sock) addr);
socket sock
| `Unix path ->
let sock = Luv.Pipe.init () |> or_raise |> Handle.of_luv ~sw in
await_exn (Luv.Pipe.connect (Handle.get "connect" sock) path);
socket sock
end
type stdenv = <
stdin : source;
stdout : sink;
stderr : sink;
net : Eio.Net.t;
domain_mgr : Eio.Domain_manager.t;
clock : Eio.Time.clock;
fs : Eio.Dir.t;
cwd : Eio.Dir.t;
>
let domain_mgr = object
inherit Eio.Domain_manager.t
method run_compute_unsafe (type a) fn =
let domain_k : (unit Domain.t * a Suspended.t) option ref = ref None in
let result = ref None in
let async = Luv.Async.init (fun async ->
(* This is called in the parent domain after returning to the mainloop,
so [domain_k] must be set by then. *)
let domain, k = Option.get !domain_k in
Domain.join domain;
Luv.Handle.close async @@ fun () ->
Suspended.continue_result k (Option.get !result)
) |> or_raise
in
enter @@ fun k ->
let d = Domain.spawn (fun () ->
result := Some (match run_compute fn with
| v -> Ok v
| exception ex -> Error ex
);
Luv.Async.send async |> or_raise
) in
domain_k := Some (d, k)
end
let clock = object
inherit Eio.Time.clock
method now = Unix.gettimeofday ()
method sleep_until = sleep_until
end
(* Warning: libuv doesn't provide [openat], etc, and so there is probably no way to make this safe.
We make a best-efforts attempt to enforce the sandboxing using realpath and [`NOFOLLOW].
todo: this needs more testing *)
class dir dir_path = object (self)
inherit Eio.Dir.t
(* Resolve a relative path to an absolute one, with no symlinks.
@raise Eio.Dir.Permission_denied if it's outside of [dir_path]. *)
method private resolve ?sw path =
if Filename.is_relative path then (
let dir_path = File.realpath ?sw dir_path |> or_raise_path dir_path in
let full = File.realpath ?sw (Filename.concat dir_path path) |> or_raise_path path in
let prefix_len = String.length dir_path + 1 in
if String.length full >= prefix_len && String.sub full 0 prefix_len = dir_path ^ Filename.dir_sep then
full
else if full = dir_path then
full
else
raise (Eio.Dir.Permission_denied (path, Failure (Fmt.str "Path %S is outside of sandbox %S" full dir_path)))
) else (
raise (Eio.Dir.Permission_denied (path, Failure (Fmt.str "Path %S is absolute" path)))
)
(* We want to create [path]. Check that the parent is in the sandbox. *)
method private resolve_new ?sw path =
let dir, leaf = Filename.dirname path, Filename.basename path in
if leaf = ".." then Fmt.failwith "New path %S ends in '..'!" path
else match self#resolve ?sw dir with
| dir -> Filename.concat dir leaf
| exception Eio.Dir.Permission_denied (dir, ex) ->
raise (Eio.Dir.Permission_denied (Filename.concat dir leaf, ex))
method open_in ~sw path =
let fd = File.open_ ~sw (self#resolve ~sw path) [`NOFOLLOW; `RDONLY] |> or_raise_path path in
(flow fd :> <Eio.Flow.source; Eio.Flow.close>)
method open_out ~sw ~append ~create path =
let mode, flags =
match create with
| `Never -> 0, []
| `If_missing perm -> perm, [`CREAT]
| `Or_truncate perm -> perm, [`CREAT; `TRUNC]
| `Exclusive perm -> perm, [`CREAT; `EXCL]
in
let flags = if append then `APPEND :: flags else flags in
let flags = `RDWR :: `NOFOLLOW :: flags in
let real_path =
if create = `Never then self#resolve path
else self#resolve_new path
in
let fd = File.open_ ~sw real_path flags ~mode:[`NUMERIC mode] |> or_raise_path path in
(flow fd :> <Eio.Dir.rw; Eio.Flow.close>)
method open_dir ~sw path =
Switch.check sw;
new dir (self#resolve ~sw path)
(* libuv doesn't seem to provide a race-free way to do this. *)
method mkdir ?sw ~perm path =
let real_path = self#resolve_new ?sw path in
File.mkdir ~mode:[`NUMERIC perm] real_path |> or_raise_path path
method close = ()
end
(* Full access to the filesystem. *)
let fs = object
inherit dir "/"
(* No checks *)
method! private resolve ?sw:_ path = path
end
let cwd = object
inherit dir "."
end
let stdenv () =
let stdin = lazy (source (File.of_luv_no_hook Luv.File.stdin)) in
let stdout = lazy (sink (File.of_luv_no_hook Luv.File.stdout)) in
let stderr = lazy (sink (File.of_luv_no_hook Luv.File.stderr)) in
object (_ : stdenv)
method stdin = Lazy.force stdin
method stdout = Lazy.force stdout
method stderr = Lazy.force stderr
method net = net
method domain_mgr = domain_mgr
method clock = clock
method fs = (fs :> Eio.Dir.t)
method cwd = (cwd :> Eio.Dir.t)
end
end
let run main =
Log.debug (fun l -> l "starting run");
let stdenv = Objects.stdenv () in
let rec fork ~tid fn =
Ctf.note_switch tid;
match fn () with
| () -> ()
| effect (Await fn) k ->
let k = { Suspended.k; tid } in
fn (Suspended.continue k)
| effect Eio.Private.Effects.Trace k -> continue k Eunix.Trace.default_traceln
| effect (Eio.Private.Effects.Fork f) k ->
let k = { Suspended.k; tid } in
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Task;
let promise, resolver = Promise.create_with_id id in
enqueue_thread k promise;
fork
~tid:id
(fun () ->
match f () with
| x -> Promise.fulfill resolver x
| exception ex ->
Log.debug (fun f -> f "Forked fibre failed: %a" Fmt.exn ex);
Promise.break resolver ex
)
| effect (Eio.Private.Effects.Fork_ignore f) k ->
let k = { Suspended.k; tid } in
enqueue_thread k ();
let child = Ctf.note_fork () in
Ctf.note_switch child;
fork ~tid:child (fun () ->
match f () with
| () ->
Ctf.note_resolved child ~ex:None
| exception ex ->
Ctf.note_resolved child ~ex:(Some ex)
)
| effect (Enter fn) k -> fn { Suspended.k; tid }
| effect (Eio.Private.Effects.Suspend fn) k ->
let k = { Suspended.k; tid } in
fn tid (enqueue_result_thread k)
in
let main_status = ref `Running in
fork ~tid:(Ctf.mint_id ()) (fun () ->
match main stdenv with
| () -> main_status := `Done
| exception ex -> main_status := `Ex ex
);
ignore (Luv.Loop.run () : bool);
match !main_status with
| `Done -> ()
| `Ex ex -> raise ex
| `Running -> failwith "Deadlock detected: no events scheduled but main function hasn't returned"

124
lib_eio_luv/eio_luv.mli Normal file
View File

@ -0,0 +1,124 @@
(*
* Copyright (C) 2021 Thomas Leonard
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)
open Eio.Std
type 'a or_error = ('a, Luv.Error.t) result
exception Luv_error of Luv.Error.t
val or_raise : 'a or_error -> 'a
(** [or_error (Error e)] raises [Luv_error e]. *)
val await : (('a -> unit) -> unit) -> 'a
(** [await fn] converts a function using a luv-style callback to one using effects.
Use it as e.g. [await (Luv.File.realpath path)]. *)
(** {1 Time functions} *)
val sleep_until : ?sw:Switch.t -> float -> unit
(** [sleep_until time] blocks until the current time is [time].
@param sw Cancel the sleep if [sw] is turned off. *)
val yield : ?sw:Switch.t -> unit -> unit
(** {1 Low-level wrappers for Luv functions} *)
module File : sig
type t
val is_open : t -> bool
(** [is_open t] is [true] if {!close t} hasn't been called yet. *)
val close : t -> unit
(** [close t] closes [t].
@raise Invalid_arg if [t] is already closed. *)
val of_luv : sw:Switch.t -> Luv.File.t -> t
(** [of_luv ~sw fd] wraps [fd] as an open file descriptor.
This is unsafe if [fd] is closed directly (before or after wrapping it).
@param sw The FD is closed when [sw] is released, if not closed manually first. *)
val to_luv : t -> Luv.File.t
(** [to_luv t] returns the wrapped descriptor.
This allows unsafe access to the FD.
@raise Invalid_arg if [t] is closed. *)
val open_ :
sw:Switch.t ->
?mode:Luv.File.Mode.t list ->
string -> Luv.File.Open_flag.t list -> t or_error
(** Wraps {!Luv.File.open_} *)
val read : ?sw:Switch.t -> t -> Luv.Buffer.t list -> Unsigned.Size_t.t or_error
(** Wraps {!Luv.File.read} *)
val write : ?sw:Switch.t -> t -> Luv.Buffer.t list -> unit
(** [write t bufs] writes all the data in [bufs] (which may take several calls to {!Luv.File.write}). *)
val realpath : ?sw:Switch.t -> string -> string or_error
(** Wraps {!Luv.File.realpath} *)
val mkdir : ?sw:Switch.t -> mode:Luv.File.Mode.t list -> string -> unit or_error
(** Wraps {!Luv.File.mkdir} *)
end
module Handle : sig
type 'a t
val is_open : 'a t -> bool
(** [is_open t] is [true] if {!close t} hasn't been called yet. *)
val close : 'a t -> unit
(** [close t] closes [t].
@raise Invalid_arg if [t] is already closed. *)
val to_luv : 'a t -> 'a Luv.Handle.t
(** [to_luv t] returns the wrapped handle.
This allows unsafe access to the handle.
@raise Invalid_arg if [t] is closed. *)
val of_luv : sw:Switch.t -> 'a Luv.Handle.t -> 'a t
(** [of_luv ~sw h] wraps [h] as an open handle.
This is unsafe if [h] is closed directly (before or after wrapping it).
@param sw The handle is closed when [sw] is released, if not closed manually first. *)
end
(** {1 Eio API} *)
module Objects : sig
type has_fd = < fd : File.t >
type source = < Eio.Flow.source; Eio.Flow.close; has_fd >
type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd >
type stdenv = <
stdin : source;
stdout : sink;
stderr : sink;
net : Eio.Net.t;
domain_mgr : Eio.Domain_manager.t;
clock : Eio.Time.clock;
fs : Eio.Dir.t;
cwd : Eio.Dir.t;
>
val get_fd : <has_fd; ..> -> File.t
val get_fd_opt : #Eio.Generic.t -> File.t option
end
(** {1 Main Loop} *)
val run : (Objects.stdenv -> unit) -> unit

3
lib_eio_luv/tests/dune Normal file
View File

@ -0,0 +1,3 @@
(mdx
(package eio_luv)
(packages eio_luv))

View File

@ -0,0 +1,47 @@
# Set up the test environment
```ocaml
# #require "eio_luv";;
# open Eio.Std;;
```
```ocaml
let rec read_exactly ~sw fd buf =
let size = Luv.Buffer.size buf in
if size > 0 then (
let got = Eio_luv.File.read ~sw fd [buf] |> Eio_luv.or_raise |> Unsigned.Size_t.to_int in
let next = Luv.Buffer.sub buf ~offset:got ~length:(size - got) in
read_exactly ~sw fd next
)
let () =
Luv.Error.set_on_unhandled_exception @@ fun ex ->
Printf.printf "Unhandled luv exception: %s\n%!" (Printexc.to_string ex)
```
# Hello, world
```ocaml
# Eio_luv.run @@ fun env ->
Eio.Flow.copy_string "Hello, world!\n" (Eio.Stdenv.stdout env)
Hello, world!
- : unit = ()
```
# Read a few bytes from /dev/zero
```ocaml
let main _stdenv =
Switch.top @@ fun sw ->
let fd = Eio_luv.File.open_ ~sw "/dev/zero" [] |> Eio_luv.or_raise in
let buf = Luv.Buffer.create 4 in
read_exactly ~sw fd buf;
traceln "Read %S" (Luv.Buffer.to_string buf);
Eio_luv.File.close fd
```
```ocaml
# Eio_luv.run main
+Read "\000\000\000\000"
- : unit = ()
```

View File

@ -1,4 +1,7 @@
(library
(name eio_main)
(public_name eio_main)
(libraries eio_linux))
(libraries eio_luv
(select eio_main.ml from
(eio_linux -> eio_main.linux.ml)
( -> eio_main.default.ml))))

View File

@ -0,0 +1 @@
let run fn = Eio_luv.run (fun env -> fn (env :> Eio.Stdenv.t))

View File

@ -0,0 +1,24 @@
let has_working_uring v =
match String.split_on_char '.' v with
| "5" :: minor :: _ -> int_of_string minor >= 10
| major :: _ -> int_of_string major > 5
| [] -> false
let run_io_uring fn =
Logs.info (fun f -> f "Selecting io-uring backend");
Eio_linux.run (fun env -> fn (env :> Eio.Stdenv.t))
let run_luv fn =
Logs.info (fun f -> f "Selecting luv backend (io-uring needs Linux >= 5.10)");
Eio_luv.run (fun env -> fn (env :> Eio.Stdenv.t))
let run fn =
match Sys.getenv_opt "EIO_BACKEND" with
| Some "io-uring" -> run_io_uring fn
| Some "luv" -> run_luv fn
| None | Some "" ->
begin match Luv.System_info.uname () with
| Ok x when has_working_uring x.release -> run_io_uring fn
| _ -> run_luv fn
end
| Some x -> Fmt.failwith "Unknown eio backend %S (from $EIO_BACKEND)" x

View File

@ -1 +0,0 @@
let run fn = Eio_linux.run (fun env -> fn (env :> Eio.Stdenv.t))

View File

@ -32,19 +32,28 @@ The domain raises an exception:
Exception: Failure "Exception from new domain".
```
We can still run other fibres in the main domain while waiting:
We can still run other fibres in the main domain while waiting.
Here, we use a mutex to check that the parent domain really did run while waiting for the child domain.
```ocaml
# run @@ fun mgr ->
Switch.top @@ fun sw ->
let mutex = Stdlib.Mutex.create () in
Mutex.lock mutex;
Fibre.both ~sw
(fun () ->
traceln "Spawning new domain...";
let response = Eio.Domain_manager.run_compute_unsafe mgr (fun () -> "Hello from new domain") in
let response = Eio.Domain_manager.run_compute_unsafe mgr (fun () ->
Mutex.lock mutex;
Mutex.unlock mutex;
"Hello from new domain"
) in
traceln "Got %S from spawned domain" response
)
(fun () ->
traceln "Other fibres can still run"
traceln "Other fibres can still run";
Mutex.unlock mutex
)
+Spawning new domain...
+Other fibres can still run

View File

@ -181,8 +181,8 @@ Creating directories with nesting, symlinks, etc:
()
+mkdir "subdir" -> ok
+mkdir "to-subdir/nested" -> ok
+mkdir "to-root/tmp/foo" -> Eio.Dir.Permission_denied ("to-root/tmp", _)
+mkdir "../foo" -> Eio.Dir.Permission_denied ("..", _)
+mkdir "to-root/tmp/foo" -> Eio.Dir.Permission_denied ("to-root/tmp/foo", _)
+mkdir "../foo" -> Eio.Dir.Permission_denied ("../foo", _)
+mkdir "to-subdir" -> Eio.Dir.Already_exists ("to-subdir", _)
+mkdir "dangle/foo" -> Eio.Dir.Not_found ("dangle", _)
- : unit = ()
@ -200,7 +200,7 @@ Create a sandbox, write a file with it, then read it from outside:
try_mkdir subdir "../new-sandbox";
traceln "Got %S" @@ read_file ~sw cwd "sandbox/test-file"
+mkdir "sandbox" -> ok
+mkdir "../new-sandbox" -> Eio.Dir.Permission_denied ("..", _)
+mkdir "../new-sandbox" -> Eio.Dir.Permission_denied ("../new-sandbox", _)
+Got "data"
- : unit = ()
```
@ -225,7 +225,7 @@ Using `cwd` we can't access the parent, but using `fs` we can:
Unix.rmdir "outside-cwd"
+mkdir "fs-test" -> ok
+chdir "fs-test"
+mkdir "../outside-cwd" -> Eio.Dir.Permission_denied ("..", _)
+mkdir "../outside-cwd" -> Eio.Dir.Permission_denied ("../outside-cwd", _)
+write "../test-file" -> Eio.Dir.Permission_denied ("../test-file", _)
+mkdir "../outside-cwd" -> ok
+write "../test-file" -> ok

View File

@ -121,6 +121,7 @@ Cancelling the read:
traceln "Connecting to server...";
let flow = Eio.Net.connect ~sw net addr in
traceln "Connection opened - cancelling server's read";
Fibre.yield ();
Switch.turn_off read_switch Graceful_shutdown;
let msg = read_all flow in
traceln "Client received: %S" msg