Compare commits

..

No commits in common. "1493f01779ff71448e3962a95781ddbfa4825e90" and "24571692f1f30ef0c0612768c165af7fa162a4a5" have entirely different histories.

8 changed files with 43 additions and 73 deletions

View File

@ -473,8 +473,8 @@ let rec await_batch t =
let copy t flow =
let rec aux () =
let iovecs = await_batch t in
let wrote = Flow.single_write flow iovecs in
shift t wrote;
Flow.write flow iovecs; (* todo: add a Flow.single_write and use that. *)
shift t (Cstruct.lenv iovecs);
aux ()
in
try aux ()

View File

@ -24,7 +24,7 @@ module Pi = struct
module type SINK = sig
type t
val copy : t -> src:_ source -> unit
val single_write : t -> Cstruct.t list -> int
val write : t -> Cstruct.t list -> unit
end
module type SHUTDOWN = sig
@ -130,19 +130,9 @@ let string_source =
let ops = Pi.source (module String_source) in
fun s -> Resource.T (String_source.create s, ops)
let single_write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
X.single_write t bufs
let write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
let rec aux = function
| [] -> ()
| bufs ->
let wrote = X.single_write t bufs in
aux (Cstruct.shiftv bufs wrote)
in
aux bufs
X.write t bufs
let copy src (Resource.T (t, ops)) =
let module X = (val (Resource.get ops Sink)) in
@ -163,10 +153,8 @@ module Buffer_sink = struct
done
with End_of_file -> ()
let single_write t bufs =
let old_length = Buffer.length t in
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
Buffer.length t - old_length
let write t bufs =
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs
end
let buffer_sink =

View File

@ -76,9 +76,6 @@ val write : _ sink -> Cstruct.t list -> unit
- {!Buf_write} to combine multiple small writes.
- {!copy} for bulk transfers, as it allows some extra optimizations. *)
val single_write : _ sink -> Cstruct.t list -> int
(** [single_write dst bufs] writes at least one byte from [bufs] and returns the number of bytes written. *)
val copy : _ source -> _ sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
@ -122,7 +119,7 @@ module Pi : sig
module type SINK = sig
type t
val copy : t -> src:_ source -> unit
val single_write : t -> Cstruct.t list -> int
val write : t -> Cstruct.t list -> unit
end
module type SHUTDOWN = sig

View File

@ -37,25 +37,14 @@ module Mock_flow = struct
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
| x :: xs -> x :: takev (len - Cstruct.length x) xs
let write ~pp t bufs =
(* Test optimised copying using Read_source_buffer *)
let copy_rsb_iovec t src =
let size = Handler.run t.on_copy_bytes in
let len = min (Cstruct.lenv bufs) size in
let bufs = takev len bufs in
traceln "%s: wrote %a" t.label pp bufs;
let len = min (Cstruct.lenv src) size in
let bufs = takev len src in
traceln "%s: wrote (rsb) @[<v>%a@]" t.label (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs;
len
let single_write t bufs =
let pp f = function
| [buf] -> Fmt.pf f "@[<v>%a@]" t.pp (Cstruct.to_string buf)
| bufs -> Fmt.pf f "@[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs
in
write ~pp t bufs
let copy_rsb_iovec t bufs =
let pp f bufs = Fmt.pf f "(rsb) @[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs in
write ~pp t bufs
(* Test optimised copying using Read_source_buffer *)
let copy_rsb t rsb =
try while true do rsb (copy_rsb_iovec t) done
with End_of_file -> ()
@ -95,6 +84,9 @@ module Mock_flow = struct
if not (List.exists try_rsb Src.read_methods) then
Fmt.failwith "Source does not offer Read_source_buffer optimisation"
let write t bufs =
copy t ~src:(Eio.Flow.cstruct_source bufs)
let shutdown t cmd =
traceln "%s: shutdown %s" t.label @@
match cmd with

View File

@ -165,7 +165,7 @@ module Flow = struct
let read_methods = []
let single_write t bufs = Low_level.writev_single t bufs
let write t bufs = Low_level.writev t bufs
let copy t ~src =
match Eio_unix.Resource.fd_opt src with

View File

@ -36,13 +36,7 @@ module Impl = struct
}
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let single_write t bufs =
try
Low_level.writev t (Array.of_list bufs)
with Unix.Unix_error (code, name, arg) ->
raise (Err.wrap code name arg)
let write_all t bufs =
let write t bufs =
try
let rec loop = function
| [] -> ()
@ -58,7 +52,7 @@ module Impl = struct
try
while true do
let got = Eio.Flow.single_read src buf in
write_all dst [Cstruct.sub buf 0 got]
write dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()

