mirror of
https://github.com/ocaml-multicore/eio.git
synced 2025-07-17 00:01:11 -04:00
Compare commits
2 Commits
24571692f1
...
1493f01779
Author | SHA1 | Date | |
---|---|---|---|
|
1493f01779 | ||
|
38a9d93620 |
@ -473,8 +473,8 @@ let rec await_batch t =
|
|||||||
let copy t flow =
|
let copy t flow =
|
||||||
let rec aux () =
|
let rec aux () =
|
||||||
let iovecs = await_batch t in
|
let iovecs = await_batch t in
|
||||||
Flow.write flow iovecs; (* todo: add a Flow.single_write and use that. *)
|
let wrote = Flow.single_write flow iovecs in
|
||||||
shift t (Cstruct.lenv iovecs);
|
shift t wrote;
|
||||||
aux ()
|
aux ()
|
||||||
in
|
in
|
||||||
try aux ()
|
try aux ()
|
||||||
|
@ -24,7 +24,7 @@ module Pi = struct
|
|||||||
module type SINK = sig
|
module type SINK = sig
|
||||||
type t
|
type t
|
||||||
val copy : t -> src:_ source -> unit
|
val copy : t -> src:_ source -> unit
|
||||||
val write : t -> Cstruct.t list -> unit
|
val single_write : t -> Cstruct.t list -> int
|
||||||
end
|
end
|
||||||
|
|
||||||
module type SHUTDOWN = sig
|
module type SHUTDOWN = sig
|
||||||
@ -130,9 +130,19 @@ let string_source =
|
|||||||
let ops = Pi.source (module String_source) in
|
let ops = Pi.source (module String_source) in
|
||||||
fun s -> Resource.T (String_source.create s, ops)
|
fun s -> Resource.T (String_source.create s, ops)
|
||||||
|
|
||||||
|
let single_write (Resource.T (t, ops)) bufs =
|
||||||
|
let module X = (val (Resource.get ops Sink)) in
|
||||||
|
X.single_write t bufs
|
||||||
|
|
||||||
let write (Resource.T (t, ops)) bufs =
|
let write (Resource.T (t, ops)) bufs =
|
||||||
let module X = (val (Resource.get ops Sink)) in
|
let module X = (val (Resource.get ops Sink)) in
|
||||||
X.write t bufs
|
let rec aux = function
|
||||||
|
| [] -> ()
|
||||||
|
| bufs ->
|
||||||
|
let wrote = X.single_write t bufs in
|
||||||
|
aux (Cstruct.shiftv bufs wrote)
|
||||||
|
in
|
||||||
|
aux bufs
|
||||||
|
|
||||||
let copy src (Resource.T (t, ops)) =
|
let copy src (Resource.T (t, ops)) =
|
||||||
let module X = (val (Resource.get ops Sink)) in
|
let module X = (val (Resource.get ops Sink)) in
|
||||||
@ -153,8 +163,10 @@ module Buffer_sink = struct
|
|||||||
done
|
done
|
||||||
with End_of_file -> ()
|
with End_of_file -> ()
|
||||||
|
|
||||||
let write t bufs =
|
let single_write t bufs =
|
||||||
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs
|
let old_length = Buffer.length t in
|
||||||
|
List.iter (fun buf -> Buffer.add_bytes t (Cstruct.to_bytes buf)) bufs;
|
||||||
|
Buffer.length t - old_length
|
||||||
end
|
end
|
||||||
|
|
||||||
let buffer_sink =
|
let buffer_sink =
|
||||||
|
@ -76,6 +76,9 @@ val write : _ sink -> Cstruct.t list -> unit
|
|||||||
- {!Buf_write} to combine multiple small writes.
|
- {!Buf_write} to combine multiple small writes.
|
||||||
- {!copy} for bulk transfers, as it allows some extra optimizations. *)
|
- {!copy} for bulk transfers, as it allows some extra optimizations. *)
|
||||||
|
|
||||||
|
val single_write : _ sink -> Cstruct.t list -> int
|
||||||
|
(** [single_write dst bufs] writes at least one byte from [bufs] and returns the number of bytes written. *)
|
||||||
|
|
||||||
val copy : _ source -> _ sink -> unit
|
val copy : _ source -> _ sink -> unit
|
||||||
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
|
(** [copy src dst] copies data from [src] to [dst] until end-of-file. *)
|
||||||
|
|
||||||
@ -119,7 +122,7 @@ module Pi : sig
|
|||||||
module type SINK = sig
|
module type SINK = sig
|
||||||
type t
|
type t
|
||||||
val copy : t -> src:_ source -> unit
|
val copy : t -> src:_ source -> unit
|
||||||
val write : t -> Cstruct.t list -> unit
|
val single_write : t -> Cstruct.t list -> int
|
||||||
end
|
end
|
||||||
|
|
||||||
module type SHUTDOWN = sig
|
module type SHUTDOWN = sig
|
||||||
|
@ -37,14 +37,25 @@ module Mock_flow = struct
|
|||||||
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
|
| x :: _ when Cstruct.length x >= len -> [Cstruct.sub x 0 len]
|
||||||
| x :: xs -> x :: takev (len - Cstruct.length x) xs
|
| x :: xs -> x :: takev (len - Cstruct.length x) xs
|
||||||
|
|
||||||
(* Test optimised copying using Read_source_buffer *)
|
let write ~pp t bufs =
|
||||||
let copy_rsb_iovec t src =
|
|
||||||
let size = Handler.run t.on_copy_bytes in
|
let size = Handler.run t.on_copy_bytes in
|
||||||
let len = min (Cstruct.lenv src) size in
|
let len = min (Cstruct.lenv bufs) size in
|
||||||
let bufs = takev len src in
|
let bufs = takev len bufs in
|
||||||
traceln "%s: wrote (rsb) @[<v>%a@]" t.label (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs;
|
traceln "%s: wrote %a" t.label pp bufs;
|
||||||
len
|
len
|
||||||
|
|
||||||
|
let single_write t bufs =
|
||||||
|
let pp f = function
|
||||||
|
| [buf] -> Fmt.pf f "@[<v>%a@]" t.pp (Cstruct.to_string buf)
|
||||||
|
| bufs -> Fmt.pf f "@[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs
|
||||||
|
in
|
||||||
|
write ~pp t bufs
|
||||||
|
|
||||||
|
let copy_rsb_iovec t bufs =
|
||||||
|
let pp f bufs = Fmt.pf f "(rsb) @[<v>%a@]" (Fmt.Dump.list (Fmt.using Cstruct.to_string t.pp)) bufs in
|
||||||
|
write ~pp t bufs
|
||||||
|
|
||||||
|
(* Test optimised copying using Read_source_buffer *)
|
||||||
let copy_rsb t rsb =
|
let copy_rsb t rsb =
|
||||||
try while true do rsb (copy_rsb_iovec t) done
|
try while true do rsb (copy_rsb_iovec t) done
|
||||||
with End_of_file -> ()
|
with End_of_file -> ()
|
||||||
@ -84,9 +95,6 @@ module Mock_flow = struct
|
|||||||
if not (List.exists try_rsb Src.read_methods) then
|
if not (List.exists try_rsb Src.read_methods) then
|
||||||
Fmt.failwith "Source does not offer Read_source_buffer optimisation"
|
Fmt.failwith "Source does not offer Read_source_buffer optimisation"
|
||||||
|
|
||||||
let write t bufs =
|
|
||||||
copy t ~src:(Eio.Flow.cstruct_source bufs)
|
|
||||||
|
|
||||||
let shutdown t cmd =
|
let shutdown t cmd =
|
||||||
traceln "%s: shutdown %s" t.label @@
|
traceln "%s: shutdown %s" t.label @@
|
||||||
match cmd with
|
match cmd with
|
||||||
|
@ -165,7 +165,7 @@ module Flow = struct
|
|||||||
|
|
||||||
let read_methods = []
|
let read_methods = []
|
||||||
|
|
||||||
let write t bufs = Low_level.writev t bufs
|
let single_write t bufs = Low_level.writev_single t bufs
|
||||||
|
|
||||||
let copy t ~src =
|
let copy t ~src =
|
||||||
match Eio_unix.Resource.fd_opt src with
|
match Eio_unix.Resource.fd_opt src with
|
||||||
|
@ -36,7 +36,13 @@ module Impl = struct
|
|||||||
}
|
}
|
||||||
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
||||||
|
|
||||||
let write t bufs =
|
let single_write t bufs =
|
||||||
|
try
|
||||||
|
Low_level.writev t (Array.of_list bufs)
|
||||||
|
with Unix.Unix_error (code, name, arg) ->
|
||||||
|
raise (Err.wrap code name arg)
|
||||||
|
|
||||||
|
let write_all t bufs =
|
||||||
try
|
try
|
||||||
let rec loop = function
|
let rec loop = function
|
||||||
| [] -> ()
|
| [] -> ()
|
||||||
@ -52,7 +58,7 @@ module Impl = struct
|
|||||||
try
|
try
|
||||||
while true do
|
while true do
|
||||||
let got = Eio.Flow.single_read src buf in
|
let got = Eio.Flow.single_read src buf in
|
||||||
write dst [Cstruct.sub buf 0 got]
|
write_all dst [Cstruct.sub buf 0 got]
|
||||||
done
|
done
|
||||||
with End_of_file -> ()
|
with End_of_file -> ()
|
||||||
|
|
||||||
|
@ -36,16 +36,21 @@ module Impl = struct
|
|||||||
}
|
}
|
||||||
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
with Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
||||||
|
|
||||||
let write t bufs =
|
let write_all t bufs =
|
||||||
try Low_level.writev t bufs
|
try Low_level.writev t bufs
|
||||||
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
|
with Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
|
||||||
|
|
||||||
|
(* todo: provide a way to do a single write *)
|
||||||
|
let single_write t bufs =
|
||||||
|
write_all t bufs;
|
||||||
|
Cstruct.lenv bufs
|
||||||
|
|
||||||
let copy dst ~src =
|
let copy dst ~src =
|
||||||
let buf = Cstruct.create 4096 in
|
let buf = Cstruct.create 4096 in
|
||||||
try
|
try
|
||||||
while true do
|
while true do
|
||||||
let got = Eio.Flow.single_read src buf in
|
let got = Eio.Flow.single_read src buf in
|
||||||
write dst [Cstruct.sub buf 0 got]
|
write_all dst [Cstruct.sub buf 0 got]
|
||||||
done
|
done
|
||||||
with End_of_file -> ()
|
with End_of_file -> ()
|
||||||
|
|
||||||
|
@ -8,9 +8,6 @@ open Eio.Std
|
|||||||
module Write = Eio.Buf_write
|
module Write = Eio.Buf_write
|
||||||
|
|
||||||
let flow = Eio_mock.Flow.make "flow"
|
let flow = Eio_mock.Flow.make "flow"
|
||||||
|
|
||||||
let flow_rsb = Eio_mock.Flow.make "flow"
|
|
||||||
let () = Eio_mock.Flow.set_copy_method flow_rsb `Read_source_buffer
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## A simple run-through
|
## A simple run-through
|
||||||
@ -44,12 +41,12 @@ If supported by the flow, we can avoid copying:
|
|||||||
|
|
||||||
```ocaml
|
```ocaml
|
||||||
# Eio_mock.Backend.run @@ fun () ->
|
# Eio_mock.Backend.run @@ fun () ->
|
||||||
Write.with_flow flow_rsb @@ fun w ->
|
Write.with_flow flow @@ fun w ->
|
||||||
Write.string w "Hello";
|
Write.string w "Hello";
|
||||||
Write.char w ' ';
|
Write.char w ' ';
|
||||||
Write.schedule_cstruct w (Cstruct.of_string "world");
|
Write.schedule_cstruct w (Cstruct.of_string "world");
|
||||||
Write.char w '!';;
|
Write.char w '!';;
|
||||||
+flow: wrote (rsb) ["Hello "; "world"; "!"]
|
+flow: wrote ["Hello "; "world"; "!"]
|
||||||
- : unit = ()
|
- : unit = ()
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -78,7 +75,7 @@ With pausing
|
|||||||
Fiber.yield ();
|
Fiber.yield ();
|
||||||
Write.unpause w;
|
Write.unpause w;
|
||||||
Write.string w "world";;
|
Write.string w "world";;
|
||||||
+flow: wrote "Hello... world"
|
+flow: wrote ["Hello... "; "world"]
|
||||||
- : unit = ()
|
- : unit = ()
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -139,13 +136,13 @@ With pausing
|
|||||||
Write.char t 'e'
|
Write.char t 'e'
|
||||||
in
|
in
|
||||||
traceln "With room:";
|
traceln "With room:";
|
||||||
Write.with_flow flow_rsb f;
|
Write.with_flow flow f;
|
||||||
traceln "Without room:";
|
traceln "Without room:";
|
||||||
Write.with_flow ~initial_size:1 flow_rsb f;;
|
Write.with_flow ~initial_size:1 flow f;;
|
||||||
+With room:
|
+With room:
|
||||||
+flow: wrote (rsb) ["testtestte"]
|
+flow: wrote "testtestte"
|
||||||
+Without room:
|
+Without room:
|
||||||
+flow: wrote (rsb) ["te"; "st"; "te"; "st"; "te"]
|
+flow: wrote ["te"; "st"; "te"; "st"; "te"]
|
||||||
- : unit = ()
|
- : unit = ()
|
||||||
```
|
```
|
||||||
|
|
||||||
@ -182,32 +179,29 @@ Eio_mock.Flow.on_copy_bytes flow [
|
|||||||
- : unit = ()
|
- : unit = ()
|
||||||
```
|
```
|
||||||
|
|
||||||
Multiple flushes.
|
Multiple flushes:
|
||||||
Note: ideally the flushes here would complete as soon as enough data has been flushed,
|
|
||||||
but currently Eio.Flow.sink doesn't allow short writes and so Buf_write has to wait for
|
|
||||||
the whole batch to be flushed.
|
|
||||||
|
|
||||||
```ocaml
|
```ocaml
|
||||||
# Eio_mock.Backend.run @@ fun () ->
|
# Eio_mock.Backend.run @@ fun () ->
|
||||||
Eio_mock.Flow.on_copy_bytes flow_rsb [
|
Eio_mock.Flow.on_copy_bytes flow [
|
||||||
`Yield_then (`Return 1);
|
`Yield_then (`Return 1);
|
||||||
`Yield_then (`Return 2);
|
`Yield_then (`Return 2);
|
||||||
`Yield_then (`Return 2);
|
`Yield_then (`Return 2);
|
||||||
`Yield_then (`Return 2);
|
`Yield_then (`Return 2);
|
||||||
];
|
];
|
||||||
Write.with_flow flow_rsb @@ fun t ->
|
Write.with_flow flow @@ fun t ->
|
||||||
Fiber.all [
|
Fiber.all [
|
||||||
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
|
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
|
||||||
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
|
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
|
||||||
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
|
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
|
||||||
];
|
];
|
||||||
traceln "Done";;
|
traceln "Done";;
|
||||||
+flow: wrote (rsb) ["a"]
|
+flow: wrote "a"
|
||||||
+flow: wrote (rsb) ["b"; "c"]
|
+flow: wrote ["b"; "c"]
|
||||||
+flow: wrote (rsb) ["d"; "e"]
|
|
||||||
+flow: wrote (rsb) ["f"]
|
|
||||||
+1st flush
|
+1st flush
|
||||||
|
+flow: wrote ["d"; "e"]
|
||||||
+2nd flush
|
+2nd flush
|
||||||
|
+flow: wrote "f"
|
||||||
+3rd flush
|
+3rd flush
|
||||||
+Done
|
+Done
|
||||||
- : unit = ()
|
- : unit = ()
|
||||||
@ -229,7 +223,9 @@ module Slow_writer = struct
|
|||||||
done
|
done
|
||||||
with End_of_file -> ()
|
with End_of_file -> ()
|
||||||
|
|
||||||
let write t bufs = copy t ~src:(Eio.Flow.cstruct_source bufs)
|
let single_write t bufs =
|
||||||
|
copy t ~src:(Eio.Flow.cstruct_source bufs);
|
||||||
|
Cstruct.lenv bufs
|
||||||
end
|
end
|
||||||
let slow_writer =
|
let slow_writer =
|
||||||
let ops = Eio.Flow.Pi.sink (module Slow_writer) in
|
let ops = Eio.Flow.Pi.sink (module Slow_writer) in
|
||||||
@ -261,8 +257,8 @@ let slow_writer =
|
|||||||
Write.schedule_cstruct t (Cstruct.of_string "end");
|
Write.schedule_cstruct t (Cstruct.of_string "end");
|
||||||
Fiber.yield ();
|
Fiber.yield ();
|
||||||
traceln "Should all be flushed by now.";;;
|
traceln "Should all be flushed by now.";;;
|
||||||
+flow: wrote "onetwo"
|
+flow: wrote ["one"; "two"]
|
||||||
+flow: wrote "onetwo"
|
+flow: wrote ["one"; "two"]
|
||||||
+flow: wrote "end"
|
+flow: wrote "end"
|
||||||
+Should all be flushed by now.
|
+Should all be flushed by now.
|
||||||
- : unit = ()
|
- : unit = ()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user