Compare commits

..

2 Commits

Author SHA1 Message Date
Thomas Leonard
1493f01779
Merge pull request #598 from talex5/short-writes
Add Flow.single_write
2023-08-12 08:49:57 +01:00
Thomas Leonard
38a9d93620 Add Flow.single_write
`Flow.write` keeps writing until all the data is sent or an error
occurs, but sometimes it's useful to perform only a single write
operation. For example, if `Buf_write` asks to write 5 bytes and only 4
get written, we would previously do another 1-byte write. However, there
may have been more data available by then.

This is also useful for error recovery, if you need to know exactly how
many bytes were successfully written before the error.

Note that the `Buf_write` tests for `Read_source_buffer` were supposed
to test that the target flow read the buffers directly, but since we
started using `write` instead of `copy` it doesn't matter.
2023-08-11 11:17:33 +01:00
8 changed files with 73 additions and 43 deletions

View File

@ -473,8 +473,8 @@ let rec await_batch t =
let copy t flow =
let rec aux () =
let iovecs = await_batch t in
Flow.write flow iovecs; (* todo: add a Flow.single_write and use that. *)
shift t (Cstruct.lenv iovecs);
let wrote = Flow.single_write flow iovecs in
shift t wrote;
aux ()
in
try aux ()

View File

@ -24,7 +24,7 @@ module Pi = struct
module type SINK = sig
type t
val copy : t -> src:_ source -> unit
val write : t -> Cstruct.t list -> unit
val single_write : t -> Cstruct.t list -> int
end
module type SHUTDOWN = sig
@ -130,9 +130,19 @@ let string_source =
let ops = Pi.source (module String_source) in
fun s -> Resource.T (String_source.create s, ops)
let single_write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
X.single_write t bufs
let write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
X.write t bufs
let rec aux = function
| [] -> ()
| bufs ->
let wrote = X.single_write t bufs in
aux (Cstruct.shiftv bufs wrote)
in
aux bufs
let copy src (Resource.T (t, ops)) =
let module X = (val (Resource.get ops Sink)) in
@ -153,8 +163,10 @@ module Buffer_sink = struct
done
with End_of_file -> ()
let write t bufs =
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs
let single_write t bufs =
let old_length = Buffer.length t in
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
Buffer.length t - old_length
end
let buffer_sink =

View File

@ -76,6 +76,9 @@ val write : _ sink -> Cstruct.t list -> unit
- {!Buf_write} to combine multiple small writes.
- {!copy} for bulk transfers, as it allows some extra optimizations. *)
val single_write : _ sink -> Cstruct.t list -> int
(** [single_write dst bufs] writes at least one byte from [bufs] and returns the number of bytes written. *)
val copy : _ source -> _ sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
@ -119,7 +122,7 @@ module Pi : sig
module type SINK = sig
type t
val copy : t -> src:_ source -> unit
val write : t -> Cstruct.t list -> unit
val single_write : t -> Cstruct.t list -> int
end
module type SHUTDOWN = sig

View File

@ -37,14 +37,25 @@ module Mock_flow = struct
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
| x :: xs -> x :: takev (len - Cstruct.length x) xs
(* Test optimised copying using Read_source_buffer *)
let copy_rsb_iovec t src =
let write ~pp t bufs =
let size = Handler.run t.on_copy_bytes in
let len = min (Cstruct.lenv src) size in
let bufs = takev len src in
traceln "%s: wrote (rsb) @[<v>%a@]" t.label (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs;
let len = min (Cstruct.lenv bufs) size in
let bufs = takev len bufs in
traceln "%s: wrote %a" t.label pp bufs;
len
let single_write t bufs =
let pp f = function
| [buf] -> Fmt.pf f "@[<v>%a@]" t.pp (Cstruct.to_string buf)
| bufs -> Fmt.pf f "@[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs
in
write ~pp t bufs
let copy_rsb_iovec t bufs =
let pp f bufs = Fmt.pf f "(rsb) @[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs in
write ~pp t bufs
(* Test optimised copying using Read_source_buffer *)
let copy_rsb t rsb =
try while true do rsb (copy_rsb_iovec t) done
with End_of_file -> ()
@ -84,9 +95,6 @@ module Mock_flow = struct
if not (List.exists try_rsb Src.read_methods) then
Fmt.failwith "Source does not offer Read_source_buffer optimisation"
let write t bufs =
copy t ~src:(Eio.Flow.cstruct_source bufs)
let shutdown t cmd =
traceln "%s: shutdown %s" t.label @@
match cmd with

View File

@ -165,7 +165,7 @@ module Flow = struct
let read_methods = []
let write t bufs = Low_level.writev t bufs
let single_write t bufs = Low_level.writev_single t bufs
let copy t ~src =
match Eio_unix.Resource.fd_opt src with

View File

@ -36,7 +36,13 @@ module Impl = struct
}
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let write t bufs =
let single_write t bufs =
try
Low_level.writev t (Array.of_list bufs)
with Unix.Unix_error (code, name, arg) ->
raise (Err.wrap code name arg)
let write_all t bufs =
try
let rec loop = function
| [] -> ()
@ -52,7 +58,7 @@ module Impl = struct
try
while true do
let got = Eio.Flow.single_read src buf in
write dst [Cstruct.sub buf 0 got]
write_all dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()

