Allow short writes in Read_source_buffer

When performing buffered writes and the OS doesn't write all the data in
one go, it probably means it isn't ready for more. We can let the
application produce more data while we're waiting and then do the next
write with more buffers, instead of trying to finish the original
shorter write first.

This changes `Read_source_buffer` to allow returning the number of bytes
written. It also updates the mock output to show the individual buffers
being written rather than the combined string.

Also, add `Linux_eio.Low_level.writev_single` to expose this behaviour
directly.
This commit is contained in:
Thomas Leonard 2022-06-27 10:10:52 +01:00
parent 080d2c31e4
commit 2f54e3a198
6 changed files with 40 additions and 29 deletions

View File

@ -1,7 +1,7 @@
type shutdown_command = [ `Receive | `Send | `All ] type shutdown_command = [ `Receive | `Send | `All ]
type read_method = .. type read_method = ..
type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit)
class type close = object class type close = object
method close : unit method close : unit
@ -39,7 +39,9 @@ let cstruct_source data : source =
match data with match data with
| [] -> raise End_of_file | [] -> raise End_of_file
| x :: xs when Cstruct.length x = 0 -> data <- xs; aux () | x :: xs when Cstruct.length x = 0 -> data <- xs; aux ()
| xs -> data <- []; fn xs | xs ->
let n = fn xs in
data <- Cstruct.shiftv xs n
in in
aux () aux ()

View File

@ -47,9 +47,9 @@ val string_source : string -> source
val cstruct_source : Cstruct.t list -> source val cstruct_source : Cstruct.t list -> source
(** [cstruct_source cs] is a source that gives the bytes of [cs]. *) (** [cstruct_source cs] is a source that gives the bytes of [cs]. *)
type read_method += Read_source_buffer of ((Cstruct.t list -> unit) -> unit) type read_method += Read_source_buffer of ((Cstruct.t list -> int) -> unit)
(** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn] (** If a source offers [Read_source_buffer rsb] then the user can call [rsb fn]
to borrow a view of the source's buffers. to borrow a view of the source's buffers. [fn] returns the number of bytes it consumed.
[rsb] will raise [End_of_file] if no more data will be produced. [rsb] will raise [End_of_file] if no more data will be produced.
If no data is currently available, [rsb] will wait for some to become available before calling [fn]. If no data is currently available, [rsb] will wait for some to become available before calling [fn].

View File

@ -28,21 +28,22 @@ let pp_default f s =
in in
aux 0 aux 0
let rec takev len = function
| [] -> []
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
| x :: xs -> x :: takev (len - Cstruct.length x) xs
let make ?(pp=pp_default) label = let make ?(pp=pp_default) label =
let on_read = Handler.make (`Raise End_of_file) in let on_read = Handler.make (`Raise End_of_file) in
let on_copy_bytes = Handler.make (`Return 4096) in let on_copy_bytes = Handler.make (`Return 4096) in
let copy_method = ref `Read_into in let copy_method = ref `Read_into in
(* Test optimised copying using Read_source_buffer *) (* Test optimised copying using Read_source_buffer *)
let rec copy_rsb_iovec = function let copy_rsb_iovec src =
| [] -> ()
| src ->
let size = Handler.run on_copy_bytes in let size = Handler.run on_copy_bytes in
let len = min (Cstruct.lenv src) size in let len = min (Cstruct.lenv src) size in
let dst = Cstruct.create len in let bufs = takev len src in
let n, src = Cstruct.fillv ~src ~dst in traceln "%s: wrote (rsb) @[<v>%a@]" label (Fmt.Dump.list (Fmt.using Cstruct.to_string pp)) bufs;
assert (n = len); len
traceln "%s: wrote (rsb) @[<v>%a@]" label pp (Cstruct.to_string dst);
copy_rsb_iovec src
in in
let copy_rsb rsb = let copy_rsb rsb =
try while true do rsb copy_rsb_iovec done try while true do rsb copy_rsb_iovec done

View File

@ -603,13 +603,18 @@ module Low_level = struct
res res
) )
let rec writev ?file_offset fd bufs = let writev_single ?file_offset fd bufs =
let res = enter (enqueue_writev (file_offset, fd, bufs)) in let res = enter (enqueue_writev (file_offset, fd, bufs)) in
Log.debug (fun l -> l "writev: woken up after write"); Log.debug (fun l -> l "writev: woken up after write");
if res < 0 then ( if res < 0 then (
raise (Unix.Unix_error (Uring.error_of_errno res, "writev", "")) raise (Unix.Unix_error (Uring.error_of_errno res, "writev", ""))
) else ( ) else (
match Cstruct.shiftv bufs res with res
)
let rec writev ?file_offset fd bufs =
let bytes_written = writev_single ?file_offset fd bufs in
match Cstruct.shiftv bufs bytes_written with
| [] -> () | [] -> ()
| bufs -> | bufs ->
let file_offset = let file_offset =
@ -617,10 +622,9 @@ module Low_level = struct
match file_offset with match file_offset with
| None -> None | None -> None
| Some ofs when ofs = I63.minus_one -> Some I63.minus_one | Some ofs when ofs = I63.minus_one -> Some I63.minus_one
| Some ofs -> Some (I63.add ofs (I63.of_int res)) | Some ofs -> Some (I63.add ofs (I63.of_int bytes_written))
in in
writev ?file_offset fd bufs writev ?file_offset fd bufs
)
let await_readable fd = let await_readable fd =
let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in let res = enter (enqueue_poll_add fd (Uring.Poll_mask.(pollin + pollerr))) in
@ -847,7 +851,7 @@ let fast_copy_try_splice src dst =
let copy_with_rsb rsb dst = let copy_with_rsb rsb dst =
try try
while true do while true do
rsb (Low_level.writev dst) rsb (Low_level.writev_single dst)
done done
with End_of_file -> () with End_of_file -> ()

View File

@ -186,6 +186,10 @@ module Low_level : sig
If multiple buffers are given, they are sent in order. If multiple buffers are given, they are sent in order.
It will make multiple OS calls if the OS doesn't write all of it at once. *) It will make multiple OS calls if the OS doesn't write all of it at once. *)
val writev_single : ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int
(** [writev_single] is like [writev] but only performs a single write operation.
It returns the number of bytes written, which may be smaller than the requested amount. *)
val splice : FD.t -> dst:FD.t -> len:int -> int val splice : FD.t -> dst:FD.t -> len:int -> int
(** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst]. (** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst].

View File

@ -89,7 +89,7 @@ Copying from src using `Read_source_buffer`:
Eio_mock.Flow.set_copy_method dst `Read_source_buffer; Eio_mock.Flow.set_copy_method dst `Read_source_buffer;
Eio_mock.Flow.on_copy_bytes dst [`Return 3; `Return 5]; Eio_mock.Flow.on_copy_bytes dst [`Return 3; `Return 5];
Eio.Flow.copy src dst;; Eio.Flow.copy src dst;;
+dst: wrote (rsb) "foo" +dst: wrote (rsb) ["foo"]
+dst: wrote (rsb) "bar" +dst: wrote (rsb) ["bar"]
- : unit = () - : unit = ()
``` ```