View File

@ -36,21 +36,16 @@ module Impl = struct
}
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let write_all t bufs =
let write t bufs =
try Low_level.writev t bufs
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
(* todo: provide a way to do a single write *)
let single_write t bufs =
write_all t bufs;
Cstruct.lenv bufs
let copy dst ~src =
let buf = Cstruct.create 4096 in
try
while true do
let got = Eio.Flow.single_read src buf in
write_all dst [Cstruct.sub buf 0 got]
write dst [Cstruct.sub buf 0 got]
done
with End_of_file -> ()

View File

@ -8,6 +8,9 @@ open Eio.Std
module Write = Eio.Buf_write
let flow = Eio_mock.Flow.make "flow"
let flow_rsb = Eio_mock.Flow.make "flow"
let () = Eio_mock.Flow.set_copy_method flow_rsb `Read_source_buffer
```
## A simple run-through
@ -41,12 +44,12 @@ If supported by the flow, we can avoid copying:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun w ->
Write.with_flow flow_rsb @@ fun w ->
Write.string w "Hello";
Write.char w ' ';
Write.schedule_cstruct w (Cstruct.of_string "world");
Write.char w '!';;
+flow: wrote ["Hello "; "world"; "!"]
+flow: wrote (rsb) ["Hello "; "world"; "!"]
- : unit = ()
```
@ -75,7 +78,7 @@ With pausing
Fiber.yield ();
Write.unpause w;
Write.string w "world";;
+flow: wrote ["Hello... "; "world"]
+flow: wrote "Hello... world"
- : unit = ()
```
@ -136,13 +139,13 @@ With pausing
Write.char t 'e'
in
traceln "With room:";
Write.with_flow flow f;
Write.with_flow flow_rsb f;
traceln "Without room:";
Write.with_flow ~initial_size:1 flow f;;
Write.with_flow ~initial_size:1 flow_rsb f;;
+With room:
+flow: wrote "testtestte"
+flow: wrote (rsb) ["testtestte"]
+Without room:
+flow: wrote ["te"; "st"; "te"; "st"; "te"]
+flow: wrote (rsb) ["te"; "st"; "te"; "st"; "te"]
- : unit = ()
```
@ -179,29 +182,32 @@ Eio_mock.Flow.on_copy_bytes flow [
- : unit = ()
```
Multiple flushes:
Multiple flushes.
Note: ideally the flushes here would complete as soon as enough data has been flushed,
but currently Eio.Flow.sink doesn't allow short writes and so Buf_write has to wait for
the whole batch to be flushed.
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Flow.on_copy_bytes flow [
Eio_mock.Flow.on_copy_bytes flow_rsb [
`Yield_then (`Return 1);
`Yield_then (`Return 2);
`Yield_then (`Return 2);
`Yield_then (`Return 2);
];
Write.with_flow flow @@ fun t ->
Write.with_flow flow_rsb @@ fun t ->
Fiber.all [
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
];
traceln "Done";;
+flow: wrote "a"
+flow: wrote ["b"; "c"]
+flow: wrote (rsb) ["a"]
+flow: wrote (rsb) ["b"; "c"]
+flow: wrote (rsb) ["d"; "e"]
+flow: wrote (rsb) ["f"]
+1st flush
+flow: wrote ["d"; "e"]
+2nd flush
+flow: wrote "f"
+3rd flush
+Done
- : unit = ()
@ -223,9 +229,7 @@ module Slow_writer = struct
done
with End_of_file -> ()
let single_write t bufs =
copy t ~src:(Eio.Flow.cstruct_source bufs);
Cstruct.lenv bufs
let write t bufs = copy t ~src:(Eio.Flow.cstruct_source bufs)
end
let slow_writer =
let ops = Eio.Flow.Pi.sink (module Slow_writer) in
@ -257,8 +261,8 @@ let slow_writer =
Write.schedule_cstruct t (Cstruct.of_string "end");
Fiber.yield ();
traceln "Should all be flushed by now.";;;
+flow: wrote ["one"; "two"]
+flow: wrote ["one"; "two"]
+flow: wrote "onetwo"
+flow: wrote "onetwo"
+flow: wrote "end"
+Should all be flushed by now.
- : unit = ()