mirror of
https://github.com/ocaml-multicore/eio.git
synced 2025-07-17 00:01:11 -04:00
Compare commits
2 Commits
24571692f1
...
1493f01779
Author | SHA1 | Date | |
---|---|---|---|
|
1493f01779 | ||
|
38a9d93620 |
@ -473,8 +473,8 @@ let rec await_batch t =
|
||||
let copy t flow =
|
||||
let rec aux () =
|
||||
let iovecs = await_batch t in
|
||||
Flow.write flow iovecs; (* todo: add a Flow.single_write and use that. *)
|
||||
shift t (Cstruct.lenv iovecs);
|
||||
let wrote = Flow.single_write flow iovecs in
|
||||
shift t wrote;
|
||||
aux ()
|
||||
in
|
||||
try aux ()
|
||||
|
@ -24,7 +24,7 @@ module Pi = struct
|
||||
module type SINK = sig
|
||||
type t
|
||||
val copy : t -> src:_ source -> unit
|
||||
val write : t -> Cstruct.t list -> unit
|
||||
val single_write : t -> Cstruct.t list -> int
|
||||
end
|
||||
|
||||
module type SHUTDOWN = sig
|
||||
@ -130,9 +130,19 @@ let string_source =
|
||||
let ops = Pi.source (module String_source) in
|
||||
fun s -> Resource.T (String_source.create s, ops)
|
||||
|
||||
let single_write (Resource.T (t, ops)) bufs =
|
||||
let module X = (val (Resource.get ops Sink)) in
|
||||
X.single_write t bufs
|
||||
|
||||
let write (Resource.T (t, ops)) bufs =
|
||||
let module X = (val (Resource.get ops Sink)) in
|
||||
X.write t bufs
|
||||
let rec aux = function
|
||||
| [] -> ()
|
||||
| bufs ->
|
||||
let wrote = X.single_write t bufs in
|
||||
aux (Cstruct.shiftv bufs wrote)
|
||||
in
|
||||
aux bufs
|
||||
|
||||
let copy src (Resource.T (t, ops)) =
|
||||
let module X = (val (Resource.get ops Sink)) in
|
||||
@ -153,8 +163,10 @@ module Buffer_sink = struct
|
||||
done
|
||||
with End_of_file -> ()
|
||||
|
||||
let write t bufs =
|
||||
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs
|
||||
let single_write t bufs =
|
||||
let old_length = Buffer.length t in
|
||||
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
|
||||
Buffer.length t - old_length
|
||||
end
|
||||
|
||||
let buffer_sink =
|
||||
|
@ -76,6 +76,9 @@ val write : _ sink -> Cstruct.t list -> unit
|
||||
- {!Buf_write} to combine multiple small writes.
|
||||
- {!copy} for bulk transfers, as it allows some extra optimizations. *)
|
||||
|
||||
val single_write : _ sink -> Cstruct.t list -> int
|
||||
(** [single_write dst bufs] writes at least one byte from [bufs] and returns the number of bytes written. *)
|
||||
|
||||
val copy : _ source -> _ sink -> unit
|
||||
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
|
||||
|
||||
@ -119,7 +122,7 @@ module Pi : sig
|
||||
module type SINK = sig
|
||||
type t
|
||||
val copy : t -> src:_ source -> unit
|
||||
val write : t -> Cstruct.t list -> unit
|
||||
val single_write : t -> Cstruct.t list -> int
|
||||
end
|
||||
|
||||
module type SHUTDOWN = sig
|
||||
|
@ -37,14 +37,25 @@ module Mock_flow = struct
|
||||
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
|
||||
| x :: xs -> x :: takev (len - Cstruct.length x) xs
|
||||
|
||||
(* Test optimised copying using Read_source_buffer *)
|
||||
let copy_rsb_iovec t src =
|
||||
let write ~pp t bufs =
|
||||
let size = Handler.run t.on_copy_bytes in
|
||||
let len = min (Cstruct.lenv src) size in
|
||||
let bufs = takev len src in
|
||||
traceln "%s: wrote (rsb) @[<v>%a@]" t.label (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs;
|
||||
let len = min (Cstruct.lenv bufs) size in
|
||||
let bufs = takev len bufs in
|
||||
traceln "%s: wrote %a" t.label pp bufs;
|
||||
len
|
||||
|
||||
let single_write t bufs =
|
||||
let pp f = function
|
||||
| [buf] -> Fmt.pf f "@[<v>%a@]" t.pp (Cstruct.to_string buf)
|
||||
| bufs -> Fmt.pf f "@[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs
|
||||
in
|
||||
write ~pp t bufs
|
||||
|
||||
let copy_rsb_iovec t bufs =
|
||||
let pp f bufs = Fmt.pf f "(rsb) @[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs in
|
||||
write ~pp t bufs
|
||||
|
||||
(* Test optimised copying using Read_source_buffer *)
|
||||
let copy_rsb t rsb =
|
||||
try while true do rsb (copy_rsb_iovec t) done
|
||||
with End_of_file -> ()
|
||||
@ -84,9 +95,6 @@ module Mock_flow = struct
|
||||
if not (List.exists try_rsb Src.read_methods) then
|
||||
Fmt.failwith "Source does not offer Read_source_buffer optimisation"
|
||||
|
||||
let write t bufs =
|
||||
copy t ~src:(Eio.Flow.cstruct_source bufs)
|
||||
|
||||
let shutdown t cmd =
|
||||
traceln "%s: shutdown %s" t.label @@
|
||||
match cmd with
|
||||
|
@ -165,7 +165,7 @@ module Flow = struct
|
||||
|
||||
let read_methods = []
|
||||
|
||||
let write t bufs = Low_level.writev t bufs
|
||||
let single_write t bufs = Low_level.writev_single t bufs
|
||||
|
||||
let copy t ~src =
|
||||
match Eio_unix.Resource.fd_opt src with
|
||||
|
@ -36,7 +36,13 @@ module Impl = struct
|
||||
}
|
||||
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
||||
|
||||
let write t bufs =
|
||||
let single_write t bufs =
|
||||
try
|
||||
Low_level.writev t (Array.of_list bufs)
|
||||
with Unix.Unix_error (code, name, arg) ->
|
||||
raise (Err.wrap code name arg)
|
||||
|
||||
let write_all t bufs =
|
||||
try
|
||||
let rec loop = function
|
||||
| [] -> ()
|
||||
@ -52,7 +58,7 @@ module Impl = struct
|
||||
try
|
||||
while true do
|
||||
let got = Eio.Flow.single_read src buf in
|
||||
write dst [Cstruct.sub buf 0 got]
|
||||
write_all dst [Cstruct.sub buf 0 got]
|
||||
done
|
||||
with End_of_file -> ()
|
||||
|
||||
|
@ -36,16 +36,21 @@ module Impl = struct
|
||||
}
|
||||
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
||||
|
||||
let write t bufs =
|
||||
let write_all t bufs =
|
||||
try Low_level.writev t bufs
|
||||
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
|
||||
|
||||
(* todo: provide a way to do a single write *)
|
||||
let single_write t bufs =
|
||||
write_all t bufs;
|
||||
Cstruct.lenv bufs
|
||||
|
||||
let copy dst ~src =
|
||||
let buf = Cstruct.create 4096 in
|
||||
try
|
||||
while true do
|
||||
let got = Eio.Flow.single_read src buf in
|
||||
write dst [Cstruct.sub buf 0 got]
|
||||
write_all dst [Cstruct.sub buf 0 got]
|
||||
done
|
||||
with End_of_file -> ()
|
||||
|
||||
|
@ -8,9 +8,6 @@ open Eio.Std
|
||||
module Write = Eio.Buf_write
|
||||
|
||||
let flow = Eio_mock.Flow.make "flow"
|
||||
|
||||
let flow_rsb = Eio_mock.Flow.make "flow"
|
||||
let () = Eio_mock.Flow.set_copy_method flow_rsb `Read_source_buffer
|
||||
```
|
||||
|
||||
## A simple run-through
|
||||
@ -44,12 +41,12 @@ If supported by the flow, we can avoid copying:
|
||||
|
||||
```ocaml
|
||||
# Eio_mock.Backend.run @@ fun () ->
|
||||
Write.with_flow flow_rsb @@ fun w ->
|
||||
Write.with_flow flow @@ fun w ->
|
||||
Write.string w "Hello";
|
||||
Write.char w ' ';
|
||||
Write.schedule_cstruct w (Cstruct.of_string "world");
|
||||
Write.char w '!';;
|
||||
+flow: wrote (rsb) ["Hello "; "world"; "!"]
|
||||
+flow: wrote ["Hello "; "world"; "!"]
|
||||
- : unit = ()
|
||||
```
|
||||
|
||||
@ -78,7 +75,7 @@ With pausing
|
||||
Fiber.yield ();
|
||||
Write.unpause w;
|
||||
Write.string w "world";;
|
||||
+flow: wrote "Hello... world"
|
||||
+flow: wrote ["Hello... "; "world"]
|
||||
- : unit = ()
|
||||
```
|
||||
|
||||
@ -139,13 +136,13 @@ With pausing
|
||||
Write.char t 'e'
|
||||
in
|
||||
traceln "With room:";
|
||||
Write.with_flow flow_rsb f;
|
||||
Write.with_flow flow f;
|
||||
traceln "Without room:";
|
||||
Write.with_flow ~initial_size:1 flow_rsb f;;
|
||||
Write.with_flow ~initial_size:1 flow f;;
|
||||
+With room:
|
||||
+flow: wrote (rsb) ["testtestte"]
|
||||
+flow: wrote "testtestte"
|
||||
+Without room:
|
||||
+flow: wrote (rsb) ["te"; "st"; "te"; "st"; "te"]
|
||||
+flow: wrote ["te"; "st"; "te"; "st"; "te"]
|
||||
- : unit = ()
|
||||
```
|
||||
|
||||
@ -182,32 +179,29 @@ Eio_mock.Flow.on_copy_bytes flow [
|
||||
- : unit = ()
|
||||
```
|
||||
|
||||
Multiple flushes.
|
||||
Note: ideally the flushes here would complete as soon as enough data has been flushed,
|
||||
but currently Eio.Flow.sink doesn't allow short writes and so Buf_write has to wait for
|
||||
the whole batch to be flushed.
|
||||
Multiple flushes:
|
||||
|
||||
```ocaml
|
||||
# Eio_mock.Backend.run @@ fun () ->
|
||||
Eio_mock.Flow.on_copy_bytes flow_rsb [
|
||||
Eio_mock.Flow.on_copy_bytes flow [
|
||||
`Yield_then (`Return 1);
|
||||
`Yield_then (`Return 2);
|
||||
`Yield_then (`Return 2);
|
||||
`Yield_then (`Return 2);
|
||||
];
|
||||
Write.with_flow flow_rsb @@ fun t ->
|
||||
Write.with_flow flow @@ fun t ->
|
||||
Fiber.all [
|
||||
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
|
||||
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
|
||||
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
|
||||
];
|
||||
traceln "Done";;
|
||||
+flow: wrote (rsb) ["a"]
|
||||
+flow: wrote (rsb) ["b"; "c"]
|
||||
+flow: wrote (rsb) ["d"; "e"]
|
||||
+flow: wrote (rsb) ["f"]
|
||||
+flow: wrote "a"
|
||||
+flow: wrote ["b"; "c"]
|
||||
+1st flush
|
||||
+flow: wrote ["d"; "e"]
|
||||
+2nd flush
|
||||
+flow: wrote "f"
|
||||
+3rd flush
|
||||
+Done
|
||||
- : unit = ()
|
||||
@ -229,7 +223,9 @@ module Slow_writer = struct
|
||||
done
|
||||
with End_of_file -> ()
|
||||
|
||||
let write t bufs = copy t ~src:(Eio.Flow.cstruct_source bufs)
|
||||
let single_write t bufs =
|
||||
copy t ~src:(Eio.Flow.cstruct_source bufs);
|
||||
Cstruct.lenv bufs
|
||||
end
|
||||
let slow_writer =
|
||||
let ops = Eio.Flow.Pi.sink (module Slow_writer) in
|
||||
@ -261,8 +257,8 @@ let slow_writer =
|
||||
Write.schedule_cstruct t (Cstruct.of_string "end");
|
||||
Fiber.yield ();
|
||||
traceln "Should all be flushed by now.";;;
|
||||
+flow: wrote "onetwo"
|
||||
+flow: wrote "onetwo"
|
||||
+flow: wrote ["one"; "two"]
|
||||
+flow: wrote ["one"; "two"]
|
||||
+flow: wrote "end"
|
||||
+Should all be flushed by now.
|
||||
- : unit = ()
|
||||
|
Loading…
x
Reference in New Issue
Block a user