Move low-level Eio_linux functions to sub-module

The high-level API that was in `Eio_linux.Objects` is now the top-level
API.
This commit is contained in:
Thomas Leonard 2022-02-02 21:05:58 +00:00
parent 33055fca45
commit ecd195da5d
6 changed files with 600 additions and 598 deletions

View File

@ -512,6 +512,7 @@ and complete_rw_req st ({len; cur_off; action; _} as req) res =
| _, Exactly len -> Suspended.continue action len | _, Exactly len -> Suspended.continue action len
| n, Upto _ -> Suspended.continue action n | n, Upto _ -> Suspended.continue action n
module Low_level = struct
let alloc_buf st k = let alloc_buf st k =
Log.debug (fun l -> l "alloc: %d" (Uring.Region.avail st.mem)); Log.debug (fun l -> l "alloc: %d" (Uring.Region.avail st.mem));
match Uring.Region.alloc st.mem with match Uring.Region.alloc st.mem with
@ -702,10 +703,10 @@ let accept ~sw fd =
let client_addr = Uring.Sockaddr.get client_addr in let client_addr = Uring.Sockaddr.get client_addr in
client, client_addr client, client_addr
) )
end
external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd" external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd"
module Objects = struct
type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty
type has_fd = < fd : FD.t > type has_fd = < fd : FD.t >
@ -719,12 +720,12 @@ module Objects = struct
(* When copying between a source with an FD and a sink with an FD, we can share the chunk (* When copying between a source with an FD and a sink with an FD, we can share the chunk
and avoid copying. *) and avoid copying. *)
let fast_copy src dst = let fast_copy src dst =
with_chunk @@ fun chunk -> Low_level.with_chunk @@ fun chunk ->
let chunk_size = Uring.Region.length chunk in let chunk_size = Uring.Region.length chunk in
try try
while true do while true do
let got = read_upto src chunk chunk_size in let got = Low_level.read_upto src chunk chunk_size in
write dst chunk got Low_level.write dst chunk got
done done
with End_of_file -> () with End_of_file -> ()
@ -732,7 +733,7 @@ module Objects = struct
let fast_copy_try_splice src dst = let fast_copy_try_splice src dst =
try try
while true do while true do
let _ : int = splice src ~dst ~len:max_int in let _ : int = Low_level.splice src ~dst ~len:max_int in
() ()
done done
with with
@ -744,7 +745,7 @@ module Objects = struct
let copy_with_rsb rsb dst = let copy_with_rsb rsb dst =
try try
while true do while true do
rsb (writev dst) rsb (Low_level.writev dst)
done done
with End_of_file -> () with End_of_file -> ()
@ -752,12 +753,12 @@ module Objects = struct
the source to write into it. This used when the other methods the source to write into it. This used when the other methods
aren't available. *) aren't available. *)
let fallback_copy src dst = let fallback_copy src dst =
with_chunk @@ fun chunk -> Low_level.with_chunk @@ fun chunk ->
let chunk_cs = Uring.Region.to_cstruct chunk in let chunk_cs = Uring.Region.to_cstruct chunk in
try try
while true do while true do
let got = Eio.Flow.read src chunk_cs in let got = Eio.Flow.read src chunk_cs in
write dst chunk got Low_level.write dst chunk got
done done
with End_of_file -> () with End_of_file -> ()
@ -776,9 +777,9 @@ module Objects = struct
if Lazy.force is_tty then ( if Lazy.force is_tty then (
(* Work-around for https://github.com/axboe/liburing/issues/354 (* Work-around for https://github.com/axboe/liburing/issues/354
(should be fixed in Linux 5.14) *) (should be fixed in Linux 5.14) *)
await_readable fd Low_level.await_readable fd
); );
readv fd [buf] Low_level.readv fd [buf]
method read_methods = [] method read_methods = []
@ -814,7 +815,7 @@ module Objects = struct
method accept ~sw = method accept ~sw =
Switch.check sw; Switch.check sw;
let client, client_addr = accept ~sw fd in let client, client_addr = Low_level.accept ~sw fd in
let client_addr = match client_addr with let client_addr = match client_addr with
| Unix.ADDR_UNIX path -> `Unix path | Unix.ADDR_UNIX path -> `Unix path
| Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port) | Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port)
@ -868,7 +869,7 @@ module Objects = struct
in in
let sock_unix = Unix.socket socket_domain socket_type 0 in let sock_unix = Unix.socket socket_domain socket_type 0 in
let sock = FD.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in let sock = FD.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in
connect sock addr; Low_level.connect sock addr;
(flow sock :> <Eio.Flow.two_way; Eio.Flow.close>) (flow sock :> <Eio.Flow.two_way; Eio.Flow.close>)
end end
@ -906,7 +907,7 @@ module Objects = struct
inherit Eio.Time.clock inherit Eio.Time.clock
method now = Unix.gettimeofday () method now = Unix.gettimeofday ()
method sleep_until = sleep_until method sleep_until = Low_level.sleep_until
end end
class dir fd = object class dir fd = object
@ -915,7 +916,7 @@ module Objects = struct
val resolve_flags = Uring.Resolve.beneath val resolve_flags = Uring.Resolve.beneath
method open_in ~sw path = method open_in ~sw path =
let fd = openat2 ~sw ?dir:fd path let fd = Low_level.openat2 ~sw ?dir:fd path
~access:`R ~access:`R
~flags:Uring.Open_flags.cloexec ~flags:Uring.Open_flags.cloexec
~perm:0 ~perm:0
@ -932,7 +933,7 @@ module Objects = struct
| `Exclusive perm -> perm, Uring.Open_flags.(creat + excl) | `Exclusive perm -> perm, Uring.Open_flags.(creat + excl)
in in
let flags = if append then Uring.Open_flags.(flags + append) else flags in let flags = if append then Uring.Open_flags.(flags + append) else flags in
let fd = openat2 ~sw ?dir:fd path let fd = Low_level.openat2 ~sw ?dir:fd path
~access:`RW ~access:`RW
~flags:Uring.Open_flags.(cloexec + flags) ~flags:Uring.Open_flags.(cloexec + flags)
~perm ~perm
@ -941,7 +942,7 @@ module Objects = struct
(flow fd :> <Eio.Dir.rw; Eio.Flow.close>) (flow fd :> <Eio.Dir.rw; Eio.Flow.close>)
method open_dir ~sw path = method open_dir ~sw path =
let fd = openat2 ~sw ~seekable:false ?dir:fd path let fd = Low_level.openat2 ~sw ~seekable:false ?dir:fd path
~access:`R ~access:`R
~flags:Uring.Open_flags.(cloexec + path + directory) ~flags:Uring.Open_flags.(cloexec + path + directory)
~perm:0 ~perm:0
@ -950,7 +951,7 @@ module Objects = struct
(new dir (Some fd) :> <Eio.Dir.t; Eio.Flow.close>) (new dir (Some fd) :> <Eio.Dir.t; Eio.Flow.close>)
method mkdir ~perm path = method mkdir ~perm path =
mkdir_beneath ~perm ?dir:fd path Low_level.mkdir_beneath ~perm ?dir:fd path
method close = method close =
FD.close (Option.get fd) FD.close (Option.get fd)
@ -963,13 +964,13 @@ module Objects = struct
val! resolve_flags = Uring.Resolve.empty val! resolve_flags = Uring.Resolve.empty
method! mkdir ~perm path = method! mkdir ~perm path =
mkdirat ~perm None path Low_level.mkdirat ~perm None path
end end
let secure_random = object let secure_random = object
inherit Eio.Flow.source inherit Eio.Flow.source
method read_methods = [] method read_methods = []
method read_into buf = getrandom buf method read_into buf = Low_level.getrandom buf
end end
let stdenv ~run_event_loop = let stdenv ~run_event_loop =
@ -989,18 +990,17 @@ module Objects = struct
method cwd = (cwd :> Eio.Dir.t) method cwd = (cwd :> Eio.Dir.t)
method secure_random = secure_random method secure_random = secure_random
end end
end
let pipe sw = let pipe sw =
let r, w = Unix.pipe () in let r, w = Unix.pipe () in
let r = Objects.source (FD.of_unix ~sw ~seekable:false ~close_unix:true r) in let r = source (FD.of_unix ~sw ~seekable:false ~close_unix:true r) in
let w = Objects.sink (FD.of_unix ~sw ~seekable:false ~close_unix:true w) in let w = sink (FD.of_unix ~sw ~seekable:false ~close_unix:true w) in
r, w r, w
let monitor_event_fd t = let monitor_event_fd t =
let buf = Cstruct.create 8 in let buf = Cstruct.create 8 in
while true do while true do
let got = readv t.eventfd [buf] in let got = Low_level.readv t.eventfd [buf] in
Log.debug (fun f -> f "Received wakeup on eventfd %a" FD.pp t.eventfd); Log.debug (fun f -> f "Received wakeup on eventfd %a" FD.pp t.eventfd);
assert (got = 8); assert (got = 8);
(* We just go back to sleep now, but this will cause the scheduler to look (* We just go back to sleep now, but this will cause the scheduler to look
@ -1022,7 +1022,7 @@ let with_uring ~fixed_buf_len ~queue_depth ?polling_timeout fn =
let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main = let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main =
Log.debug (fun l -> l "starting run"); Log.debug (fun l -> l "starting run");
let stdenv = Objects.stdenv ~run_event_loop:(run ~queue_depth ~block_size ?polling_timeout) in let stdenv = stdenv ~run_event_loop:(run ~queue_depth ~block_size ?polling_timeout) in
(* TODO unify this allocation API around baregion/uring *) (* TODO unify this allocation API around baregion/uring *)
let fixed_buf_len = block_size * queue_depth in let fixed_buf_len = block_size * queue_depth in
with_uring ~fixed_buf_len ~queue_depth ?polling_timeout @@ fun uring -> with_uring ~fixed_buf_len ~queue_depth ?polling_timeout @@ fun uring ->
@ -1059,7 +1059,7 @@ let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main =
fn st k; fn st k;
schedule st schedule st
) )
| ERead args -> Some (fun k -> | Low_level.ERead args -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
enqueue_read st k args; enqueue_read st k args;
schedule st) schedule st)
@ -1068,12 +1068,12 @@ let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main =
enqueue_close st k fd; enqueue_close st k fd;
schedule st schedule st
) )
| EWrite args -> Some (fun k -> | Low_level.EWrite args -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
enqueue_write st k args; enqueue_write st k args;
schedule st schedule st
) )
| Sleep_until time -> Some (fun k -> | Low_level.Sleep_until time -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
match Fibre_context.get_error fibre with match Fibre_context.get_error fibre with
| Some ex -> Suspended.discontinue k ex | Some ex -> Suspended.discontinue k ex
@ -1128,12 +1128,12 @@ let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main =
); );
schedule st schedule st
) )
| Alloc -> Some (fun k -> | Low_level.Alloc -> Some (fun k ->
let k = { Suspended.k; fibre } in let k = { Suspended.k; fibre } in
alloc_buf st k Low_level.alloc_buf st k
) )
| Free buf -> Some (fun k -> | Low_level.Free buf -> Some (fun k ->
free_buf st buf; Low_level.free_buf st buf;
continue k () continue k ()
) )
| _ -> None | _ -> None

