Compare commits

..

2 Commits

Author SHA1 Message Date
Thomas Leonard
1493f01779
Merge pull request #598 from talex5/short-writes
Add Flow.single_write
2023-08-12 08:49:57 +01:00
Thomas Leonard
38a9d93620 Add Flow.single_write
`Flow.write` keeps writing until all the data is sent or an error
occurs, but sometimes it's useful to perform only a single write
operation. For example, if `Buf_write` asks to write 5 bytes and only 4
get written, we would previously do another 1-byte write. However, there
may have been more data available by then.

This is also useful for error recovery, if you need to know exactly how
many bytes were successfully written before the error.

Note that the `Buf_write` tests for `Read_source_buffer` were supposed
to test that the target flow read the buffers directly, but since we
started using `write` instead of `copy` it doesn't matter.
2023-08-11 11:17:33 +01:00
8 changed files with 73 additions and 43 deletions

View File

@ -473,8 +473,8 @@ let rec await_batch t =
let copy t flow = let copy t flow =
let rec aux () = let rec aux () =
let iovecs = await_batch t in let iovecs = await_batch t in
Flow.write flow iovecs; (* todo: add a Flow.single_write and use that. *) let wrote = Flow.single_write flow iovecs in
shift t (Cstruct.lenv iovecs); shift t wrote;
aux () aux ()
in in
try aux () try aux ()

View File

@ -24,7 +24,7 @@ module Pi = struct
module type SINK = sig module type SINK = sig
type t type t
val copy : t -> src:_ source -> unit val copy : t -> src:_ source -> unit
val write : t -> Cstruct.t list -> unit val single_write : t -> Cstruct.t list -> int
end end
module type SHUTDOWN = sig module type SHUTDOWN = sig
@ -130,9 +130,19 @@ let string_source =
let ops = Pi.source (module String_source) in let ops = Pi.source (module String_source) in
fun s -> Resource.T (String_source.create s, ops) fun s -> Resource.T (String_source.create s, ops)
let single_write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
X.single_write t bufs
let write (Resource.T (t, ops)) bufs = let write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in let module X = (val (Resource.get ops Sink)) in
X.write t bufs let rec aux = function
| [] -> ()
| bufs ->
let wrote = X.single_write t bufs in
aux (Cstruct.shiftv bufs wrote)
in
aux bufs
let copy src (Resource.T (t, ops)) = let copy src (Resource.T (t, ops)) =
let module X = (val (Resource.get ops Sink)) in let module X = (val (Resource.get ops Sink)) in
@ -153,8 +163,10 @@ module Buffer_sink = struct
done done
with End_of_file -> () with End_of_file -> ()
let write t bufs = let single_write t bufs =
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs let old_length = Buffer.length t in
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
Buffer.length t - old_length
end end
let buffer_sink = let buffer_sink =

View File

