From 2f54e3a198b5311fa82ee490551a417d30cd02c4 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Mon, 27 Jun 2022 10:10:52 +0100 Subject: [PATCH] Allow short writes in Read_source_buffer When performing buffered writes and the OS doesn't write all the data in one go, it probably means it isn't ready for more. We can let the application produce more data while we're waiting and then do the next write with more buffers, instead of trying to finish the original shorter write first. This changes `Read_source_buffer` to allow returning the number of bytes written. It also updates the mock output to show the individual buffers being written rather than the combined string. Also, add `Linux_eio.Low_level.writev_single` to expose this behaviour directly. --- lib_eio/flow.ml | 6 ++++-- lib_eio/flow.mli | 4 ++-- lib_eio/mock/flow.ml | 21 +++++++++++---------- lib_eio_linux/eio_linux.ml | 30 +++++++++++++++++------------- lib_eio_linux/eio_linux.mli | 4 ++++ tests/test_flow.md | 4 ++-- 6 files changed, 40 insertions(+), 29 deletions(-) diff --git a/lib_eio/flow.ml b/lib_eio/flow.ml index fc6ad21..1c56c09 100644 --- a/lib_eio/flow.ml +++ b/lib_eio/flow.ml @@ -1,7 +1,7 @@ type shutdown_command = [ `Receive | `Send | `All ] type read_method = .. -type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) +type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit) class type close = object method close : unit @@ -39,7 +39,9 @@ let cstruct_source data : source = match data with | [] -> raise End_of_file | x :: xs when Cstruct.length x = 0 -> data <- xs; aux () - | xs -> data <- []; fn xs + | xs -> + let n = fn xs in + data <- Cstruct.shiftv xs n in aux () diff --git a/lib_eio/flow.mli b/lib_eio/flow.mli index 3c3cb6f..24d0c23 100644 --- a/lib_eio/flow.mli +++ b/lib_eio/flow.mli @@ -47,9 +47,9 @@ val string_source : string -> source val cstruct_source : Cstruct.t list -> source (** [cstruct_source cs] is a source that gives the bytes of [cs]. *) -type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) +type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit) (** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn] - to borrow a view of the source's buffers. + to borrow a view of the source's buffers. [fn] returns the number of bytes it consumed. [rsb] will raise [End_of_file] if no more data will be produced. If no data is currently available, [rsb] will wait for some to become available before calling [fn]. diff --git a/lib_eio/mock/flow.ml b/lib_eio/mock/flow.ml index 1e4e893..71ed05c 100644 --- a/lib_eio/mock/flow.ml +++ b/lib_eio/mock/flow.ml @@ -28,21 +28,22 @@ let pp_default f s = in aux 0 +let rec takev len = function + | [] -> [] + | x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len] + | x :: xs -> x :: takev (len - Cstruct.length x) xs + let make ?(pp=pp_default) label = let on_read = Handler.make (`Raise End_of_file) in let on_copy_bytes = Handler.make (`Return 4096) in let copy_method = ref `Read_into in (* Test optimised copying using Read_source_buffer *) - let rec copy_rsb_iovec = function - | [] -> () - | src -> - let size = Handler.run on_copy_bytes in - let len = min (Cstruct.lenv src) size in - let dst = Cstruct.create len in - let n, src = Cstruct.fillv ~src ~dst in - assert (n = len); - traceln "%s: wrote (rsb) @[%a@]" label pp (Cstruct.to_string dst); - copy_rsb_iovec src + let copy_rsb_iovec src = + let size = Handler.run on_copy_bytes in + let len = min (Cstruct.lenv src) size in + let bufs = takev len src in + traceln "%s: wrote (rsb) @[%a@]" label (Fmt.Dump.list (Fmt.using Cstruct.to_string pp)) bufs; + len in let copy_rsb rsb = try while true do rsb copy_rsb_iovec done diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 9a5f232..fa06366 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -603,25 +603,29 @@ module Low_level = struct res ) - let rec writev ?file_offset fd bufs = + let writev_single ?file_offset fd bufs = let res = enter (enqueue_writev (file_offset, fd, bufs)) in Log.debug (fun l -> l "writev: woken up after write"); if res < 0 then ( raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) ) else ( - match Cstruct.shiftv bufs res with - | [] -> () - | bufs -> - let file_offset = - let module I63 = Optint.Int63 in - match file_offset with - | None -> None - | Some ofs when ofs = I63.minus_one -> Some I63.minus_one - | Some ofs -> Some (I63.add ofs (I63.of_int res)) - in - writev ?file_offset fd bufs + res ) + let rec writev ?file_offset fd bufs = + let bytes_written = writev_single ?file_offset fd bufs in + match Cstruct.shiftv bufs bytes_written with + | [] -> () + | bufs -> + let file_offset = + let module I63 = Optint.Int63 in + match file_offset with + | None -> None + | Some ofs when ofs = I63.minus_one -> Some I63.minus_one + | Some ofs -> Some (I63.add ofs (I63.of_int bytes_written)) + in + writev ?file_offset fd bufs + let await_readable fd = let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in Log.debug (fun l -> l "await_readable: woken up"); @@ -847,7 +851,7 @@ let fast_copy_try_splice src dst = let copy_with_rsb rsb dst = try while true do - rsb (Low_level.writev dst) + rsb (Low_level.writev_single dst) done with End_of_file -> () diff --git a/lib_eio_linux/eio_linux.mli b/lib_eio_linux/eio_linux.mli index 2437c07..2083aeb 100644 --- a/lib_eio_linux/eio_linux.mli +++ b/lib_eio_linux/eio_linux.mli @@ -186,6 +186,10 @@ module Low_level : sig If multiple buffers are given, they are sent in order. It will make multiple OS calls if the OS doesn't write all of it at once. *) + val writev_single : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int + (** [writev_single] is like [writev] but only performs a single write operation. + It returns the number of bytes written, which may be smaller than the requested amount. *) + val splice : FD.t -> dst:FD.t -> len:int -> int (** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. diff --git a/tests/test_flow.md b/tests/test_flow.md index 4cbab16..6195687 100644 --- a/tests/test_flow.md +++ b/tests/test_flow.md @@ -89,7 +89,7 @@ Copying from src using `Read_source_buffer`: Eio_mock.Flow.set_copy_method dst `Read_source_buffer; Eio_mock.Flow.on_copy_bytes dst [`Return 3; `Return 5]; Eio.Flow.copy src dst;; -+dst: wrote (rsb) "foo" -+dst: wrote (rsb) "bar" ++dst: wrote (rsb) ["foo"] ++dst: wrote (rsb) ["bar"] - : unit = () ```