View File

@ -46,6 +46,39 @@ module FD : sig
@raise Invalid_arg if [t] is closed. *) @raise Invalid_arg if [t] is closed. *)
end end
(** {1 Eio API} *)
type has_fd = < fd : FD.t >
type source = < Eio.Flow.source; Eio.Flow.close; has_fd >
type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd >
type stdenv = <
stdin : source;
stdout : sink;
stderr : sink;
net : Eio.Net.t;
domain_mgr : Eio.Domain_manager.t;
clock : Eio.Time.clock;
fs : Eio.Dir.t;
cwd : Eio.Dir.t;
secure_random : Eio.Flow.source;
>
val get_fd : <has_fd; ..> -> FD.t
val get_fd_opt : #Eio.Generic.t -> FD.t option
val pipe : Switch.t -> source * sink
(** [pipe sw] is a source-sink pair [(r, w)], where data written to [w] can be read from [r].
It is implemented as a Unix pipe. *)
(** {1 Main Loop} *)
val run : ?queue_depth:int -> ?block_size:int -> ?polling_timeout:int -> (stdenv -> unit) -> unit
(** Run an event loop using io_uring.
For portable code, you should use {!Eio_main.run} instead, which will use this automatically
if running on Linux with a recent-enough kernel version. *)
module Low_level : sig
val noop : unit -> unit val noop : unit -> unit
(** [noop ()] performs a uring noop. This is only useful for benchmarking. *) (** [noop ()] performs a uring noop. This is only useful for benchmarking. *)
@ -144,35 +177,4 @@ val getrandom : Cstruct.t -> int
It uses Linux's [getrandom] call, which is like reading from /dev/urandom It uses Linux's [getrandom] call, which is like reading from /dev/urandom
except that it will block (the whole domain) if used at early boot except that it will block (the whole domain) if used at early boot
when the random system hasn't been initialised yet. *) when the random system hasn't been initialised yet. *)
(** {1 Eio API} *)
module Objects : sig
type has_fd = < fd : FD.t >
type source = < Eio.Flow.source; Eio.Flow.close; has_fd >
type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd >
type stdenv = <
stdin : source;
stdout : sink;
stderr : sink;
net : Eio.Net.t;
domain_mgr : Eio.Domain_manager.t;
clock : Eio.Time.clock;
fs : Eio.Dir.t;
cwd : Eio.Dir.t;
secure_random : Eio.Flow.source;
>
val get_fd : <has_fd; ..> -> FD.t
val get_fd_opt : #Eio.Generic.t -> FD.t option
end end
val pipe : Switch.t -> Objects.source * Objects.sink
(** [pipe sw] is a source-sink pair [(r, w)], where data written to [w] can be read from [r].
It is implemented as a Unix pipe. *)
(** {1 Main Loop} *)
val run : ?queue_depth:int -> ?block_size:int -> ?polling_timeout:int -> (Objects.stdenv -> unit) -> unit
(** FIXME queue_depth and block_size should be in a handler and not the mainloop *)

