From ecd195da5d5f03e1f5c0f44c86954c3a1da6ca1d Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 2 Feb 2022 21:05:58 +0000 Subject: [PATCH] Move low-level Eio_linux functions to sub-module The high-level API that was in `Eio_linux.Objects` is now the top-level API. --- lib_eio_linux/eio_linux.ml | 922 ++++++++++++------------- lib_eio_linux/eio_linux.mli | 242 +++---- lib_eio_linux/tests/basic_eio_linux.ml | 4 +- lib_eio_linux/tests/bench_noop.ml | 2 +- lib_eio_linux/tests/eurcp_lib.ml | 10 +- lib_eio_linux/tests/test.ml | 18 +- 6 files changed, 600 insertions(+), 598 deletions(-) diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 76f34ac..8aee38d 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -512,495 +512,495 @@ and complete_rw_req st ({len; cur_off; action; _} as req) res = | _, Exactly len -> Suspended.continue action len | n, Upto _ -> Suspended.continue action n -let alloc_buf st k = - Log.debug (fun l -> l "alloc: %d" (Uring.Region.avail st.mem)); - match Uring.Region.alloc st.mem with - | buf -> Suspended.continue k buf - | exception Uring.Region.No_space -> - Queue.push k st.mem_q; - schedule st +module Low_level = struct + let alloc_buf st k = + Log.debug (fun l -> l "alloc: %d" (Uring.Region.avail st.mem)); + match Uring.Region.alloc st.mem with + | buf -> Suspended.continue k buf + | exception Uring.Region.No_space -> + Queue.push k st.mem_q; + schedule st -let free_buf st buf = - match Queue.take_opt st.mem_q with - | None -> Uring.Region.free buf - | Some k -> enqueue_thread st k buf + let free_buf st buf = + match Queue.take_opt st.mem_q with + | None -> Uring.Region.free buf + | Some k -> enqueue_thread st k buf -let noop () = - let result = enter enqueue_noop in - Log.debug (fun l -> l "noop returned"); - if result <> 0 then raise (Unix.Unix_error (Uring.error_of_errno result, "noop", "")) + let noop () = + let result = enter enqueue_noop in + Log.debug (fun l -> l "noop returned"); + if result <> 0 then raise (Unix.Unix_error (Uring.error_of_errno result, "noop", "")) -type _ eff += Sleep_until : float -> unit eff -let sleep_until d = - perform (Sleep_until d) + type _ eff += Sleep_until : float -> unit eff + let sleep_until d = + perform (Sleep_until d) -type _ eff += ERead : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff + type _ eff += ERead : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff -let read_exactly ?file_offset fd buf len = - let res = perform (ERead (file_offset, fd, buf, Exactly len)) in - Log.debug (fun l -> l "read_exactly: woken up after read"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "read_exactly", "")) - ) - -let read_upto ?file_offset fd buf len = - let res = perform (ERead (file_offset, fd, buf, Upto len)) in - Log.debug (fun l -> l "read_upto: woken up after read"); - if res < 0 then ( - let err = Uring.error_of_errno res in - let ex = Unix.Unix_error (err, "read_upto", "") in - if err = Unix.ECONNRESET then raise (Eio.Net.Connection_reset ex) - else raise ex - ) else ( - res - ) - -let readv ?file_offset fd bufs = - let res = enter (enqueue_readv (file_offset, fd, bufs)) in - Log.debug (fun l -> l "readv: woken up after read"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "readv", "")) - ) else if res = 0 then ( - raise End_of_file - ) else ( - res - ) - -let rec writev ?file_offset fd bufs = - let res = enter (enqueue_writev (file_offset, fd, bufs)) in - Log.debug (fun l -> l "writev: woken up after write"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) - ) else ( - match Cstruct.shiftv bufs res with - | [] -> () - | bufs -> - let file_offset = - let module I63 = Optint.Int63 in - match file_offset with - | None -> None - | Some ofs when ofs = I63.minus_one -> Some I63.minus_one - | Some ofs -> Some (I63.add ofs (I63.of_int res)) - in - writev ?file_offset fd bufs - ) - -let await_readable fd = - let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in - Log.debug (fun l -> l "await_readable: woken up"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "await_readable", "")) - ) - -let await_writable fd = - let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollout + pollerr))) in - Log.debug (fun l -> l "await_writable: woken up"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "await_writable", "")) - ) - -type _ eff += EWrite : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff - -let write ?file_offset fd buf len = - let res = perform (EWrite (file_offset, fd, buf, Exactly len)) in - Log.debug (fun l -> l "write: woken up after write"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "write", "")) - ) - -type _ eff += Alloc : Uring.Region.chunk eff -let alloc () = perform Alloc - -type _ eff += Free : Uring.Region.chunk -> unit eff -let free buf = perform (Free buf) - -let splice src ~dst ~len = - let res = enter (enqueue_splice ~src ~dst ~len) in - Log.debug (fun l -> l "splice returned"); - if res > 0 then res - else if res = 0 then raise End_of_file - else raise (Unix.Unix_error (Uring.error_of_errno res, "splice", "")) - -let connect fd addr = - let res = enter (enqueue_connect fd addr) in - Log.debug (fun l -> l "connect returned"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "connect", "")) - ) - -let with_chunk fn = - let chunk = alloc () in - Fun.protect ~finally:(fun () -> free chunk) @@ fun () -> - fn chunk - -let openfile ~sw path flags mode = - let fd = Unix.openfile path flags mode in - FD.of_unix ~sw ~seekable:(FD.is_seekable fd) ~close_unix:true fd - -let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path = - wrap_errors path @@ fun () -> - let res = enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in - Log.debug (fun l -> l "openat2 returned"); - if res < 0 then ( - Switch.check sw; (* If cancelled, report that instead. *) - raise @@ Unix.Unix_error (Uring.error_of_errno res, "openat2", "") - ); - let fd : Unix.file_descr = Obj.magic res in - let seekable = - match seekable with - | None -> FD.is_seekable fd - | Some x -> x - in - FD.of_unix ~sw ~seekable ~close_unix:true fd - -let fstat fd = - Unix.fstat (FD.get "fstat" fd) - -external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat" - -external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_getrandom" - -let getrandom { Cstruct.buffer; off; len } = - eio_getrandom buffer off len - -(* We ignore [sw] because this isn't a uring operation yet. *) -let mkdirat ~perm dir path = - wrap_errors path @@ fun () -> - match dir with - | None -> Unix.mkdir path perm - | Some dir -> eio_mkdirat (FD.get "mkdirat" dir) path perm - -let mkdir_beneath ~perm ?dir path = - let dir_path = Filename.dirname path in - let leaf = Filename.basename path in - (* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *) - Switch.run (fun sw -> - let parent = - wrap_errors path @@ fun () -> - openat2 ~sw ~seekable:false ?dir dir_path - ~access:`R - ~flags:Uring.Open_flags.(cloexec + path + directory) - ~perm:0 - ~resolve:Uring.Resolve.beneath - in - mkdirat ~perm (Some parent) leaf + let read_exactly ?file_offset fd buf len = + let res = perform (ERead (file_offset, fd, buf, Exactly len)) in + Log.debug (fun l -> l "read_exactly: woken up after read"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "read_exactly", "")) ) -let shutdown socket command = - Unix.shutdown (FD.get "shutdown" socket) command + let read_upto ?file_offset fd buf len = + let res = perform (ERead (file_offset, fd, buf, Upto len)) in + Log.debug (fun l -> l "read_upto: woken up after read"); + if res < 0 then ( + let err = Uring.error_of_errno res in + let ex = Unix.Unix_error (err, "read_upto", "") in + if err = Unix.ECONNRESET then raise (Eio.Net.Connection_reset ex) + else raise ex + ) else ( + res + ) -let accept ~sw fd = - Ctf.label "accept"; - let client_addr = Uring.Sockaddr.create () in - let res = enter (enqueue_accept fd client_addr) in - Log.debug (fun l -> l "accept returned"); - if res < 0 then ( - raise (Unix.Unix_error (Uring.error_of_errno res, "accept", "")) - ) else ( - let unix : Unix.file_descr = Obj.magic res in - let client = FD.of_unix ~sw ~seekable:false ~close_unix:true unix in - let client_addr = Uring.Sockaddr.get client_addr in - client, client_addr - ) + let readv ?file_offset fd bufs = + let res = enter (enqueue_readv (file_offset, fd, bufs)) in + Log.debug (fun l -> l "readv: woken up after read"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "readv", "")) + ) else if res = 0 then ( + raise End_of_file + ) else ( + res + ) + + let rec writev ?file_offset fd bufs = + let res = enter (enqueue_writev (file_offset, fd, bufs)) in + Log.debug (fun l -> l "writev: woken up after write"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) + ) else ( + match Cstruct.shiftv bufs res with + | [] -> () + | bufs -> + let file_offset = + let module I63 = Optint.Int63 in + match file_offset with + | None -> None + | Some ofs when ofs = I63.minus_one -> Some I63.minus_one + | Some ofs -> Some (I63.add ofs (I63.of_int res)) + in + writev ?file_offset fd bufs + ) + + let await_readable fd = + let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in + Log.debug (fun l -> l "await_readable: woken up"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "await_readable", "")) + ) + + let await_writable fd = + let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollout + pollerr))) in + Log.debug (fun l -> l "await_writable: woken up"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "await_writable", "")) + ) + + type _ eff += EWrite : (Optint.Int63.t option * FD.t * Uring.Region.chunk * amount) -> int eff + + let write ?file_offset fd buf len = + let res = perform (EWrite (file_offset, fd, buf, Exactly len)) in + Log.debug (fun l -> l "write: woken up after write"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "write", "")) + ) + + type _ eff += Alloc : Uring.Region.chunk eff + let alloc () = perform Alloc + + type _ eff += Free : Uring.Region.chunk -> unit eff + let free buf = perform (Free buf) + + let splice src ~dst ~len = + let res = enter (enqueue_splice ~src ~dst ~len) in + Log.debug (fun l -> l "splice returned"); + if res > 0 then res + else if res = 0 then raise End_of_file + else raise (Unix.Unix_error (Uring.error_of_errno res, "splice", "")) + + let connect fd addr = + let res = enter (enqueue_connect fd addr) in + Log.debug (fun l -> l "connect returned"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "connect", "")) + ) + + let with_chunk fn = + let chunk = alloc () in + Fun.protect ~finally:(fun () -> free chunk) @@ fun () -> + fn chunk + + let openfile ~sw path flags mode = + let fd = Unix.openfile path flags mode in + FD.of_unix ~sw ~seekable:(FD.is_seekable fd) ~close_unix:true fd + + let openat2 ~sw ?seekable ~access ~flags ~perm ~resolve ?dir path = + wrap_errors path @@ fun () -> + let res = enter (enqueue_openat2 (access, flags, perm, resolve, dir, path)) in + Log.debug (fun l -> l "openat2 returned"); + if res < 0 then ( + Switch.check sw; (* If cancelled, report that instead. *) + raise @@ Unix.Unix_error (Uring.error_of_errno res, "openat2", "") + ); + let fd : Unix.file_descr = Obj.magic res in + let seekable = + match seekable with + | None -> FD.is_seekable fd + | Some x -> x + in + FD.of_unix ~sw ~seekable ~close_unix:true fd + + let fstat fd = + Unix.fstat (FD.get "fstat" fd) + + external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat" + + external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_getrandom" + + let getrandom { Cstruct.buffer; off; len } = + eio_getrandom buffer off len + + (* We ignore [sw] because this isn't a uring operation yet. *) + let mkdirat ~perm dir path = + wrap_errors path @@ fun () -> + match dir with + | None -> Unix.mkdir path perm + | Some dir -> eio_mkdirat (FD.get "mkdirat" dir) path perm + + let mkdir_beneath ~perm ?dir path = + let dir_path = Filename.dirname path in + let leaf = Filename.basename path in + (* [mkdir] is really an operation on [path]'s parent. Get a reference to that first: *) + Switch.run (fun sw -> + let parent = + wrap_errors path @@ fun () -> + openat2 ~sw ~seekable:false ?dir dir_path + ~access:`R + ~flags:Uring.Open_flags.(cloexec + path + directory) + ~perm:0 + ~resolve:Uring.Resolve.beneath + in + mkdirat ~perm (Some parent) leaf + ) + + let shutdown socket command = + Unix.shutdown (FD.get "shutdown" socket) command + + let accept ~sw fd = + Ctf.label "accept"; + let client_addr = Uring.Sockaddr.create () in + let res = enter (enqueue_accept fd client_addr) in + Log.debug (fun l -> l "accept returned"); + if res < 0 then ( + raise (Unix.Unix_error (Uring.error_of_errno res, "accept", "")) + ) else ( + let unix : Unix.file_descr = Obj.magic res in + let client = FD.of_unix ~sw ~seekable:false ~close_unix:true unix in + let client_addr = Uring.Sockaddr.get client_addr in + client, client_addr + ) +end external eio_eventfd : int -> Unix.file_descr = "caml_eio_eventfd" -module Objects = struct - type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty +type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty - type has_fd = < fd : FD.t > - type source = < Eio.Flow.source; Eio.Flow.close; has_fd > - type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd > +type has_fd = < fd : FD.t > +type source = < Eio.Flow.source; Eio.Flow.close; has_fd > +type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd > - let get_fd (t : ) = t#fd +let get_fd (t : ) = t#fd - let get_fd_opt t = Eio.Generic.probe t FD +let get_fd_opt t = Eio.Generic.probe t FD - (* When copying between a source with an FD and a sink with an FD, we can share the chunk - and avoid copying. *) - let fast_copy src dst = - with_chunk @@ fun chunk -> - let chunk_size = Uring.Region.length chunk in - try - while true do - let got = read_upto src chunk chunk_size in - write dst chunk got - done - with End_of_file -> () +(* When copying between a source with an FD and a sink with an FD, we can share the chunk + and avoid copying. *) +let fast_copy src dst = + Low_level.with_chunk @@ fun chunk -> + let chunk_size = Uring.Region.length chunk in + try + while true do + let got = Low_level.read_upto src chunk chunk_size in + Low_level.write dst chunk got + done + with End_of_file -> () - (* Try a fast copy using splice. If the FDs don't support that, switch to copying. *) - let fast_copy_try_splice src dst = - try - while true do - let _ : int = splice src ~dst ~len:max_int in - () - done - with - | End_of_file -> () - | Unix.Unix_error (Unix.EINVAL, "splice", _) -> fast_copy src dst +(* Try a fast copy using splice. If the FDs don't support that, switch to copying. *) +let fast_copy_try_splice src dst = + try + while true do + let _ : int = Low_level.splice src ~dst ~len:max_int in + () + done + with + | End_of_file -> () + | Unix.Unix_error (Unix.EINVAL, "splice", _) -> fast_copy src dst - (* Copy using the [Read_source_buffer] optimisation. - Avoids a copy if the source already has the data. *) - let copy_with_rsb rsb dst = - try - while true do - rsb (writev dst) - done - with End_of_file -> () +(* Copy using the [Read_source_buffer] optimisation. + Avoids a copy if the source already has the data. *) +let copy_with_rsb rsb dst = + try + while true do + rsb (Low_level.writev dst) + done + with End_of_file -> () - (* Copy by allocating a chunk from the pre-shared buffer and asking - the source to write into it. This used when the other methods - aren't available. *) - let fallback_copy src dst = - with_chunk @@ fun chunk -> - let chunk_cs = Uring.Region.to_cstruct chunk in - try - while true do - let got = Eio.Flow.read src chunk_cs in - write dst chunk got - done - with End_of_file -> () +(* Copy by allocating a chunk from the pre-shared buffer and asking + the source to write into it. This used when the other methods + aren't available. *) +let fallback_copy src dst = + Low_level.with_chunk @@ fun chunk -> + let chunk_cs = Uring.Region.to_cstruct chunk in + try + while true do + let got = Eio.Flow.read src chunk_cs in + Low_level.write dst chunk got + done + with End_of_file -> () - let flow fd = - let is_tty = lazy (Unix.isatty (FD.get "isatty" fd)) in - object (_ : ) - method fd = fd - method close = FD.close fd +let flow fd = + let is_tty = lazy (Unix.isatty (FD.get "isatty" fd)) in + object (_ : ) + method fd = fd + method close = FD.close fd - method probe : type a. a Eio.Generic.ty -> a option = function - | FD -> Some fd - | Eio_unix.Unix_file_descr op -> Some (FD.to_unix op fd) - | _ -> None - - method read_into buf = - if Lazy.force is_tty then ( - (* Work-around for https://github.com/axboe/liburing/issues/354 - (should be fixed in Linux 5.14) *) - await_readable fd - ); - readv fd [buf] - - method read_methods = [] - - method write src = - match get_fd_opt src with - | Some src -> fast_copy_try_splice src fd - | None -> - let rec aux = function - | Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb rsb fd - | _ :: xs -> aux xs - | [] -> fallback_copy src fd - in - aux (Eio.Flow.read_methods src) - - method shutdown cmd = - Unix.shutdown (FD.get "shutdown" fd) @@ match cmd with - | `Receive -> Unix.SHUTDOWN_RECEIVE - | `Send -> Unix.SHUTDOWN_SEND - | `All -> Unix.SHUTDOWN_ALL - end - - let source fd = (flow fd :> source) - let sink fd = (flow fd :> sink) - - let listening_socket fd = object - inherit Eio.Net.listening_socket - - method! probe : type a. a Eio.Generic.ty -> a option = function + method probe : type a. a Eio.Generic.ty -> a option = function + | FD -> Some fd | Eio_unix.Unix_file_descr op -> Some (FD.to_unix op fd) | _ -> None - method close = FD.close fd + method read_into buf = + if Lazy.force is_tty then ( + (* Work-around for https://github.com/axboe/liburing/issues/354 + (should be fixed in Linux 5.14) *) + Low_level.await_readable fd + ); + Low_level.readv fd [buf] - method accept ~sw = - Switch.check sw; - let client, client_addr = accept ~sw fd in - let client_addr = match client_addr with - | Unix.ADDR_UNIX path -> `Unix path - | Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port) - in - let flow = (flow client :> ) in - flow, client_addr - end - - let net = object - inherit Eio.Net.t - - method listen ~reuse_addr ~reuse_port ~backlog ~sw listen_addr = - let socket_domain, socket_type, addr = - match listen_addr with - | `Unix path -> - if reuse_addr then ( - match Unix.lstat path with - | Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path - | _ -> () - | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () - ); - Unix.PF_UNIX, Unix.SOCK_STREAM, Unix.ADDR_UNIX path - | `Tcp (host, port) -> - let host = Eio_unix.Ipaddr.to_unix host in - Unix.PF_INET, Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) - in - let sock_unix = Unix.socket socket_domain socket_type 0 in - (* For Unix domain sockets, remove the path when done (except for abstract sockets). *) - begin match listen_addr with - | `Unix path -> - if String.length path > 0 && path.[0] <> Char.chr 0 then - Switch.on_release sw (fun () -> Unix.unlink path) - | `Tcp _ -> () - end; - if reuse_addr then - Unix.setsockopt sock_unix Unix.SO_REUSEADDR true; - if reuse_port then - Unix.setsockopt sock_unix Unix.SO_REUSEPORT true; - let sock = FD.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in - Unix.bind sock_unix addr; - Unix.listen sock_unix backlog; - listening_socket sock - - method connect ~sw addr = - let socket_domain, socket_type, addr = - match addr with - | `Unix path -> Unix.PF_UNIX, Unix.SOCK_STREAM, Unix.ADDR_UNIX path - | `Tcp (host, port) -> - let host = Eio_unix.Ipaddr.to_unix host in - Unix.PF_INET, Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) - in - let sock_unix = Unix.socket socket_domain socket_type 0 in - let sock = FD.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in - connect sock addr; - (flow sock :> ) - end - - type stdenv = < - stdin : source; - stdout : sink; - stderr : sink; - net : Eio.Net.t; - domain_mgr : Eio.Domain_manager.t; - clock : Eio.Time.clock; - fs : Eio.Dir.t; - cwd : Eio.Dir.t; - secure_random : Eio.Flow.source; - > - - let domain_mgr ~run_event_loop = object (self) - inherit Eio.Domain_manager.t - - method run_raw fn = - let domain = ref None in - enter (fun t k -> - domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> enqueue_thread t k ()))) - ); - Domain.join (Option.get !domain) - - method run fn = - self#run_raw (fun () -> - let result = ref None in - run_event_loop (fun _ -> result := Some (fn ())); - Option.get !result - ) - end - - let clock = object - inherit Eio.Time.clock - - method now = Unix.gettimeofday () - method sleep_until = sleep_until - end - - class dir fd = object - inherit Eio.Dir.t - - val resolve_flags = Uring.Resolve.beneath - - method open_in ~sw path = - let fd = openat2 ~sw ?dir:fd path - ~access:`R - ~flags:Uring.Open_flags.cloexec - ~perm:0 - ~resolve:resolve_flags - in - (flow fd :> ) - - method open_out ~sw ~append ~create path = - let perm, flags = - match create with - | `Never -> 0, Uring.Open_flags.empty - | `If_missing perm -> perm, Uring.Open_flags.creat - | `Or_truncate perm -> perm, Uring.Open_flags.(creat + trunc) - | `Exclusive perm -> perm, Uring.Open_flags.(creat + excl) - in - let flags = if append then Uring.Open_flags.(flags + append) else flags in - let fd = openat2 ~sw ?dir:fd path - ~access:`RW - ~flags:Uring.Open_flags.(cloexec + flags) - ~perm - ~resolve:resolve_flags - in - (flow fd :> ) - - method open_dir ~sw path = - let fd = openat2 ~sw ~seekable:false ?dir:fd path - ~access:`R - ~flags:Uring.Open_flags.(cloexec + path + directory) - ~perm:0 - ~resolve:resolve_flags - in - (new dir (Some fd) :> ) - - method mkdir ~perm path = - mkdir_beneath ~perm ?dir:fd path - - method close = - FD.close (Option.get fd) - end - - (* Full access to the filesystem. *) - let fs = object - inherit dir None - - val! resolve_flags = Uring.Resolve.empty - - method! mkdir ~perm path = - mkdirat ~perm None path - end - - let secure_random = object - inherit Eio.Flow.source method read_methods = [] - method read_into buf = getrandom buf + + method write src = + match get_fd_opt src with + | Some src -> fast_copy_try_splice src fd + | None -> + let rec aux = function + | Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb rsb fd + | _ :: xs -> aux xs + | [] -> fallback_copy src fd + in + aux (Eio.Flow.read_methods src) + + method shutdown cmd = + Unix.shutdown (FD.get "shutdown" fd) @@ match cmd with + | `Receive -> Unix.SHUTDOWN_RECEIVE + | `Send -> Unix.SHUTDOWN_SEND + | `All -> Unix.SHUTDOWN_ALL end - let stdenv ~run_event_loop = - let of_unix fd = FD.of_unix_no_hook ~seekable:(FD.is_seekable fd) ~close_unix:true fd in - let stdin = lazy (source (of_unix Unix.stdin)) in - let stdout = lazy (sink (of_unix Unix.stdout)) in - let stderr = lazy (sink (of_unix Unix.stderr)) in - let cwd = new dir None in - object (_ : stdenv) - method stdin = Lazy.force stdin - method stdout = Lazy.force stdout - method stderr = Lazy.force stderr - method net = net - method domain_mgr = domain_mgr ~run_event_loop - method clock = clock - method fs = (fs :> Eio.Dir.t) - method cwd = (cwd :> Eio.Dir.t) - method secure_random = secure_random - end +let source fd = (flow fd :> source) +let sink fd = (flow fd :> sink) + +let listening_socket fd = object + inherit Eio.Net.listening_socket + + method! probe : type a. a Eio.Generic.ty -> a option = function + | Eio_unix.Unix_file_descr op -> Some (FD.to_unix op fd) + | _ -> None + + method close = FD.close fd + + method accept ~sw = + Switch.check sw; + let client, client_addr = Low_level.accept ~sw fd in + let client_addr = match client_addr with + | Unix.ADDR_UNIX path -> `Unix path + | Unix.ADDR_INET (host, port) -> `Tcp (Eio_unix.Ipaddr.of_unix host, port) + in + let flow = (flow client :> ) in + flow, client_addr end +let net = object + inherit Eio.Net.t + + method listen ~reuse_addr ~reuse_port ~backlog ~sw listen_addr = + let socket_domain, socket_type, addr = + match listen_addr with + | `Unix path -> + if reuse_addr then ( + match Unix.lstat path with + | Unix.{ st_kind = S_SOCK; _ } -> Unix.unlink path + | _ -> () + | exception Unix.Unix_error (Unix.ENOENT, _, _) -> () + ); + Unix.PF_UNIX, Unix.SOCK_STREAM, Unix.ADDR_UNIX path + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.PF_INET, Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + in + let sock_unix = Unix.socket socket_domain socket_type 0 in + (* For Unix domain sockets, remove the path when done (except for abstract sockets). *) + begin match listen_addr with + | `Unix path -> + if String.length path > 0 && path.[0] <> Char.chr 0 then + Switch.on_release sw (fun () -> Unix.unlink path) + | `Tcp _ -> () + end; + if reuse_addr then + Unix.setsockopt sock_unix Unix.SO_REUSEADDR true; + if reuse_port then + Unix.setsockopt sock_unix Unix.SO_REUSEPORT true; + let sock = FD.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in + Unix.bind sock_unix addr; + Unix.listen sock_unix backlog; + listening_socket sock + + method connect ~sw addr = + let socket_domain, socket_type, addr = + match addr with + | `Unix path -> Unix.PF_UNIX, Unix.SOCK_STREAM, Unix.ADDR_UNIX path + | `Tcp (host, port) -> + let host = Eio_unix.Ipaddr.to_unix host in + Unix.PF_INET, Unix.SOCK_STREAM, Unix.ADDR_INET (host, port) + in + let sock_unix = Unix.socket socket_domain socket_type 0 in + let sock = FD.of_unix ~sw ~seekable:false ~close_unix:true sock_unix in + Low_level.connect sock addr; + (flow sock :> ) +end + +type stdenv = < + stdin : source; + stdout : sink; + stderr : sink; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + fs : Eio.Dir.t; + cwd : Eio.Dir.t; + secure_random : Eio.Flow.source; +> + +let domain_mgr ~run_event_loop = object (self) + inherit Eio.Domain_manager.t + + method run_raw fn = + let domain = ref None in + enter (fun t k -> + domain := Some (Domain.spawn (fun () -> Fun.protect fn ~finally:(fun () -> enqueue_thread t k ()))) + ); + Domain.join (Option.get !domain) + + method run fn = + self#run_raw (fun () -> + let result = ref None in + run_event_loop (fun _ -> result := Some (fn ())); + Option.get !result + ) +end + +let clock = object + inherit Eio.Time.clock + + method now = Unix.gettimeofday () + method sleep_until = Low_level.sleep_until +end + +class dir fd = object + inherit Eio.Dir.t + + val resolve_flags = Uring.Resolve.beneath + + method open_in ~sw path = + let fd = Low_level.openat2 ~sw ?dir:fd path + ~access:`R + ~flags:Uring.Open_flags.cloexec + ~perm:0 + ~resolve:resolve_flags + in + (flow fd :> ) + + method open_out ~sw ~append ~create path = + let perm, flags = + match create with + | `Never -> 0, Uring.Open_flags.empty + | `If_missing perm -> perm, Uring.Open_flags.creat + | `Or_truncate perm -> perm, Uring.Open_flags.(creat + trunc) + | `Exclusive perm -> perm, Uring.Open_flags.(creat + excl) + in + let flags = if append then Uring.Open_flags.(flags + append) else flags in + let fd = Low_level.openat2 ~sw ?dir:fd path + ~access:`RW + ~flags:Uring.Open_flags.(cloexec + flags) + ~perm + ~resolve:resolve_flags + in + (flow fd :> ) + + method open_dir ~sw path = + let fd = Low_level.openat2 ~sw ~seekable:false ?dir:fd path + ~access:`R + ~flags:Uring.Open_flags.(cloexec + path + directory) + ~perm:0 + ~resolve:resolve_flags + in + (new dir (Some fd) :> ) + + method mkdir ~perm path = + Low_level.mkdir_beneath ~perm ?dir:fd path + + method close = + FD.close (Option.get fd) +end + +(* Full access to the filesystem. *) +let fs = object + inherit dir None + + val! resolve_flags = Uring.Resolve.empty + + method! mkdir ~perm path = + Low_level.mkdirat ~perm None path +end + +let secure_random = object + inherit Eio.Flow.source + method read_methods = [] + method read_into buf = Low_level.getrandom buf +end + +let stdenv ~run_event_loop = + let of_unix fd = FD.of_unix_no_hook ~seekable:(FD.is_seekable fd) ~close_unix:true fd in + let stdin = lazy (source (of_unix Unix.stdin)) in + let stdout = lazy (sink (of_unix Unix.stdout)) in + let stderr = lazy (sink (of_unix Unix.stderr)) in + let cwd = new dir None in + object (_ : stdenv) + method stdin = Lazy.force stdin + method stdout = Lazy.force stdout + method stderr = Lazy.force stderr + method net = net + method domain_mgr = domain_mgr ~run_event_loop + method clock = clock + method fs = (fs :> Eio.Dir.t) + method cwd = (cwd :> Eio.Dir.t) + method secure_random = secure_random + end + let pipe sw = let r, w = Unix.pipe () in - let r = Objects.source (FD.of_unix ~sw ~seekable:false ~close_unix:true r) in - let w = Objects.sink (FD.of_unix ~sw ~seekable:false ~close_unix:true w) in + let r = source (FD.of_unix ~sw ~seekable:false ~close_unix:true r) in + let w = sink (FD.of_unix ~sw ~seekable:false ~close_unix:true w) in r, w let monitor_event_fd t = let buf = Cstruct.create 8 in while true do - let got = readv t.eventfd [buf] in + let got = Low_level.readv t.eventfd [buf] in Log.debug (fun f -> f "Received wakeup on eventfd %a" FD.pp t.eventfd); assert (got = 8); (* We just go back to sleep now, but this will cause the scheduler to look @@ -1022,7 +1022,7 @@ let with_uring ~fixed_buf_len ~queue_depth ?polling_timeout fn = let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main = Log.debug (fun l -> l "starting run"); - let stdenv = Objects.stdenv ~run_event_loop:(run ~queue_depth ~block_size ?polling_timeout) in + let stdenv = stdenv ~run_event_loop:(run ~queue_depth ~block_size ?polling_timeout) in (* TODO unify this allocation API around baregion/uring *) let fixed_buf_len = block_size * queue_depth in with_uring ~fixed_buf_len ~queue_depth ?polling_timeout @@ fun uring -> @@ -1059,7 +1059,7 @@ let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main = fn st k; schedule st ) - | ERead args -> Some (fun k -> + | Low_level.ERead args -> Some (fun k -> let k = { Suspended.k; fibre } in enqueue_read st k args; schedule st) @@ -1068,12 +1068,12 @@ let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main = enqueue_close st k fd; schedule st ) - | EWrite args -> Some (fun k -> + | Low_level.EWrite args -> Some (fun k -> let k = { Suspended.k; fibre } in enqueue_write st k args; schedule st ) - | Sleep_until time -> Some (fun k -> + | Low_level.Sleep_until time -> Some (fun k -> let k = { Suspended.k; fibre } in match Fibre_context.get_error fibre with | Some ex -> Suspended.discontinue k ex @@ -1128,12 +1128,12 @@ let rec run ?(queue_depth=64) ?(block_size=4096) ?polling_timeout main = ); schedule st ) - | Alloc -> Some (fun k -> + | Low_level.Alloc -> Some (fun k -> let k = { Suspended.k; fibre } in - alloc_buf st k + Low_level.alloc_buf st k ) - | Free buf -> Some (fun k -> - free_buf st buf; + | Low_level.Free buf -> Some (fun k -> + Low_level.free_buf st buf; continue k () ) | _ -> None diff --git a/lib_eio_linux/eio_linux.mli b/lib_eio_linux/eio_linux.mli index 9102005..89ea032 100644 --- a/lib_eio_linux/eio_linux.mli +++ b/lib_eio_linux/eio_linux.mli @@ -46,133 +46,135 @@ module FD : sig @raise Invalid_arg if [t] is closed. *) end -val noop : unit -> unit -(** [noop ()] performs a uring noop. This is only useful for benchmarking. *) - -(** {1 Time functions} *) - -val sleep_until : float -> unit -(** [sleep_until time] blocks until the current time is [time]. *) - -(** {1 Memory allocation functions} *) - -val alloc : unit -> Uring.Region.chunk - -val free : Uring.Region.chunk -> unit - -val with_chunk : (Uring.Region.chunk -> 'a) -> 'a -(** [with_chunk fn] runs [fn chunk] with a freshly allocated chunk and then frees it. *) - -(** {1 File manipulation functions} *) - -val openfile : sw:Switch.t -> string -> Unix.open_flag list -> int -> FD.t -(** Like {!Unix.open_file}. *) - -val openat2 : - sw:Switch.t -> - ?seekable:bool -> - access:[`R|`W|`RW] -> - flags:Uring.Open_flags.t -> - perm:Unix.file_perm -> - resolve:Uring.Resolve.t -> - ?dir:FD.t -> string -> FD.t -(** [openat2 ~sw ~flags ~perm ~resolve ~dir path] opens [dir/path]. - See {!Uring.openat2} for details. *) - -val read_upto : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> int -(** [read_upto fd chunk len] reads at most [len] bytes from [fd], - returning as soon as some data is available. - @param file_offset Read from the given position in [fd] (default: 0). - @raise End_of_file Raised if all data has already been read. *) - -val read_exactly : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit -(** [read_exactly fd chunk len] reads exactly [len] bytes from [fd], - performing multiple read operations if necessary. - @param file_offset Read from the given position in [fd] (default: 0). - @raise End_of_file Raised if the stream ends before [len] bytes have been read. *) - -val readv : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int -(** [readv] is like {!read_upto} but can read into any cstruct(s), - not just chunks of the pre-shared buffer. - If multiple buffers are given, they are filled in order. *) - -val write : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit -(** [write fd buf len] writes exactly [len] bytes from [buf] to [fd]. - It blocks until the OS confirms the write is done, - and resubmits automatically if the OS doesn't write all of it at once. *) - -val writev : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit -(** [writev] is like {!write} but can write from any cstruct(s), - not just chunks of the pre-shared buffer. - If multiple buffers are given, they are sent in order. - It will make multiple OS calls if the OS doesn't write all of it at once. *) - -val splice : FD.t -> dst:FD.t -> len:int -> int -(** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. - @return The number of bytes copied. - @raise End_of_file [src] is at the end of the file. - @raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *) - -val connect : FD.t -> Unix.sockaddr -> unit -(** [connect fd addr] attempts to connect socket [fd] to [addr]. *) - -val await_readable : FD.t -> unit -(** [await_readable fd] blocks until [fd] is readable (or has an error). *) - -val await_writable : FD.t -> unit -(** [await_writable fd] blocks until [fd] is writable (or has an error). *) - -val fstat : FD.t -> Unix.stats -(** Like {!Unix.fstat}. *) - -(** {1 Sockets} *) - -val accept : sw:Switch.t -> FD.t -> (FD.t * Unix.sockaddr) -(** [accept ~sw t] blocks until a new connection is received on listening socket [t]. - It returns the new connection and the address of the connecting peer. - The new connection has the close-on-exec flag set automatically. - The new connection is attached to [sw] and will be closed when that finishes, if - not already closed manually by then. *) - -val shutdown : FD.t -> Unix.shutdown_command -> unit -(** Like {!Unix.shutdown}. *) - -(** {1 Randomness} *) - -val getrandom : Cstruct.t -> int -(**[ getrandom buf] reads some random bytes into [buf] and returns the number of bytes written. - It uses Linux's [getrandom] call, which is like reading from /dev/urandom - except that it will block (the whole domain) if used at early boot - when the random system hasn't been initialised yet. *) - (** {1 Eio API} *) -module Objects : sig - type has_fd = < fd : FD.t > - type source = < Eio.Flow.source; Eio.Flow.close; has_fd > - type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd > +type has_fd = < fd : FD.t > +type source = < Eio.Flow.source; Eio.Flow.close; has_fd > +type sink = < Eio.Flow.sink ; Eio.Flow.close; has_fd > - type stdenv = < - stdin : source; - stdout : sink; - stderr : sink; - net : Eio.Net.t; - domain_mgr : Eio.Domain_manager.t; - clock : Eio.Time.clock; - fs : Eio.Dir.t; - cwd : Eio.Dir.t; - secure_random : Eio.Flow.source; - > +type stdenv = < + stdin : source; + stdout : sink; + stderr : sink; + net : Eio.Net.t; + domain_mgr : Eio.Domain_manager.t; + clock : Eio.Time.clock; + fs : Eio.Dir.t; + cwd : Eio.Dir.t; + secure_random : Eio.Flow.source; +> - val get_fd : -> FD.t - val get_fd_opt : #Eio.Generic.t -> FD.t option -end +val get_fd : -> FD.t +val get_fd_opt : #Eio.Generic.t -> FD.t option -val pipe : Switch.t -> Objects.source * Objects.sink +val pipe : Switch.t -> source * sink (** [pipe sw] is a source-sink pair [(r, w)], where data written to [w] can be read from [r]. It is implemented as a Unix pipe. *) (** {1 Main Loop} *) -val run : ?queue_depth:int -> ?block_size:int -> ?polling_timeout:int -> (Objects.stdenv -> unit) -> unit -(** FIXME queue_depth and block_size should be in a handler and not the mainloop *) +val run : ?queue_depth:int -> ?block_size:int -> ?polling_timeout:int -> (stdenv -> unit) -> unit +(** Run an event loop using io_uring. + For portable code, you should use {!Eio_main.run} instead, which will use this automatically + if running on Linux with a recent-enough kernel version. *) + +module Low_level : sig + val noop : unit -> unit + (** [noop ()] performs a uring noop. This is only useful for benchmarking. *) + + (** {1 Time functions} *) + + val sleep_until : float -> unit + (** [sleep_until time] blocks until the current time is [time]. *) + + (** {1 Memory allocation functions} *) + + val alloc : unit -> Uring.Region.chunk + + val free : Uring.Region.chunk -> unit + + val with_chunk : (Uring.Region.chunk -> 'a) -> 'a + (** [with_chunk fn] runs [fn chunk] with a freshly allocated chunk and then frees it. *) + + (** {1 File manipulation functions} *) + + val openfile : sw:Switch.t -> string -> Unix.open_flag list -> int -> FD.t + (** Like {!Unix.open_file}. *) + + val openat2 : + sw:Switch.t -> + ?seekable:bool -> + access:[`R|`W|`RW] -> + flags:Uring.Open_flags.t -> + perm:Unix.file_perm -> + resolve:Uring.Resolve.t -> + ?dir:FD.t -> string -> FD.t + (** [openat2 ~sw ~flags ~perm ~resolve ~dir path] opens [dir/path]. + See {!Uring.openat2} for details. *) + + val read_upto : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> int + (** [read_upto fd chunk len] reads at most [len] bytes from [fd], + returning as soon as some data is available. + @param file_offset Read from the given position in [fd] (default: 0). + @raise End_of_file Raised if all data has already been read. *) + + val read_exactly : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit + (** [read_exactly fd chunk len] reads exactly [len] bytes from [fd], + performing multiple read operations if necessary. + @param file_offset Read from the given position in [fd] (default: 0). + @raise End_of_file Raised if the stream ends before [len] bytes have been read. *) + + val readv : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int + (** [readv] is like {!read_upto} but can read into any cstruct(s), + not just chunks of the pre-shared buffer. + If multiple buffers are given, they are filled in order. *) + + val write : ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit + (** [write fd buf len] writes exactly [len] bytes from [buf] to [fd]. + It blocks until the OS confirms the write is done, + and resubmits automatically if the OS doesn't write all of it at once. *) + + val writev : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit + (** [writev] is like {!write} but can write from any cstruct(s), + not just chunks of the pre-shared buffer. + If multiple buffers are given, they are sent in order. + It will make multiple OS calls if the OS doesn't write all of it at once. *) + + val splice : FD.t -> dst:FD.t -> len:int -> int + (** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. + @return The number of bytes copied. + @raise End_of_file [src] is at the end of the file. + @raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *) + + val connect : FD.t -> Unix.sockaddr -> unit + (** [connect fd addr] attempts to connect socket [fd] to [addr]. *) + + val await_readable : FD.t -> unit + (** [await_readable fd] blocks until [fd] is readable (or has an error). *) + + val await_writable : FD.t -> unit + (** [await_writable fd] blocks until [fd] is writable (or has an error). *) + + val fstat : FD.t -> Unix.stats + (** Like {!Unix.fstat}. *) + + (** {1 Sockets} *) + + val accept : sw:Switch.t -> FD.t -> (FD.t * Unix.sockaddr) + (** [accept ~sw t] blocks until a new connection is received on listening socket [t]. + It returns the new connection and the address of the connecting peer. + The new connection has the close-on-exec flag set automatically. + The new connection is attached to [sw] and will be closed when that finishes, if + not already closed manually by then. *) + + val shutdown : FD.t -> Unix.shutdown_command -> unit + (** Like {!Unix.shutdown}. *) + + (** {1 Randomness} *) + + val getrandom : Cstruct.t -> int + (**[ getrandom buf] reads some random bytes into [buf] and returns the number of bytes written. + It uses Linux's [getrandom] call, which is like reading from /dev/urandom + except that it will block (the whole domain) if used at early boot + when the random system hasn't been initialised yet. *) +end diff --git a/lib_eio_linux/tests/basic_eio_linux.ml b/lib_eio_linux/tests/basic_eio_linux.ml index a0d04f3..2a9363c 100644 --- a/lib_eio_linux/tests/basic_eio_linux.ml +++ b/lib_eio_linux/tests/basic_eio_linux.ml @@ -1,6 +1,6 @@ (* basic tests using effects *) -open Eio_linux +open Eio_linux.Low_level open Eio.Std module Int63 = Optint.Int63 @@ -11,7 +11,7 @@ let setup_log level = let () = setup_log (Some Logs.Debug); - run @@ fun _stdenv -> + Eio_linux.run @@ fun _stdenv -> Switch.run @@ fun sw -> let fd = Unix.handle_unix_error (openfile ~sw "test.txt" Unix.[O_RDONLY]) 0 in let buf = alloc () in diff --git a/lib_eio_linux/tests/bench_noop.ml b/lib_eio_linux/tests/bench_noop.ml index f67d21c..42c0044 100644 --- a/lib_eio_linux/tests/bench_noop.ml +++ b/lib_eio_linux/tests/bench_noop.ml @@ -13,7 +13,7 @@ let main ~clock = for _ = 1 to n_fibres do Fibre.fork ~sw (fun () -> for _ = 1 to n_iters do - Eio_linux.noop () + Eio_linux.Low_level.noop () done ) done diff --git a/lib_eio_linux/tests/eurcp_lib.ml b/lib_eio_linux/tests/eurcp_lib.ml index bc7e8ea..625f129 100644 --- a/lib_eio_linux/tests/eurcp_lib.ml +++ b/lib_eio_linux/tests/eurcp_lib.ml @@ -2,7 +2,7 @@ open Eio.Std -module U = Eio_linux +module U = Eio_linux.Low_level module Int63 = Optint.Int63 let read_then_write_chunk infd outfd file_offset len = @@ -26,12 +26,12 @@ let copy_file infd outfd insize block_size = copy_block Int63.zero let run_cp block_size queue_depth infile outfile () = - U.run ~queue_depth ~block_size @@ fun _stdenv -> + Eio_linux.run ~queue_depth ~block_size @@ fun _stdenv -> Switch.run @@ fun sw -> let open Unix in - let infd = Eio_linux.openfile ~sw infile [O_RDONLY] 0 in - let outfd = Eio_linux.openfile ~sw outfile [O_WRONLY; O_CREAT; O_TRUNC] 0o644 in - let insize = Eio_linux.fstat infd |> fun {st_size; _} -> Int63.of_int st_size in + let infd = U.openfile ~sw infile [O_RDONLY] 0 in + let outfd = U.openfile ~sw outfile [O_WRONLY; O_CREAT; O_TRUNC] 0o644 in + let insize = U.fstat infd |> fun {st_size; _} -> Int63.of_int st_size in Logs.debug (fun l -> l "eurcp: %s -> %s size %a queue %d bs %d" infile outfile diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index a91a086..b2830df 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -7,8 +7,8 @@ let () = let read_one_byte ~sw r = Fibre.fork_promise ~sw (fun () -> - let r = Option.get (Eio_linux.Objects.get_fd_opt r) in - Eio_linux.await_readable r; + let r = Option.get (Eio_linux.get_fd_opt r) in + Eio_linux.Low_level.await_readable r; let b = Bytes.create 1 in let got = Unix.read (Eio_linux.FD.to_unix `Peek r) b 0 1 in assert (got = 1); @@ -21,8 +21,8 @@ let test_poll_add () = let r, w = Eio_linux.pipe sw in let thread = read_one_byte ~sw r in Fibre.yield (); - let w = Option.get (Eio_linux.Objects.get_fd_opt w) in - Eio_linux.await_writable w; + let w = Option.get (Eio_linux.get_fd_opt w) in + Eio_linux.Low_level.await_writable w; let sent = Unix.write (Eio_linux.FD.to_unix `Peek w) (Bytes.of_string "!") 0 1 in assert (sent = 1); let result = Promise.await thread in @@ -35,7 +35,7 @@ let test_poll_add_busy () = let a = read_one_byte ~sw r in let b = read_one_byte ~sw r in Fibre.yield (); - let w = Option.get (Eio_linux.Objects.get_fd_opt w) |> Eio_linux.FD.to_unix `Peek in + let w = Option.get (Eio_linux.get_fd_opt w) |> Eio_linux.FD.to_unix `Peek in let sent = Unix.write w (Bytes.of_string "!!") 0 2 in assert (sent = 2); let a = Promise.await a in @@ -84,20 +84,20 @@ let test_iovec () = Eio_linux.run ~queue_depth:4 @@ fun _stdenv -> Switch.run @@ fun sw -> let from_pipe, to_pipe = Eio_linux.pipe sw in - let from_pipe = Eio_linux.Objects.get_fd from_pipe in - let to_pipe = Eio_linux.Objects.get_fd to_pipe in + let from_pipe = Eio_linux.get_fd from_pipe in + let to_pipe = Eio_linux.get_fd to_pipe in let message = Cstruct.of_string "Got [ ] and [ ]" in let rec recv = function | [] -> () | cs -> - let got = Eio_linux.readv from_pipe cs in + let got = Eio_linux.Low_level.readv from_pipe cs in recv (Cstruct.shiftv cs got) in Fibre.both (fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3]) (fun () -> let b = Cstruct.of_string "barfoo" in - Eio_linux.writev to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3]; + Eio_linux.Low_level.writev to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3]; Eio_linux.FD.close to_pipe ); Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message)