diff --git a/lib_eio/flow.ml b/lib_eio/flow.ml index fc6ad21..1c56c09 100644 --- a/lib_eio/flow.ml +++ b/lib_eio/flow.ml @@ -1,7 +1,7 @@ type shutdown_command = [ `Receive | `Send | `All ] type read_method = .. -type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) +type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit) class type close = object method close : unit @@ -39,7 +39,9 @@ let cstruct_source data : source = match data with | [] -> raise End_of_file | x :: xs when Cstruct.length x = 0 -> data <- xs; aux () - | xs -> data <- []; fn xs + | xs -> + let n = fn xs in + data <- Cstruct.shiftv xs n in aux () diff --git a/lib_eio/flow.mli b/lib_eio/flow.mli index 3c3cb6f..24d0c23 100644 --- a/lib_eio/flow.mli +++ b/lib_eio/flow.mli @@ -47,9 +47,9 @@ val string_source : string -> source val cstruct_source : Cstruct.t list -> source (** [cstruct_source cs] is a source that gives the bytes of [cs]. *) -type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) +type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit) (** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn] - to borrow a view of the source's buffers. + to borrow a view of the source's buffers. [fn] returns the number of bytes it consumed. [rsb] will raise [End_of_file] if no more data will be produced. If no data is currently available, [rsb] will wait for some to become available before calling [fn]. diff --git a/lib_eio/mock/flow.ml b/lib_eio/mock/flow.ml index 1e4e893..71ed05c 100644 --- a/lib_eio/mock/flow.ml +++ b/lib_eio/mock/flow.ml @@ -28,21 +28,22 @@ let pp_default f s = in aux 0 +let rec takev len = function + | [] -> [] + | x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len] + | x :: xs -> x :: takev (len - Cstruct.length x) xs + let make ?(pp=pp_default) label = let on_read = Handler.make (`Raise End_of_file) in let on_copy_bytes = Handler.make (`Return 4096) in let copy_method = ref `Read_into in (* Test optimised copying using Read_source_buffer *) - let rec copy_rsb_iovec = function - | [] -> () - | src -> - let size = Handler.run on_copy_bytes in - let len = min (Cstruct.lenv src) size in - let dst = Cstruct.create len in - let n, src = Cstruct.fillv ~src ~dst in - assert (n = len); - traceln "%s: wrote (rsb) @[%a@]" label pp (Cstruct.to_string dst); - copy_rsb_iovec src + let copy_rsb_iovec src = + let size = Handler.run on_copy_bytes in + let len = min (Cstruct.lenv src) size in + let bufs = takev len src in + traceln "%s: wrote (rsb) @[%a@]" label (Fmt.Dump.list (Fmt.using Cstruct.to_string pp)) bufs; + len in let copy_rsb rsb = try while true do rsb copy_rsb_iovec done diff --git a/lib_eio_linux/eio_linux.ml b/lib_eio_linux/eio_linux.ml index 9a5f232..fa06366 100644 --- a/lib_eio_linux/eio_linux.ml +++ b/lib_eio_linux/eio_linux.ml @@ -603,25 +603,29 @@ module Low_level = struct res ) - let rec writev ?file_offset fd bufs = + let writev_single ?file_offset fd bufs = let res = enter (enqueue_writev (file_offset, fd, bufs)) in Log.debug (fun l -> l "writev: woken up after write"); if res < 0 then ( raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) ) else ( - match Cstruct.shiftv bufs res with - | [] -> () - | bufs -> - let file_offset = - let module I63 = Optint.Int63 in - match file_offset with - | None -> None - | Some ofs when ofs = I63.minus_one -> Some I63.minus_one - | Some ofs -> Some (I63.add ofs (I63.of_int res)) - in - writev ?file_offset fd bufs + res ) + let rec writev ?file_offset fd bufs = + let bytes_written = writev_single ?file_offset fd bufs in + match Cstruct.shiftv bufs bytes_written with + | [] -> () + | bufs -> + let file_offset = + let module I63 = Optint.Int63 in + match file_offset with + | None -> None + | Some ofs when ofs = I63.minus_one -> Some I63.minus_one + | Some ofs -> Some (I63.add ofs (I63.of_int bytes_written)) + in + writev ?file_offset fd bufs + let await_readable fd = let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in Log.debug (fun l -> l "await_readable: woken up"); @@ -847,7 +851,7 @@ let fast_copy_try_splice src dst = let copy_with_rsb rsb dst = try while true do - rsb (Low_level.writev dst) + rsb (Low_level.writev_single dst) done with End_of_file -> () diff --git a/lib_eio_linux/eio_linux.mli b/lib_eio_linux/eio_linux.mli index 2437c07..2083aeb 100644 --- a/lib_eio_linux/eio_linux.mli +++ b/lib_eio_linux/eio_linux.mli @@ -186,6 +186,10 @@ module Low_level : sig If multiple buffers are given, they are sent in order. It will make multiple OS calls if the OS doesn't write all of it at once. *) + val writev_single : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int + (** [writev_single] is like [writev] but only performs a single write operation. + It returns the number of bytes written, which may be smaller than the requested amount. *) + val splice : FD.t -> dst:FD.t -> len:int -> int (** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. diff --git a/tests/test_flow.md b/tests/test_flow.md index 4cbab16..6195687 100644 --- a/tests/test_flow.md +++ b/tests/test_flow.md @@ -89,7 +89,7 @@ Copying from src using `Read_source_buffer`: Eio_mock.Flow.set_copy_method dst `Read_source_buffer; Eio_mock.Flow.on_copy_bytes dst [`Return 3; `Return 5]; Eio.Flow.copy src dst;; -+dst: wrote (rsb) "foo" -+dst: wrote (rsb) "bar" ++dst: wrote (rsb) ["foo"] ++dst: wrote (rsb) ["bar"] - : unit = () ```