View File

@ -36,16 +36,21 @@ module Impl = struct
}
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let write t bufs =
let write_all t bufs =
try Low_level.writev t bufs
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
(* todo: provide a way to do a single write *)
let single_write t bufs =
write_all t bufs;
Cstruct.lenv bufs
let copy dst ~src =
let buf = Cstruct.create 4096 in
try
while true do
let got = Eio.Flow.single_read src buf in
write dst [Cstruct.sub buf 0 got]
write_all dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()

View File

@ -8,9 +8,6 @@ open Eio.Std
module Write = Eio.Buf_write
let flow = Eio_mock.Flow.make "flow"
let flow_rsb = Eio_mock.Flow.make "flow"
let () = Eio_mock.Flow.set_copy_method flow_rsb `Read_source_buffer
```
## A simple run-through
@ -44,12 +41,12 @@ If supported by the flow, we can avoid copying:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow_rsb @@ fun w ->
Write.with_flow flow @@ fun w ->
Write.string w "Hello";
Write.char w ' ';
Write.schedule_cstruct w (Cstruct.of_string "world");
Write.char w '!';;
+flow: wrote (rsb) ["Hello "; "world"; "!"]
+flow: wrote ["Hello "; "world"; "!"]
- : unit = ()
```
@ -78,7 +75,7 @@ With pausing
Fiber.yield ();
Write.unpause w;
Write.string w "world";;
+flow: wrote "Hello... world"
+flow: wrote ["Hello... "; "world"]
- : unit = ()
```
@ -139,13 +136,13 @@ With pausing
Write.char t 'e'
in
traceln "With room:";
Write.with_flow flow_rsb f;
Write.with_flow flow f;
traceln "Without room:";
Write.with_flow ~initial_size:1 flow_rsb f;;
Write.with_flow ~initial_size:1 flow f;;
+With room:
+flow: wrote (rsb) ["testtestte"]
+flow: wrote "testtestte"
+Without room:
+flow: wrote (rsb) ["te"; "st"; "te"; "st"; "te"]
+flow: wrote ["te"; "st"; "te"; "st"; "te"]
- : unit = ()
```
@ -182,32 +179,29 @@ Eio_mock.Flow.on_copy_bytes flow [
- : unit = ()
```
Multiple flushes.
Note: ideally the flushes here would complete as soon as enough data has been flushed,
but currently Eio.Flow.sink doesn't allow short writes and so Buf_write has to wait for
the whole batch to be flushed.
Multiple flushes:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Flow.on_copy_bytes flow_rsb [
Eio_mock.Flow.on_copy_bytes flow [
`Yield_then (`Return 1);
`Yield_then (`Return 2);
`Yield_then (`Return 2);
`Yield_then (`Return 2);
];
Write.with_flow flow_rsb @@ fun t ->
Write.with_flow flow @@ fun t ->
Fiber.all [
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
];
traceln "Done";;
+flow: wrote (rsb) ["a"]
+flow: wrote (rsb) ["b"; "c"]
+flow: wrote (rsb) ["d"; "e"]
+flow: wrote (rsb) ["f"]
+flow: wrote "a"
+flow: wrote ["b"; "c"]
+1st flush
+flow: wrote ["d"; "e"]
+2nd flush
+flow: wrote "f"
+3rd flush
+Done
- : unit = ()
@ -229,7 +223,9 @@ module Slow_writer = struct
done
with End_of_file -> ()
let write t bufs = copy t ~src:(Eio.Flow.cstruct_source bufs)
let single_write t bufs =
copy t ~src:(Eio.Flow.cstruct_source bufs);
Cstruct.lenv bufs
end
let slow_writer =
let ops = Eio.Flow.Pi.sink (module Slow_writer) in
@ -261,8 +257,8 @@ let slow_writer =
Write.schedule_cstruct t (Cstruct.of_string "end");
Fiber.yield ();
traceln "Should all be flushed by now.";;;
+flow: wrote "onetwo"
+flow: wrote "onetwo"
+flow: wrote ["one"; "two"]
+flow: wrote ["one"; "two"]
+flow: wrote "end"
+Should all be flushed by now.
- : unit = ()