View File

@ -1,6 +1,6 @@
(* basic tests using effects *) (* basic tests using effects *)
open Eio_linux open Eio_linux.Low_level
open Eio.Std open Eio.Std
module Int63 = Optint.Int63 module Int63 = Optint.Int63
@ -11,7 +11,7 @@ let setup_log level =
let () = let () =
setup_log (Some Logs.Debug); setup_log (Some Logs.Debug);
run @@ fun _stdenv -> Eio_linux.run @@ fun _stdenv ->
Switch.run @@ fun sw -> Switch.run @@ fun sw ->
let fd = Unix.handle_unix_error (openfile ~sw "test.txt" Unix.[O_RDONLY]) 0 in let fd = Unix.handle_unix_error (openfile ~sw "test.txt" Unix.[O_RDONLY]) 0 in
let buf = alloc () in let buf = alloc () in

View File

@ -13,7 +13,7 @@ let main ~clock =
for _ = 1 to n_fibres do for _ = 1 to n_fibres do
Fibre.fork ~sw (fun () -> Fibre.fork ~sw (fun () ->
for _ = 1 to n_iters do for _ = 1 to n_iters do
Eio_linux.noop () Eio_linux.Low_level.noop ()
done done
) )
done done

View File

@ -2,7 +2,7 @@
open Eio.Std open Eio.Std
module U = Eio_linux module U = Eio_linux.Low_level
module Int63 = Optint.Int63 module Int63 = Optint.Int63
let read_then_write_chunk infd outfd file_offset len = let read_then_write_chunk infd outfd file_offset len =
@ -26,12 +26,12 @@ let copy_file infd outfd insize block_size =
copy_block Int63.zero copy_block Int63.zero
let run_cp block_size queue_depth infile outfile () = let run_cp block_size queue_depth infile outfile () =
U.run ~queue_depth ~block_size @@ fun _stdenv -> Eio_linux.run ~queue_depth ~block_size @@ fun _stdenv ->
Switch.run @@ fun sw -> Switch.run @@ fun sw ->
let open Unix in let open Unix in
let infd = Eio_linux.openfile ~sw infile [O_RDONLY] 0 in let infd = U.openfile ~sw infile [O_RDONLY] 0 in
let outfd = Eio_linux.openfile ~sw outfile [O_WRONLY; O_CREAT; O_TRUNC] 0o644 in let outfd = U.openfile ~sw outfile [O_WRONLY; O_CREAT; O_TRUNC] 0o644 in
let insize = Eio_linux.fstat infd |> fun {st_size; _} -> Int63.of_int st_size in let insize = U.fstat infd |> fun {st_size; _} -> Int63.of_int st_size in
Logs.debug (fun l -> l "eurcp: %s -> %s size %a queue %d bs %d" Logs.debug (fun l -> l "eurcp: %s -> %s size %a queue %d bs %d"
infile infile
outfile outfile

View File

@ -7,8 +7,8 @@ let () =
let read_one_byte ~sw r = let read_one_byte ~sw r =
Fibre.fork_promise ~sw (fun () -> Fibre.fork_promise ~sw (fun () ->
let r = Option.get (Eio_linux.Objects.get_fd_opt r) in let r = Option.get (Eio_linux.get_fd_opt r) in
Eio_linux.await_readable r; Eio_linux.Low_level.await_readable r;
let b = Bytes.create 1 in let b = Bytes.create 1 in
let got = Unix.read (Eio_linux.FD.to_unix `Peek r) b 0 1 in let got = Unix.read (Eio_linux.FD.to_unix `Peek r) b 0 1 in
assert (got = 1); assert (got = 1);
@ -21,8 +21,8 @@ let test_poll_add () =
let r, w = Eio_linux.pipe sw in let r, w = Eio_linux.pipe sw in
let thread = read_one_byte ~sw r in let thread = read_one_byte ~sw r in
Fibre.yield (); Fibre.yield ();
let w = Option.get (Eio_linux.Objects.get_fd_opt w) in let w = Option.get (Eio_linux.get_fd_opt w) in
Eio_linux.await_writable w; Eio_linux.Low_level.await_writable w;
let sent = Unix.write (Eio_linux.FD.to_unix `Peek w) (Bytes.of_string "!") 0 1 in let sent = Unix.write (Eio_linux.FD.to_unix `Peek w) (Bytes.of_string "!") 0 1 in
assert (sent = 1); assert (sent = 1);
let result = Promise.await thread in let result = Promise.await thread in
@ -35,7 +35,7 @@ let test_poll_add_busy () =
let a = read_one_byte ~sw r in let a = read_one_byte ~sw r in
let b = read_one_byte ~sw r in let b = read_one_byte ~sw r in
Fibre.yield (); Fibre.yield ();
let w = Option.get (Eio_linux.Objects.get_fd_opt w) |> Eio_linux.FD.to_unix `Peek in let w = Option.get (Eio_linux.get_fd_opt w) |> Eio_linux.FD.to_unix `Peek in
let sent = Unix.write w (Bytes.of_string "!!") 0 2 in let sent = Unix.write w (Bytes.of_string "!!") 0 2 in
assert (sent = 2); assert (sent = 2);
let a = Promise.await a in let a = Promise.await a in
@ -84,20 +84,20 @@ let test_iovec () =
Eio_linux.run ~queue_depth:4 @@ fun _stdenv -> Eio_linux.run ~queue_depth:4 @@ fun _stdenv ->
Switch.run @@ fun sw -> Switch.run @@ fun sw ->
let from_pipe, to_pipe = Eio_linux.pipe sw in let from_pipe, to_pipe = Eio_linux.pipe sw in
let from_pipe = Eio_linux.Objects.get_fd from_pipe in let from_pipe = Eio_linux.get_fd from_pipe in
let to_pipe = Eio_linux.Objects.get_fd to_pipe in let to_pipe = Eio_linux.get_fd to_pipe in
let message = Cstruct.of_string "Got [ ] and [ ]" in let message = Cstruct.of_string "Got [ ] and [ ]" in
let rec recv = function let rec recv = function
| [] -> () | [] -> ()
| cs -> | cs ->
let got = Eio_linux.readv from_pipe cs in let got = Eio_linux.Low_level.readv from_pipe cs in
recv (Cstruct.shiftv cs got) recv (Cstruct.shiftv cs got)
in in
Fibre.both Fibre.both
(fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3]) (fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3])
(fun () -> (fun () ->
let b = Cstruct.of_string "barfoo" in let b = Cstruct.of_string "barfoo" in
Eio_linux.writev to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3]; Eio_linux.Low_level.writev to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3];
Eio_linux.FD.close to_pipe Eio_linux.FD.close to_pipe
); );
Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message) Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message)