Compare commits

..

No commits in common. "1493f01779ff71448e3962a95781ddbfa4825e90" and "24571692f1f30ef0c0612768c165af7fa162a4a5" have entirely different histories.

8 changed files with 43 additions and 73 deletions

View File

@ -473,8 +473,8 @@ let rec await_batch t =
let copy t flow = let copy t flow =
let rec aux () = let rec aux () =
let iovecs = await_batch t in let iovecs = await_batch t in
let wrote = Flow.single_write flow iovecs in Flow.write flow iovecs; (* todo: add a Flow.single_write and use that. *)
shift t wrote; shift t (Cstruct.lenv iovecs);
aux () aux ()
in in
try aux () try aux ()

View File

@ -24,7 +24,7 @@ module Pi = struct
module type SINK = sig module type SINK = sig
type t type t
val copy : t -> src:_ source -> unit val copy : t -> src:_ source -> unit
val single_write : t -> Cstruct.t list -> int val write : t -> Cstruct.t list -> unit
end end
module type SHUTDOWN = sig module type SHUTDOWN = sig
@ -130,19 +130,9 @@ let string_source =
let ops = Pi.source (module String_source) in let ops = Pi.source (module String_source) in
fun s -> Resource.T (String_source.create s, ops) fun s -> Resource.T (String_source.create s, ops)
let single_write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in
X.single_write t bufs
let write (Resource.T (t, ops)) bufs = let write (Resource.T (t, ops)) bufs =
let module X = (val (Resource.get ops Sink)) in let module X = (val (Resource.get ops Sink)) in
let rec aux = function X.write t bufs
| [] -> ()
| bufs ->
let wrote = X.single_write t bufs in
aux (Cstruct.shiftv bufs wrote)
in
aux bufs
let copy src (Resource.T (t, ops)) = let copy src (Resource.T (t, ops)) =
let module X = (val (Resource.get ops Sink)) in let module X = (val (Resource.get ops Sink)) in
@ -163,10 +153,8 @@ module Buffer_sink = struct
done done
with End_of_file -> () with End_of_file -> ()
let single_write t bufs = let write t bufs =
let old_length = Buffer.length t in List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
Buffer.length t - old_length
end end
let buffer_sink = let buffer_sink =

View File

