Compare commits

...

3 Commits

Author SHA1 Message Date
Thomas Leonard
b942dde3b3
Merge pull request #663 from talex5/flow-buf
Optimise Flow.copy with Buf_read.as_flow
2024-01-02 10:29:15 +00:00
Thomas Leonard
19c43d7153 Optimise Flow.copy with Buf_read.as_flow
By default, Flow.copy creates a 4KB buffer and copies the data through
that. However, if the source of the copy is a buffered reader then it is
much more efficient to use its buffer directly.

This updates the flow you get from `Buf_read.as_flow` to offer this
optimisation, and also updates the eio_posix backend's flow to use this
optimisation when available (eio_linux already supported this).

Detected in a benchmark by Leandro Ostera.
2024-01-02 10:20:42 +00:00
Thomas Leonard
fbe8a71cb8 Add Flow.copy benchmark 2024-01-02 10:15:57 +00:00
4 changed files with 68 additions and 2 deletions

44
bench/bench_copy.ml Normal file
View File

@ -0,0 +1,44 @@
(* A client opens a connection to an echo service and sends a load of data via it. *)
open Eio.Std
let chunk_size = 1 lsl 16
let n_chunks = 10000
let n_bytes = n_chunks * chunk_size
let run_client sock =
Fiber.both
(fun () ->
let chunk = Cstruct.create chunk_size in
for _ = 1 to n_chunks do
Eio.Flow.write sock [chunk]
done;
Eio.Flow.shutdown sock `Send
)
(fun () ->
let chunk = Cstruct.create chunk_size in
for _ = 1 to n_chunks do
Eio.Flow.read_exact sock chunk
done
)
let time name service =
Switch.run @@ fun sw ->
let client_sock, server_sock = Eio_unix.Net.socketpair_stream ~sw () in
let t0 = Unix.gettimeofday () in
Fiber.both
(fun () -> service server_sock)
(fun () -> run_client client_sock);
let t1 = Unix.gettimeofday () in
let time = t1 -. t0 in
let bytes_per_second = float n_bytes /. time in
traceln "%s: %.2f MB/s" name (bytes_per_second /. 1024. /. 1024.);
Metric.create name (`Float bytes_per_second) "bytes/s" (name ^ " Flow.copy")
let run _env =
[
time "default" (fun sock -> Eio.Flow.copy sock sock);
time "buf_read" (fun sock ->
let r = Eio.Buf_read.of_flow sock ~initial_size:(64 * 1024) ~max_size:(64 * 1024) |> Eio.Buf_read.as_flow in
Eio.Flow.copy r sock);
]

View File

@ -11,6 +11,7 @@ let benchmarks = [
"Eio_unix.Fd", Bench_fd.run; "Eio_unix.Fd", Bench_fd.run;
"File.stat", Bench_fstat.run; "File.stat", Bench_fstat.run;
"Path.stat", Bench_stat.run; "Path.stat", Bench_stat.run;
"Flow.copy", Bench_copy.run;
] ]
let usage_error () = let usage_error () =

View File

@ -140,7 +140,13 @@ module F = struct
consume t len; consume t len;
len len
let read_methods = [] let rsb t fn =
ensure t 1;
let data = peek t in
let sent = fn [data] in
consume t sent
let read_methods = [Flow.Read_source_buffer rsb]
end end
let as_flow = let as_flow =

View File

@ -64,7 +64,22 @@ module Impl = struct
with Unix.Unix_error (code, name, arg) -> with Unix.Unix_error (code, name, arg) ->
raise (Err.wrap code name arg) raise (Err.wrap code name arg)
let copy t ~src = Eio.Flow.Pi.simple_copy ~single_write t ~src (* Copy using the [Read_source_buffer] optimisation.
Avoids a copy if the source already has the data. *)
let copy_with_rsb rsb dst =
try
while true do rsb (single_write dst) done
with End_of_file -> ()
let copy t ~src =
let Eio.Resource.T (src_t, ops) = src in
let module Src = (val (Eio.Resource.get ops Eio.Flow.Pi.Source)) in
let rec aux = function
| Eio.Flow.Read_source_buffer rsb :: _ -> copy_with_rsb (rsb src_t) t
| _ :: xs -> aux xs
| [] -> Eio.Flow.Pi.simple_copy ~single_write t ~src
in
aux Src.read_methods
let single_read t buf = let single_read t buf =
match Low_level.readv t [| buf |] with match Low_level.readv t [| buf |] with