@ -76,6 +76,9 @@ val write : _ sink -> Cstruct.t list -> unit
- {!Buf_write} to combine multiple small writes. - {!Buf_write} to combine multiple small writes.
- {!copy} for bulk transfers, as it allows some extra optimizations. *) - {!copy} for bulk transfers, as it allows some extra optimizations. *)
val single_write : _ sink -> Cstruct.t list -> int
(** [single_write dst bufs] writes at least one byte from [bufs] and returns the number of bytes written. *)
val copy : _ source -> _ sink -> unit val copy : _ source -> _ sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *) (** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
@ -119,7 +122,7 @@ module Pi : sig
module type SINK = sig module type SINK = sig
type t type t
val copy : t -> src:_ source -> unit val copy : t -> src:_ source -> unit
val write : t -> Cstruct.t list -> unit val single_write : t -> Cstruct.t list -> int
end end
module type SHUTDOWN = sig module type SHUTDOWN = sig

View File

@ -37,14 +37,25 @@ module Mock_flow = struct
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len] | x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
| x :: xs -> x :: takev (len - Cstruct.length x) xs | x :: xs -> x :: takev (len - Cstruct.length x) xs
(* Test optimised copying using Read_source_buffer *) let write ~pp t bufs =
let copy_rsb_iovec t src =
let size = Handler.run t.on_copy_bytes in let size = Handler.run t.on_copy_bytes in
let len = min (Cstruct.lenv src) size in let len = min (Cstruct.lenv bufs) size in
let bufs = takev len src in let bufs = takev len bufs in
traceln "%s: wrote (rsb) @[<v>%a@]" t.label (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs; traceln "%s: wrote %a" t.label pp bufs;
len len
let single_write t bufs =
let pp f = function
| [buf] -> Fmt.pf f "@[<v>%a@]" t.pp (Cstruct.to_string buf)
| bufs -> Fmt.pf f "@[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs
in
write ~pp t bufs
let copy_rsb_iovec t bufs =
let pp f bufs = Fmt.pf f "(rsb) @[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs in
write ~pp t bufs
(* Test optimised copying using Read_source_buffer *)
let copy_rsb t rsb = let copy_rsb t rsb =
try while true do rsb (copy_rsb_iovec t) done try while true do rsb (copy_rsb_iovec t) done
with End_of_file -> () with End_of_file -> ()
@ -84,9 +95,6 @@ module Mock_flow = struct
if not (List.exists try_rsb Src.read_methods) then if not (List.exists try_rsb Src.read_methods) then
Fmt.failwith "Source does not offer Read_source_buffer optimisation" Fmt.failwith "Source does not offer Read_source_buffer optimisation"
let write t bufs =
copy t ~src:(Eio.Flow.cstruct_source bufs)
let shutdown t cmd = let shutdown t cmd =
traceln "%s: shutdown %s" t.label @@ traceln "%s: shutdown %s" t.label @@
match cmd with match cmd with

View File

@ -165,7 +165,7 @@ module Flow = struct
let read_methods = [] let read_methods = []
let write t bufs = Low_level.writev t bufs let single_write t bufs = Low_level.writev_single t bufs
let copy t ~src = let copy t ~src =
match Eio_unix.Resource.fd_opt src with match Eio_unix.Resource.fd_opt src with

View File

@ -36,7 +36,13 @@ module Impl = struct
} }
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let write t bufs = let single_write t bufs =
try
Low_level.writev t (Array.of_list bufs)
with Unix.Unix_error (code, name, arg) ->
raise (Err.wrap code name arg)
let write_all t bufs =
try try
let rec loop = function let rec loop = function
| [] -> () | [] -> ()
@ -52,7 +58,7 @@ module Impl = struct
try try
while true do while true do
let got = Eio.Flow.single_read src buf in let got = Eio.Flow.single_read src buf in
write dst [Cstruct.sub buf 0 got] write_all dst [Cstruct.sub buf 0 got]
done done
with End_of_file -> () with End_of_file -> ()

View File

@ -36,16 +36,21 @@ module Impl = struct
} }
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let write t bufs = let write_all t bufs =
try Low_level.writev t bufs try Low_level.writev t bufs
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
(* todo: provide a way to do a single write *)
let single_write t bufs =
write_all t bufs;
Cstruct.lenv bufs
let copy dst ~src = let copy dst ~src =
let buf = Cstruct.create 4096 in let buf = Cstruct.create 4096 in
try try
while true do while true do
let got = Eio.Flow.single_read src buf in let got = Eio.Flow.single_read src buf in
write dst [Cstruct.sub buf 0 got] write_all dst [Cstruct.sub buf 0 got]
done done
with End_of_file -> () with End_of_file -> ()

View File