@ -76,9 +76,6 @@ val write : _ sink -> Cstruct.t list -> unit
- {!Buf_write} to combine multiple small writes. - {!Buf_write} to combine multiple small writes.
- {!copy} for bulk transfers, as it allows some extra optimizations. *) - {!copy} for bulk transfers, as it allows some extra optimizations. *)
val single_write : _ sink -> Cstruct.t list -> int
(** [single_write dst bufs] writes at least one byte from [bufs] and returns the number of bytes written. *)
val copy : _ source -> _ sink -> unit val copy : _ source -> _ sink -> unit
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *) (** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
@ -122,7 +119,7 @@ module Pi : sig
module type SINK = sig module type SINK = sig
type t type t
val copy : t -> src:_ source -> unit val copy : t -> src:_ source -> unit
val single_write : t -> Cstruct.t list -> int val write : t -> Cstruct.t list -> unit
end end
module type SHUTDOWN = sig module type SHUTDOWN = sig

View File

@ -37,25 +37,14 @@ module Mock_flow = struct
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len] | x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
| x :: xs -> x :: takev (len - Cstruct.length x) xs | x :: xs -> x :: takev (len - Cstruct.length x) xs
let write ~pp t bufs = (* Test optimised copying using Read_source_buffer *)
let copy_rsb_iovec t src =
let size = Handler.run t.on_copy_bytes in let size = Handler.run t.on_copy_bytes in
let len = min (Cstruct.lenv bufs) size in let len = min (Cstruct.lenv src) size in
let bufs = takev len bufs in let bufs = takev len src in
traceln "%s: wrote %a" t.label pp bufs; traceln "%s: wrote (rsb) @[<v>%a@]" t.label (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs;
len len
let single_write t bufs =
let pp f = function
| [buf] -> Fmt.pf f "@[<v>%a@]" t.pp (Cstruct.to_string buf)
| bufs -> Fmt.pf f "@[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs
in
write ~pp t bufs
let copy_rsb_iovec t bufs =
let pp f bufs = Fmt.pf f "(rsb) @[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs in
write ~pp t bufs
(* Test optimised copying using Read_source_buffer *)
let copy_rsb t rsb = let copy_rsb t rsb =
try while true do rsb (copy_rsb_iovec t) done try while true do rsb (copy_rsb_iovec t) done
with End_of_file -> () with End_of_file -> ()
@ -95,6 +84,9 @@ module Mock_flow = struct
if not (List.exists try_rsb Src.read_methods) then if not (List.exists try_rsb Src.read_methods) then
Fmt.failwith "Source does not offer Read_source_buffer optimisation" Fmt.failwith "Source does not offer Read_source_buffer optimisation"
let write t bufs =
copy t ~src:(Eio.Flow.cstruct_source bufs)
let shutdown t cmd = let shutdown t cmd =
traceln "%s: shutdown %s" t.label @@ traceln "%s: shutdown %s" t.label @@
match cmd with match cmd with

View File

@ -165,7 +165,7 @@ module Flow = struct
let read_methods = [] let read_methods = []
let single_write t bufs = Low_level.writev_single t bufs let write t bufs = Low_level.writev t bufs
let copy t ~src = let copy t ~src =
match Eio_unix.Resource.fd_opt src with match Eio_unix.Resource.fd_opt src with

View File

@ -36,13 +36,7 @@ module Impl = struct
} }
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let single_write t bufs = let write t bufs =
try
Low_level.writev t (Array.of_list bufs)
with Unix.Unix_error (code, name, arg) ->
raise (Err.wrap code name arg)
let write_all t bufs =
try try
let rec loop = function let rec loop = function
| [] -> () | [] -> ()
@ -58,7 +52,7 @@ module Impl = struct
try try
while true do while true do
let got = Eio.Flow.single_read src buf in let got = Eio.Flow.single_read src buf in
write_all dst [Cstruct.sub buf 0 got] write dst [Cstruct.sub buf 0 got]
done done
with End_of_file -> () with End_of_file -> ()

View File

@ -36,21 +36,16 @@ module Impl = struct
} }
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let write_all t bufs = let write t bufs =
try Low_level.writev t bufs try Low_level.writev t bufs
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
(* todo: provide a way to do a single write *)
let single_write t bufs =
write_all t bufs;
Cstruct.lenv bufs
let copy dst ~src = let copy dst ~src =
let buf = Cstruct.create 4096 in let buf = Cstruct.create 4096 in
try try
while true do while true do
let got = Eio.Flow.single_read src buf in let got = Eio.Flow.single_read src buf in
write_all dst [Cstruct.sub buf 0 got] write dst [Cstruct.sub buf 0 got]
done done
with End_of_file -> () with End_of_file -> ()

View File

