diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 4be7fa7..47002e8 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -29,6 +29,19 @@ type amount = Exactly of int | Upto of int let system_thread = Ctf.mint_id () +let rec skip_empty = function + | c :: cs when Cstruct.length c = 0 -> skip_empty cs + | x -> x + +(* todo: use Cstruct.shiftv when that's released *) +let rec shiftv cs = function + | 0 -> skip_empty cs + | n -> + match cs with + | [] -> failwith "Can't advance past end of vector!" + | c :: cs when n >= Cstruct.length c -> shiftv cs (n - Cstruct.length c) + | c :: cs -> Cstruct.shift c n :: cs + effect Close : Unix.file_descr -> int module FD = struct @@ -94,6 +107,8 @@ type cancel_hook = Switch.hook ref (* Type of user-data attached to jobs. *) type io_job = | Read : rw_req * cancel_hook -> io_job + | Readv : int Suspended.t * cancel_hook -> io_job + | Writev : int Suspended.t * cancel_hook -> io_job | Poll_add : int Suspended.t * cancel_hook -> io_job | Splice : int Suspended.t * cancel_hook -> io_job | Openat2 : int Suspended.t * cancel_hook -> io_job @@ -207,6 +222,36 @@ let enqueue_read st action (sw,file_offset,fd,buf,len) = Ctf.label "read"; submit_rw_req st req +let rec enqueue_readv st action args = + let (sw,file_offset,fd,bufs) = args in + let file_offset = + match file_offset with + | Some x -> x + | None -> FD.uring_file_offset fd + in + Ctf.label "readv"; + let retry = with_cancel_hook ?sw ~action st (fun cancel -> + Uring.readv st.uring ~file_offset (FD.get "readv" fd) bufs (Readv (action, cancel)) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_readv st action args) st.io_q + +let rec enqueue_writev st action args = + let (sw,file_offset,fd,bufs) = args in + let file_offset = + match file_offset with + | Some x -> x + | None -> FD.uring_file_offset fd + in + Ctf.label "writev"; + let retry = with_cancel_hook ?sw ~action st (fun cancel -> + Uring.writev st.uring ~file_offset (FD.get "writev" fd) bufs (Writev (action, cancel)) + ) + in + if retry then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_writev st action args) st.io_q + let rec enqueue_poll_add ?sw st action fd poll_mask = Log.debug (fun l -> l "poll_add: submitting call"); Ctf.label "poll_add"; @@ -344,10 +389,18 @@ and handle_complete st ~runnable result = Log.debug (fun l -> l "read returned"); Switch.remove_hook !cancel; complete_rw_req st req result + | Readv (k, cancel) -> + Log.debug (fun l -> l "readv returned"); + Switch.remove_hook !cancel; + Suspended.continue k result | Write (req, cancel) -> Log.debug (fun l -> l "write returned"); Switch.remove_hook !cancel; complete_rw_req st req result + | Writev (k, cancel) -> + Log.debug (fun l -> l "writev returned"); + Switch.remove_hook !cancel; + Suspended.continue k result | Poll_add (k, cancel) -> Log.debug (fun l -> l "poll_add returned"); Switch.remove_hook !cancel; @@ -428,6 +481,42 @@ let read_upto ?sw ?file_offset fd buf len = res ) +effect EReadv : (Switch.t option * Optint.Int63.t option * FD.t * Cstruct.t list) -> int + +let readv ?sw ?file_offset fd bufs = + let res = perform (EReadv (sw, file_offset, fd, bufs)) in + Log.debug (fun l -> l "readv: woken up after read"); + if res < 0 then ( + Option.iter Switch.check sw; (* If cancelled, report that instead. *) + raise (Unix.Unix_error (Uring.error_of_errno res, "readv", "")) + ) else if res = 0 then ( + raise End_of_file + ) else ( + res + ) + +effect EWritev : (Switch.t option * Optint.Int63.t option * FD.t * Cstruct.t list) -> int + +let rec writev ?sw ?file_offset fd bufs = + let res = perform (EWritev (sw, file_offset, fd, bufs)) in + Log.debug (fun l -> l "writev: woken up after read"); + if res < 0 then ( + Option.iter Switch.check sw; (* If cancelled, report that instead. *) + raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) + ) else ( + match shiftv bufs res with + | [] -> () + | bufs -> + let file_offset = + let module I63 = Optint.Int63 in + match file_offset with + | None -> None + | Some ofs when ofs = I63.minus_one -> Some I63.minus_one + | Some ofs -> Some (I63.add ofs (I63.of_int res)) + in + writev ?sw ?file_offset fd bufs + ) + effect EPoll_add : Switch.t option * FD.t * Uring.Poll_mask.t -> int let await_readable ?sw fd = @@ -829,6 +918,14 @@ let run ?(queue_depth=64) ?(block_size=4096) main = let k = { Suspended.k; tid } in enqueue_read st k args; schedule st + | effect (EReadv args) k -> + let k = { Suspended.k; tid } in + enqueue_readv st k args; + schedule st + | effect (EWritev args) k -> + let k = { Suspended.k; tid } in + enqueue_writev st k args; + schedule st | effect (EPoll_add (sw, fd, poll_mask)) k -> let k = { Suspended.k; tid } in enqueue_poll_add ?sw st k fd poll_mask; diff --git a/lib_eio_linux/eio_linux.mli b/lib_eio_linux/eio_linux.mli index d906baf..c80cb3a 100644 --- a/lib_eio_linux/eio_linux.mli +++ b/lib_eio_linux/eio_linux.mli @@ -86,11 +86,22 @@ val read_exactly : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring. @param file_offset Read from the given position in [fd] (default: 0). @raise End_of_file Raised if the stream ends before [len] bytes have been read. *) +val readv : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int +(** [readv] is like {!read_upto} but can read into any cstruct(s), + not just chunks of the pre-shared buffer. + If multiple buffers are given, they are filled in order. *) + val write : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit (** [write fd buf len] writes exactly [len] bytes from [buf] to [fd]. It blocks until the OS confirms the write is done, and resubmits automatically if the OS doesn't write all of it at once. *) +val writev : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit +(** [writev] is like {!write} but can write from any cstruct(s), + not just chunks of the pre-shared buffer. + If multiple buffers are given, they are sent in order. + It will make multiple OS calls if the OS doesn't write all of it at once. *) + val splice : ?sw:Switch.t -> FD.t -> dst:FD.t -> len:int -> int (** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. @return The number of bytes copied. diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index eef2d30..8cd4897 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -79,6 +79,42 @@ let test_direct_copy () = Eio.Flow.close from_pipe1; Eio.Flow.close from_pipe2 +let rec skip_empty = function + | c :: cs when Cstruct.length c = 0 -> skip_empty cs + | x -> x + +(* todo: use Cstruct.shiftv when that's released *) +let rec shiftv cs = function + | 0 -> skip_empty cs + | n -> + match cs with + | [] -> failwith "Can't advance past end of vector!" + | c :: cs when n >= Cstruct.length c -> shiftv cs (n - Cstruct.length c) + | c :: cs -> Cstruct.shift c n :: cs + +(* Read and write using IO vectors rather than the fixed buffers. *) +let test_iovec () = + Eio_linux.run ~queue_depth:4 @@ fun _stdenv -> + Switch.top @@ fun sw -> + let from_pipe, to_pipe = Eio_linux.pipe sw in + let from_pipe = Eio_linux.Objects.get_fd from_pipe in + let to_pipe = Eio_linux.Objects.get_fd to_pipe in + let message = Cstruct.of_string "Got [ ] and [ ]" in + let rec recv = function + | [] -> () + | cs -> + let got = Eio_linux.readv ~sw from_pipe cs in + recv (shiftv cs got) + in + Fibre.both ~sw + (fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3]) + (fun () -> + let b = Cstruct.of_string "barfoo" in + Eio_linux.writev ~sw to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3]; + Eio_linux.FD.close to_pipe + ); + Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message) + let () = let open Alcotest in run "eioio" [ @@ -87,5 +123,6 @@ let () = test_case "direct_copy" `Quick test_direct_copy; test_case "poll_add" `Quick test_poll_add; test_case "poll_add_busy" `Quick test_poll_add_busy; + test_case "iovec" `Quick test_iovec; ]; ]