@ -8,9 +8,6 @@ open Eio.Std
module Write = Eio.Buf_write module Write = Eio.Buf_write
let flow = Eio_mock.Flow.make "flow" let flow = Eio_mock.Flow.make "flow"
let flow_rsb = Eio_mock.Flow.make "flow"
let () = Eio_mock.Flow.set_copy_method flow_rsb `Read_source_buffer
``` ```
## A simple run-through ## A simple run-through
@ -44,12 +41,12 @@ If supported by the flow, we can avoid copying:
```ocaml ```ocaml
# Eio_mock.Backend.run @@ fun () -> # Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow_rsb @@ fun w -> Write.with_flow flow @@ fun w ->
Write.string w "Hello"; Write.string w "Hello";
Write.char w ' '; Write.char w ' ';
Write.schedule_cstruct w (Cstruct.of_string "world"); Write.schedule_cstruct w (Cstruct.of_string "world");
Write.char w '!';; Write.char w '!';;
+flow: wrote (rsb) ["Hello "; "world"; "!"] +flow: wrote ["Hello "; "world"; "!"]
- : unit = () - : unit = ()
``` ```
@ -78,7 +75,7 @@ With pausing
Fiber.yield (); Fiber.yield ();
Write.unpause w; Write.unpause w;
Write.string w "world";; Write.string w "world";;
+flow: wrote "Hello... world" +flow: wrote ["Hello... "; "world"]
- : unit = () - : unit = ()
``` ```
@ -139,13 +136,13 @@ With pausing
Write.char t 'e' Write.char t 'e'
in in
traceln "With room:"; traceln "With room:";
Write.with_flow flow_rsb f; Write.with_flow flow f;
traceln "Without room:"; traceln "Without room:";
Write.with_flow ~initial_size:1 flow_rsb f;; Write.with_flow ~initial_size:1 flow f;;
+With room: +With room:
+flow: wrote (rsb) ["testtestte"] +flow: wrote "testtestte"
+Without room: +Without room:
+flow: wrote (rsb) ["te"; "st"; "te"; "st"; "te"] +flow: wrote ["te"; "st"; "te"; "st"; "te"]
- : unit = () - : unit = ()
``` ```
@ -182,32 +179,29 @@ Eio_mock.Flow.on_copy_bytes flow [
- : unit = () - : unit = ()
``` ```
Multiple flushes. Multiple flushes:
Note: ideally the flushes here would complete as soon as enough data has been flushed,
but currently Eio.Flow.sink doesn't allow short writes and so Buf_write has to wait for
the whole batch to be flushed.
```ocaml ```ocaml
# Eio_mock.Backend.run @@ fun () -> # Eio_mock.Backend.run @@ fun () ->
Eio_mock.Flow.on_copy_bytes flow_rsb [ Eio_mock.Flow.on_copy_bytes flow [
`Yield_then (`Return 1); `Yield_then (`Return 1);
`Yield_then (`Return 2); `Yield_then (`Return 2);
`Yield_then (`Return 2); `Yield_then (`Return 2);
`Yield_then (`Return 2); `Yield_then (`Return 2);
]; ];
Write.with_flow flow_rsb @@ fun t -> Write.with_flow flow @@ fun t ->
Fiber.all [ Fiber.all [
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush"); (fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush"); (fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush"); (fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
]; ];
traceln "Done";; traceln "Done";;
+flow: wrote (rsb) ["a"] +flow: wrote "a"
+flow: wrote (rsb) ["b"; "c"] +flow: wrote ["b"; "c"]
+flow: wrote (rsb) ["d"; "e"]
+flow: wrote (rsb) ["f"]
+1st flush +1st flush
+flow: wrote ["d"; "e"]
+2nd flush +2nd flush
+flow: wrote "f"
+3rd flush +3rd flush
+Done +Done
- : unit = () - : unit = ()
@ -229,7 +223,9 @@ module Slow_writer = struct
done done
with End_of_file -> () with End_of_file -> ()
let write t bufs = copy t ~src:(Eio.Flow.cstruct_source bufs) let single_write t bufs =
copy t ~src:(Eio.Flow.cstruct_source bufs);
Cstruct.lenv bufs
end end
let slow_writer = let slow_writer =
let ops = Eio.Flow.Pi.sink (module Slow_writer) in let ops = Eio.Flow.Pi.sink (module Slow_writer) in
@ -261,8 +257,8 @@ let slow_writer =
Write.schedule_cstruct t (Cstruct.of_string "end"); Write.schedule_cstruct t (Cstruct.of_string "end");
Fiber.yield (); Fiber.yield ();
traceln "Should all be flushed by now.";;; traceln "Should all be flushed by now.";;;
+flow: wrote "onetwo" +flow: wrote ["one"; "two"]
+flow: wrote "onetwo" +flow: wrote ["one"; "two"]
+flow: wrote "end" +flow: wrote "end"
+Should all be flushed by now. +Should all be flushed by now.
- : unit = () - : unit = ()