@ -8,6 +8,9 @@ open Eio.Std
module Write = Eio.Buf_write module Write = Eio.Buf_write
let flow = Eio_mock.Flow.make "flow" let flow = Eio_mock.Flow.make "flow"
let flow_rsb = Eio_mock.Flow.make "flow"
let () = Eio_mock.Flow.set_copy_method flow_rsb `Read_source_buffer
``` ```
## A simple run-through ## A simple run-through
@ -41,12 +44,12 @@ If supported by the flow, we can avoid copying:
```ocaml ```ocaml
# Eio_mock.Backend.run @@ fun () -> # Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun w -> Write.with_flow flow_rsb @@ fun w ->
Write.string w "Hello"; Write.string w "Hello";
Write.char w ' '; Write.char w ' ';
Write.schedule_cstruct w (Cstruct.of_string "world"); Write.schedule_cstruct w (Cstruct.of_string "world");
Write.char w '!';; Write.char w '!';;
+flow: wrote ["Hello "; "world"; "!"] +flow: wrote (rsb) ["Hello "; "world"; "!"]
- : unit = () - : unit = ()
``` ```
@ -75,7 +78,7 @@ With pausing
Fiber.yield (); Fiber.yield ();
Write.unpause w; Write.unpause w;
Write.string w "world";; Write.string w "world";;
+flow: wrote ["Hello... "; "world"] +flow: wrote "Hello... world"
- : unit = () - : unit = ()
``` ```
@ -136,13 +139,13 @@ With pausing
Write.char t 'e' Write.char t 'e'
in in
traceln "With room:"; traceln "With room:";
Write.with_flow flow f; Write.with_flow flow_rsb f;
traceln "Without room:"; traceln "Without room:";
Write.with_flow ~initial_size:1 flow f;; Write.with_flow ~initial_size:1 flow_rsb f;;
+With room: +With room:
+flow: wrote "testtestte" +flow: wrote (rsb) ["testtestte"]
+Without room: +Without room:
+flow: wrote ["te"; "st"; "te"; "st"; "te"] +flow: wrote (rsb) ["te"; "st"; "te"; "st"; "te"]
- : unit = () - : unit = ()
``` ```
@ -179,29 +182,32 @@ Eio_mock.Flow.on_copy_bytes flow [
- : unit = () - : unit = ()
``` ```
Multiple flushes: Multiple flushes.
Note: ideally the flushes here would complete as soon as enough data has been flushed,
but currently Eio.Flow.sink doesn't allow short writes and so Buf_write has to wait for
the whole batch to be flushed.
```ocaml ```ocaml
# Eio_mock.Backend.run @@ fun () -> # Eio_mock.Backend.run @@ fun () ->
Eio_mock.Flow.on_copy_bytes flow [ Eio_mock.Flow.on_copy_bytes flow_rsb [
`Yield_then (`Return 1); `Yield_then (`Return 1);
`Yield_then (`Return 2); `Yield_then (`Return 2);
`Yield_then (`Return 2); `Yield_then (`Return 2);
`Yield_then (`Return 2); `Yield_then (`Return 2);
]; ];
Write.with_flow flow @@ fun t -> Write.with_flow flow_rsb @@ fun t ->
Fiber.all [ Fiber.all [
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush"); (fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush"); (fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush"); (fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
]; ];
traceln "Done";; traceln "Done";;
+flow: wrote "a" +flow: wrote (rsb) ["a"]
+flow: wrote ["b"; "c"] +flow: wrote (rsb) ["b"; "c"]
+flow: wrote (rsb) ["d"; "e"]
+flow: wrote (rsb) ["f"]
+1st flush +1st flush
+flow: wrote ["d"; "e"]
+2nd flush +2nd flush
+flow: wrote "f"
+3rd flush +3rd flush
+Done +Done
- : unit = () - : unit = ()
@ -223,9 +229,7 @@ module Slow_writer = struct
done done
with End_of_file -> () with End_of_file -> ()
let single_write t bufs = let write t bufs = copy t ~src:(Eio.Flow.cstruct_source bufs)
copy t ~src:(Eio.Flow.cstruct_source bufs);
Cstruct.lenv bufs
end end
let slow_writer = let slow_writer =
let ops = Eio.Flow.Pi.sink (module Slow_writer) in let ops = Eio.Flow.Pi.sink (module Slow_writer) in
@ -257,8 +261,8 @@ let slow_writer =
Write.schedule_cstruct t (Cstruct.of_string "end"); Write.schedule_cstruct t (Cstruct.of_string "end");
Fiber.yield (); Fiber.yield ();
traceln "Should all be flushed by now.";;; traceln "Should all be flushed by now.";;;
+flow: wrote ["one"; "two"] +flow: wrote "onetwo"
+flow: wrote ["one"; "two"] +flow: wrote "onetwo"
+flow: wrote "end" +flow: wrote "end"
+Should all be flushed by now. +Should all be flushed by now.
- : unit = () - : unit = ()