Add Buf_write.with_flow

`with_flow` wraps an Eio sink, flushing to it automatically when the writing fiber blocks.
This is intended to be then normal way of using `Buf_write` from Eio.

Some other changes to Faraday:

- I replaced `yield` with `pause` and `unpause`, as it was unclear to me what it should do.
- `flush` now waits for the flush to complete instead of taking a callback (since we now have effects).
- `operation` is now `await_batch`. It now waits until data is available to write instead of returning `Yield`.
- `shift_buffers` and `shift_flushes` are now tail-recursive.
- Fixed overflow bug in `shift_flushes`.
- Removed `write_` prefix from write functions, as it's now in the module name.
- `string` and `bytes` now check the range is valid.
- `serialize` no longer hangs if it gets receives `Closed.
This commit is contained in:
Thomas Leonard 2022-06-22 10:00:03 +01:00
parent 930325258c
commit 73ec23c3f9
9 changed files with 833 additions and 322 deletions

View File

@ -24,7 +24,8 @@ Eio replaces existing concurrency libraries such as Lwt
* [Performance](#performance)
* [Networking](#networking)
* [Design Note: Capabilities](#design-note-capabilities)
* [Buffering and Parsing](#buffering-and-parsing)
* [Buffered Reading and Parsing](#buffered-reading-and-parsing)
* [Buffered Writing](#buffered-writing)
* [Filesystem Access](#filesystem-access)
* [Time](#time)
* [Multicore Support](#multicore-support)
@ -580,7 +581,7 @@ However, it still makes non-malicious code easier to understand and test
and may allow for an extension to the language in the future.
See [Emily][] for a previous attempt at this.
## Buffering and Parsing
## Buffered Reading and Parsing
Reading from an Eio flow directly may give you more or less data than you wanted.
For example, if you want to read a line of text from a TCP stream,
@ -658,6 +659,53 @@ let message =
- : unit = ()
```
## Buffered Writing
For performance, it's often useful to batch up writes and send them all in one go.
For example, consider sending an HTTP response without buffering:
```ocaml
let send_response socket =
Eio.Flow.copy_string "200 OK\r\n" socket;
Eio.Flow.copy_string "\r\n" socket;
Fiber.yield (); (* Simulate waiting for the body *)
Eio.Flow.copy_string "Body data" socket
```
```ocaml
# Eio_main.run @@ fun _ ->
send_response (Eio_mock.Flow.make "socket");;
+socket: wrote "200 OK\r\n"
+socket: wrote "\r\n"
+socket: wrote "Body data"
- : unit = ()
```
The socket received three writes, perhaps sending three separate packets over the network.
We can wrap a flow with [Eio.Buf_write][] to avoid this:
```ocaml
module Write = Eio.Buf_write
let send_response socket =
Write.with_flow socket @@ fun w ->
Write.string w "200 OK\r\n";
Write.string w "\r\n";
Fiber.yield (); (* Simulate waiting for the body *)
Write.string w "Body data"
```
```ocaml
# Eio_main.run @@ fun _ ->
send_response (Eio_mock.Flow.make "socket");;
+socket: wrote "200 OK\r\n"
+ "\r\n"
+socket: wrote "Body data"
- : unit = ()
```
Now the first two writes were combined and sent together.
## Filesystem Access
Access to the [filesystem][Eio.Dir] is controlled by capabilities, and `env` provides two:
@ -1198,6 +1246,7 @@ Some background about the effects system can be found in:
[Eio.Switch]: https://ocaml-multicore.github.io/eio/eio/Eio/Switch/index.html
[Eio.Net]: https://ocaml-multicore.github.io/eio/eio/Eio/Net/index.html
[Eio.Buf_read]: https://ocaml-multicore.github.io/eio/eio/Eio/Buf_read/index.html
[Eio.Buf_write]: https://ocaml-multicore.github.io/eio/eio/Eio/Buf_write/index.html
[Eio.Dir]: https://ocaml-multicore.github.io/eio/eio/Eio/Dir/index.html
[Eio.Time]: https://ocaml-multicore.github.io/eio/eio/Eio/Time/index.html
[Eio.Domain_manager]: https://ocaml-multicore.github.io/eio/eio/Eio/Domain_manager/index.html

View File

@ -1,4 +1,4 @@
(test
(tests
(package eio)
(libraries cstruct crowbar fmt astring eio)
(name test))
(libraries cstruct crowbar fmt astring eio eio.mock)
(names fuzz_buf_read fuzz_buf_write))

49
fuzz/fuzz_buf_write.ml Normal file
View File

@ -0,0 +1,49 @@
(* Run a random sequence of write operations on an [Eio.Buf_write].
Check that the expected data gets written to the flow. *)
module W = Eio.Buf_write
let initial_size = 10
type op = Op : string * (W.t -> unit) -> op (* Expected string, writer *)
let cstruct =
Crowbar.(map [bytes; int; int]) (fun s off len ->
if String.length s = 0 then Cstruct.empty
else (
let off = min (abs off) (String.length s) in
let len = min (abs len) (String.length s - off) in
Cstruct.of_string s ~off ~len
)
)
let op =
let label (name, gen) = Crowbar.with_printer (fun f (Op (s, _)) -> Fmt.pf f "%s:%S" name s) gen in
Crowbar.choose @@ List.map label [
"string", Crowbar.(map [bytes]) (fun s -> Op (s, (fun t -> W.string t s)));
"cstruct", Crowbar.(map [cstruct]) (fun cs -> Op (Cstruct.to_string cs, (fun t -> W.cstruct t cs)));
"schedule_cstruct", Crowbar.(map [cstruct]) (fun cs -> Op (Cstruct.to_string cs, (fun t -> W.schedule_cstruct t cs)));
"yield", Crowbar.const @@ Op ("", (fun _ -> Eio.Fiber.yield ()));
"flush", Crowbar.const @@ Op ("", W.flush);
"pause", Crowbar.const @@ Op ("", W.pause);
"unpause", Crowbar.const @@ Op ("", W.unpause);
]
let random ops close =
Eio_mock.Backend.run @@ fun _ ->
let b = Buffer.create 100 in
let flow = Eio.Flow.buffer_sink b in
let expected = ref [] in
W.with_flow flow ~initial_size (fun t ->
let perform (Op (s, write)) =
expected := s :: !expected;
write t
in
List.iter perform ops;
if close then W.close t
);
let expected = String.concat "" (List.rev !expected) in
Crowbar.check_eq ~pp:Fmt.Dump.string (Buffer.contents b) expected
let () =
Crowbar.(add_test ~name:"random ops" [list op; bool] random)

0
fuzz/fuzz_buf_write.mli Normal file
View File

View File

@ -1,4 +1,7 @@
(*----------------------------------------------------------------------------
(* This module is based on code from Faraday (0.7.2), which had the following
license:
----------------------------------------------------------------------------
Copyright (c) 2016 Inhabited Type LLC.
All rights reserved.
@ -42,14 +45,29 @@ module Deque(T:sig type t val sentinel : t end) : sig
type t
val create : int -> t
(* [t = create n] creates a new deque with initial capacity [n].
[to_list t = []] *)
val is_empty : t -> bool
(* [is_empty t = (to_list t = []) *)
val enqueue : elem -> t -> unit
val dequeue_exn : t -> elem
val enqueue_front : elem -> t -> unit
(* [enqueue elem t]
val map_to_list : t -> f:(elem -> 'b) -> 'b list
[to_list t'] = to_list t @ [elem] *)
val dequeue_exn : t -> elem
(* [dequeue_exn t = List.hd (to_list t)]
[to_list t' = List.tl (to_list t)] *)
val enqueue_front : elem -> t -> unit
(* [enqueue_front elem t]
to_list t' = elem :: to_list t *)
val to_list : t -> elem list
end = struct
type elem = T.t
@ -107,10 +125,10 @@ end = struct
t.front <- t.front - 1;
t.elements.(t.front) <- e
let map_to_list t ~f =
let to_list t =
let result = ref [] in
for i = t.back - 1 downto t.front do
result := f t.elements.(i) :: !result
result := t.elements.(i) :: !result
done;
!result
end
@ -119,35 +137,34 @@ module Buffers = Deque(struct
type t = Cstruct.t
let sentinel =
let deadbeef = "\222\173\190\239" in
let len = String.length deadbeef in
let buffer = Bigstringaf.create len in
String.iteri (Bigstringaf.unsafe_set buffer) deadbeef;
Cstruct.of_bigarray buffer ~len
Cstruct.of_string deadbeef
end)
module Flushes = Deque(struct
type t = int * (unit -> unit)
let sentinel = 0, fun () -> ()
end)
type state =
| Active
| Paused
| Closed
type t =
{ mutable buffer : bigstring
; mutable scheduled_pos : int
; mutable write_pos : int
; mutable scheduled_pos : int (* How much of [buffer] is in [scheduled] *)
; mutable write_pos : int (* How much of [buffer] has been written to *)
; scheduled : Buffers.t
; flushed : Flushes.t
; mutable bytes_received : int
; mutable bytes_written : int
; mutable closed : bool
; mutable yield : bool
; mutable bytes_received : int (* Total scheduled bytes. Wraps. *)
; mutable bytes_written : int (* Total written bytes. Wraps. *)
; mutable state : state
; mutable wake_writer : unit -> unit
; id : Ctf.id
}
(* Invariant: [write_pos >= scheduled_pos] *)
type operation = [
| `Writev of Cstruct.t list
| `Yield
| `Close
]
let of_bigstring buffer =
let of_buffer buffer =
{ buffer
; write_pos = 0
; scheduled_pos = 0
@ -155,47 +172,55 @@ let of_bigstring buffer =
; flushed = Flushes.create 1
; bytes_received = 0
; bytes_written = 0
; closed = false
; yield = false }
; state = Active
; wake_writer = ignore
; id = Ctf.mint_id ()
}
let create size =
of_bigstring (Bigstringaf.create size)
of_buffer (Bigstringaf.create size)
let writable_exn t =
if t.closed then
match t.state with
| Active | Paused -> ()
| Closed ->
failwith "cannot write to closed writer"
let schedule_iovec t ?(off=0) ~len buffer =
t.bytes_received <- t.bytes_received + len;
Buffers.enqueue (Cstruct.of_bigarray buffer ~off ~len) t.scheduled
let wake_writer t =
match t.state with
| Paused -> ()
| Active | Closed ->
let wake = t.wake_writer in
if wake != ignore then (
t.wake_writer <- ignore;
wake ()
)
(* Schedule [cs] now, without any checks. Users use {!schedule_cstruct} instead. *)
let schedule_iovec t cs =
t.bytes_received <- t.bytes_received + Cstruct.length cs;
Buffers.enqueue cs t.scheduled
(* Schedule all pending data in [buffer]. *)
let flush_buffer t =
let len = t.write_pos - t.scheduled_pos in
if len > 0 then begin
let off = t.scheduled_pos in
schedule_iovec t ~off ~len t.buffer;
schedule_iovec t (Cstruct.of_bigarray ~off ~len t.buffer);
t.scheduled_pos <- t.write_pos
end
let flush t f =
t.yield <- false;
flush_buffer t;
if Buffers.is_empty t.scheduled then f ()
else Flushes.enqueue (t.bytes_received, f) t.flushed
let free_bytes_in_buffer t =
let buf_len = Bigstringaf.length t.buffer in
buf_len - t.write_pos
let schedule_bigstring t ?(off=0) ?len a =
let schedule_cstruct t cs =
writable_exn t;
flush_buffer t;
let len =
match len with
| None -> Bigstringaf.length a - off
| Some len -> len
in
if len > 0 then schedule_iovec t ~off ~len a
if Cstruct.length cs > 0 then (
schedule_iovec t cs;
wake_writer t;
)
let ensure_space t len =
if free_bytes_in_buffer t < len then begin
@ -205,132 +230,143 @@ let ensure_space t len =
t.scheduled_pos <- 0
end
let write_gen t ~length ~blit ?(off=0) ?len a =
let advance_pos t n =
t.write_pos <- t.write_pos + n;
wake_writer t
let write_gen t ~blit ~off ~len a =
writable_exn t;
let len =
match len with
| None -> length a - off
| Some len -> len
in
ensure_space t len;
blit a ~src_off:off t.buffer ~dst_off:t.write_pos ~len;
t.write_pos <- t.write_pos + len
advance_pos t len
let write_string =
let length = String.length in
let blit = Bigstringaf.unsafe_blit_from_string in
fun t ?off ?len a -> write_gen t ~length ~blit ?off ?len a
let string =
let blit = Bigstringaf.blit_from_string in
fun t ?(off=0) ?len a ->
let len =
match len with
| None -> String.length a - off
| Some len -> len
in
write_gen t ~blit ~off ~len a
let write_bytes =
let length = Bytes.length in
let blit = Bigstringaf.unsafe_blit_from_bytes in
fun t ?off ?len a -> write_gen t ~length ~blit ?off ?len a
let bytes =
let blit = Bigstringaf.blit_from_bytes in
fun t ?(off=0) ?len a ->
let len =
match len with
| None -> Bytes.length a - off
| Some len -> len
in
write_gen t ~blit ~off ~len a
let write_bigstring =
let length = Bigstringaf.length in
let blit = Bigstringaf.unsafe_blit in
fun t ?off ?len a -> write_gen t ~length ~blit ?off ?len a
let cstruct t { Cstruct.buffer; off; len } =
write_gen t ~off ~len buffer
~blit:Bigstringaf.unsafe_blit
let write_char t c =
let char t c =
writable_exn t;
ensure_space t 1;
Bigstringaf.unsafe_set t.buffer t.write_pos c;
t.write_pos <- t.write_pos + 1
advance_pos t 1
let write_uint8 t b =
let uint8 t b =
writable_exn t;
ensure_space t 1;
Bigstringaf.unsafe_set t.buffer t.write_pos (Char.unsafe_chr b);
t.write_pos <- t.write_pos + 1
advance_pos t 1
module BE = struct
let write_uint16 t i =
let uint16 t i =
writable_exn t;
ensure_space t 2;
Bigstringaf.unsafe_set_int16_be t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 2
advance_pos t 2
let write_uint32 t i =
let uint32 t i =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_be t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 4
advance_pos t 4
let write_uint48 t i =
let uint48 t i =
writable_exn t;
ensure_space t 6;
Bigstringaf.unsafe_set_int32_be t.buffer t.write_pos
Int64.(to_int32 (shift_right_logical i 4));
Bigstringaf.unsafe_set_int16_be t.buffer (t.write_pos + 2)
Int64.(to_int i);
t.write_pos <- t.write_pos + 6
advance_pos t 6
let write_uint64 t i =
let uint64 t i =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_be t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 8
advance_pos t 8
let write_float t f =
let float t f =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_be t.buffer t.write_pos (Int32.bits_of_float f);
t.write_pos <- t.write_pos + 4
advance_pos t 4
let write_double t d =
let double t d =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_be t.buffer t.write_pos (Int64.bits_of_float d);
t.write_pos <- t.write_pos + 8
advance_pos t 8
end
module LE = struct
let write_uint16 t i =
let uint16 t i =
writable_exn t;
ensure_space t 2;
Bigstringaf.unsafe_set_int16_le t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 2
advance_pos t 2
let write_uint32 t i =
let uint32 t i =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_le t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 4
advance_pos t 4
let write_uint48 t i =
let uint48 t i =
writable_exn t;
ensure_space t 6;
Bigstringaf.unsafe_set_int16_le t.buffer t.write_pos
Int64.(to_int i);
Bigstringaf.unsafe_set_int32_le t.buffer (t.write_pos + 2)
Int64.(to_int32 (shift_right_logical i 2));
t.write_pos <- t.write_pos + 6
advance_pos t 6
let write_uint64 t i =
let uint64 t i =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_le t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 8
advance_pos t 8
let write_float t f =
let float t f =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_le t.buffer t.write_pos (Int32.bits_of_float f);
t.write_pos <- t.write_pos + 4
advance_pos t 4
let write_double t d =
let double t d =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_le t.buffer t.write_pos (Int64.bits_of_float d);
t.write_pos <- t.write_pos + 8
advance_pos t 8
end
let close t =
t.closed <- true;
flush_buffer t
t.state <- Closed;
flush_buffer t;
wake_writer t
let is_closed t =
t.closed
match t.state with
| Closed -> true
| Active | Paused -> false
let pending_bytes t =
(t.write_pos - t.scheduled_pos) + (t.bytes_received - t.bytes_written)
@ -338,82 +374,120 @@ let pending_bytes t =
let has_pending_output t =
pending_bytes t <> 0
let yield t =
t.yield <- true
let pause t =
match t.state with
| Active -> t.state <- Paused
| Paused | Closed -> ()
let unpause t =
match t.state with
| Active | Closed -> ()
| Paused ->
t.state <- Active;
if has_pending_output t then
wake_writer t
let flush t =
flush_buffer t;
unpause t;
if not (Buffers.is_empty t.scheduled) then (
let p, r = Promise.create () in
Flushes.enqueue (t.bytes_received, Promise.resolve r) t.flushed;
Promise.await p
)
let rec shift_buffers t written =
try
let { Cstruct.len; _ } as iovec = Buffers.dequeue_exn t.scheduled in
if len <= written then begin
match Buffers.dequeue_exn t.scheduled with
| { Cstruct.len; _ } as iovec ->
if len <= written then
shift_buffers t (written - len)
end else
else
Buffers.enqueue_front (Cstruct.shift iovec written) t.scheduled
with Dequeue_empty ->
| exception Dequeue_empty ->
assert (written = 0);
if t.scheduled_pos = t.write_pos then begin
t.scheduled_pos <- 0;
t.write_pos <- 0
end
(* Resolve any flushes that are now due. *)
let rec shift_flushes t =
try
let (threshold, f) as flush = Flushes.dequeue_exn t.flushed in
(* Edited notes from @dinosaure:
*
* The quantities [t.bytes_written] and [threshold] are always going to be
* positive integers. Therefore, we can treat them as unsinged integers for
* the purposes of comparision. Doing so allows us to handle overflows in
* either quantity as long as they're both within one overflow of each other.
* We can accomplish this by subracting [min_int] from both quantities before
* comparision. This shift a quantity that has not overflowed into the
* negative integer range while shifting a quantity that has overflow into
* the positive integer range.
*
* This effectively restablishes the relative difference when an overflow
* has occurred, and otherwise just compares numbers that haven't
* overflowed as similarly, just shifted down a bit.
*)
if t.bytes_written - min_int >= threshold - min_int
then begin f (); shift_flushes t end
else Flushes.enqueue_front flush t.flushed
with Dequeue_empty ->
()
match Flushes.dequeue_exn t.flushed with
| exception Dequeue_empty -> ()
| (threshold, f) as flush ->
(* Be careful: [bytes_written] and [threshold] both wrap, so subtract first. *)
if t.bytes_written - threshold >= 0 then (
(* We have written at least up to [threshold]
(or we're more than [max_int] behind, which we assume won't happen). *)
f ();
shift_flushes t
) else (
Flushes.enqueue_front flush t.flushed
)
let shift t written =
shift_buffers t written;
t.bytes_written <- t.bytes_written + written;
shift_flushes t
let operation t =
if t.closed then begin
t.yield <- false
end;
let rec await_batch t =
flush_buffer t;
let nothing_to_do = not (has_pending_output t) in
if t.closed && nothing_to_do then
`Close
else if t.yield || nothing_to_do then begin
t.yield <- false;
`Yield
end else begin
let iovecs = Buffers.map_to_list t.scheduled ~f:(fun x -> x) in
`Writev iovecs
match t.state, has_pending_output t with
| Closed, false -> raise End_of_file
| (Active | Closed), true -> Buffers.to_list t.scheduled
| Paused, _ | Active, false ->
Suspend.enter (fun ctx enqueue ->
Fiber_context.set_cancel_fn ctx (fun ex ->
t.wake_writer <- ignore;
enqueue (Error ex)
);
t.wake_writer <- (fun () ->
(* Our caller has already set [wake_writer <- ignore]. *)
ignore (Fiber_context.clear_cancel_fn ctx : bool);
enqueue (Ok ())
);
);
await_batch t
let read_into t buf =
let iovecs = await_batch t in
let n, _iovecs = Cstruct.fillv ~src:iovecs ~dst:buf in
shift t n;
n
let read_source_buffer t fn =
let iovecs = await_batch t in
shift t (fn iovecs)
let as_flow t =
object
inherit Flow.source
method! read_methods = [Flow.Read_source_buffer (read_source_buffer t)]
method read_into = read_into t
end
let with_flow ?(initial_size=0x1000) flow fn =
let t = create initial_size in
Switch.run @@ fun sw ->
Fiber.fork ~sw (fun () -> Flow.copy (as_flow t) flow);
Fun.protect ~finally:(fun () -> close t) (fun () -> fn t)
let rec serialize t writev =
match operation t with
| `Writev iovecs ->
begin match writev iovecs with
| `Ok n -> shift t n; if not (Buffers.is_empty t.scheduled) then yield t
| `Closed -> close t
end;
match await_batch t with
| exception End_of_file -> Ok ()
| iovecs ->
match writev iovecs with
| Error `Closed as e -> close t; e
| Ok n ->
shift t n;
if not (Buffers.is_empty t.scheduled) then Fiber.yield ();
serialize t writev
| (`Close|`Yield) as next -> next
let serialize_to_string t =
close t;
match operation t with
| `Writev iovecs ->
match await_batch t with
| exception End_of_file -> ""
| iovecs ->
let len = Cstruct.lenv iovecs in
let bytes = Bytes.create len in
let pos = ref 0 in
@ -423,37 +497,26 @@ let serialize_to_string t =
pos := !pos + len)
iovecs;
shift t len;
assert (operation t = `Close);
assert (not (has_pending_output t));
Bytes.unsafe_to_string bytes
| `Close -> ""
| `Yield -> assert false
let serialize_to_bigstring t =
let serialize_to_cstruct t =
close t;
match operation t with
| `Writev iovecs ->
let len = Cstruct.lenv iovecs in
let bs = Bigstringaf.create len in
let pos = ref 0 in
List.iter (function
| { Cstruct.buffer; off; len } ->
Bigstringaf.unsafe_blit buffer ~src_off:off bs ~dst_off:!pos ~len;
pos := !pos + len)
iovecs;
shift t len;
assert (operation t = `Close);
bs
| `Close -> Bigstringaf.create 0
| `Yield -> assert false
match await_batch t with
| exception End_of_file -> Cstruct.empty
| iovecs ->
let data = Cstruct.concat iovecs in
shift t (Cstruct.length data);
assert (not (has_pending_output t));
data
let drain =
let rec loop t acc =
match operation t with
| `Writev iovecs ->
match await_batch t with
| exception End_of_file -> acc
| iovecs ->
let len = Cstruct.lenv iovecs in
shift t len;
loop t (len + acc)
| `Close -> acc
| `Yield -> loop t acc
in
fun t -> loop t 0

View File

@ -1,4 +1,7 @@
(*----------------------------------------------------------------------------
(* This module is based on code from Faraday (0.7.2), which had the following
license:
----------------------------------------------------------------------------
Copyright (c) 2016 Inhabited Type LLC.
All rights reserved.
@ -31,44 +34,68 @@
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)
(** Serialization primitives built for speed an memory-efficiency.
(** Serialization primitives built for speed and memory-efficiency.
Faraday is a library for writing fast and memory-efficient serializers. Its
core type and related operation gives the user fine-grained control over
copying and allocation behavior while serializing user-defined types, and
presents the output in a form that makes it possible to use vectorized
Buf_write is designed for writing fast and memory-efficient serializers.
It is based on the Faraday library, but adapted for Eio.
Its core type and related operation gives the user fine-grained control
over copying and allocation behavior while serializing user-defined types,
and presents the output in a form that makes it possible to use vectorized
write operations, such as the [writev][] system call, or any other platform
or application-specific output APIs.
A Faraday serializer manages an internal buffer and a queue of output
A Buf_write serializer manages an internal buffer and a queue of output
buffers. The output bufferes may be a sub range of the serializer's
internal buffer or one that is user-provided. Buffered writes such as
{!write_string}, {!write_char}, {!write_bigstring}, etc., copy the source
bytes into the serializer's internal buffer. Unbuffered writes such as
{!schedule_string}, {!schedule_bigstring}, etc., on the other hand perform
no copying. Instead, they enqueue the source bytes into the serializer's
write queue directly. *)
{!string}, {!char}, {!cstruct}, etc., copy the source bytes into the
serializer's internal buffer. Unbuffered writes are done with
{!schedule_cstruct}, which performs no copying. Instead, it enqueues the
source bytes into the serializer's write queue directly.
Example:
type bigstring =
(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
{[
module Write = Eio.Buf_write
let () =
Eio_mock.Backend.run @@ fun () ->
let stdout = Eio_mock.Flow.make "stdout" in
Write.with_flow stdout (fun w ->
Write.string w "foo";
Write.string w "bar";
Eio.Fiber.yield ();
Write.string w "baz";
)
]}
This combines the first two writes, giving:
{[
+stdout: wrote "foobar"
+stdout: wrote "baz"
]}
*)
type t
(** The type of a serializer. *)
(** {2 Running} *)
(** {2 Constructors} *)
val with_flow : ?initial_size:int -> #Flow.sink -> (t -> 'a) -> 'a
(** [with_flow flow fn] runs [fn writer], where [writer] is a buffer that flushes to [flow].
val create : int -> t
(** [create len] creates a serializer with a fixed-length internal buffer of
length [len]. See the Buffered writes section for details about what happens
when [len] is not large enough to support a write. *)
Concurrently with [fn], it also runs a fiber that copies from [writer] to [flow].
If this fiber runs out of data to copy then it will suspend itself.
Writing to [writer] will automatically schedule it to be resumed.
This means that pending data is flushed automatically before the process sleeps.
val of_bigstring : bigstring -> t
(** [of_bigstring buf] creates a serializer, using [buf] as its internal
buffer. The serializer takes ownership of [buf] until the serializer has
been closed and flushed of all output. *)
When [fn] returns, [writer] is automatically closed and any remaining data is flushed
before [with_flow] itself returns.
@param initial_size The initial size of the buffer used to collect writes.
New buffers will be allocated as needed, with the same size.
If the buffer is too small to contain a write, the size is increased. *)
(** {2 Buffered Writes}
@ -80,111 +107,109 @@ val of_bigstring : bigstring -> t
use it for the current and subsequent writes. The old buffer will be
garbage collected once all of its contents have been {!flush}ed. *)
val write_string : t -> ?off:int -> ?len:int -> string -> unit
(** [write_string t ?off ?len str] copies [str] into the serializer's
val string : t -> ?off:int -> ?len:int -> string -> unit
(** [string t ?off ?len str] copies [str] into the serializer's
internal buffer. *)
val write_bytes : t -> ?off:int -> ?len:int -> Bytes.t -> unit
(** [write_bytes t ?off ?len bytes] copies [bytes] into the serializer's
val bytes : t -> ?off:int -> ?len:int -> Bytes.t -> unit
(** [bytes t ?off ?len bytes] copies [bytes] into the serializer's
internal buffer. It is safe to modify [bytes] after this call returns. *)
val write_bigstring : t -> ?off:int -> ?len:int -> bigstring -> unit
(** [write_bigstring t ?off ?len bigstring] copies [bigstring] into the
serializer's internal buffer. It is safe to modify [bigstring] after this
call returns. *)
val cstruct : t -> Cstruct.t -> unit
(** [cstruct t cs] copies [cs] into the serializer's internal buffer.
It is safe to modify [cs] after this call returns.
For large cstructs, it may be more efficient to use {!schedule_cstruct}. *)
val write_gen
: t
-> length:('a -> int)
-> blit:('a -> src_off:int -> bigstring -> dst_off:int -> len:int -> unit)
-> ?off:int
-> ?len:int
-> blit:('a -> src_off:int -> Cstruct.buffer -> dst_off:int -> len:int -> unit)
-> off:int
-> len:int
-> 'a -> unit
(** [write_gen t ~length ~blit ?off ?len x] copies [x] into the serializer's
internal buffer using the provided [length] and [blit] operations.
(** [write_gen t ~blit ~off ~len x] copies [x] into the serializer's
internal buffer using the provided [blit] operation.
See {!Bigstring.blit} for documentation of the arguments. *)
val write_char : t -> char -> unit
(** [write_char t char] copies [char] into the serializer's internal buffer. *)
val char : t -> char -> unit
(** [char t c] copies [c] into the serializer's internal buffer. *)
val write_uint8 : t -> int -> unit
(** [write_uint8 t n] copies the lower 8 bits of [n] into the serializer's
val uint8 : t -> int -> unit
(** [uint8 t n] copies the lower 8 bits of [n] into the serializer's
internal buffer. *)
(** Big endian serializers *)
module BE : sig
val write_uint16 : t -> int -> unit
(** [write_uint16 t n] copies the lower 16 bits of [n] into the serializer's
val uint16 : t -> int -> unit
(** [uint16 t n] copies the lower 16 bits of [n] into the serializer's
internal buffer in big-endian byte order. *)
val write_uint32 : t -> int32 -> unit
(** [write_uint32 t n] copies [n] into the serializer's internal buffer in
val uint32 : t -> int32 -> unit
(** [uint32 t n] copies [n] into the serializer's internal buffer in
big-endian byte order. *)
val write_uint48 : t -> int64 -> unit
(** [write_uint48 t n] copies the lower 48 bits of [n] into the serializer's
val uint48 : t -> int64 -> unit
(** [uint48 t n] copies the lower 48 bits of [n] into the serializer's
internal buffer in big-endian byte order. *)
val write_uint64 : t -> int64 -> unit
(** [write_uint64 t n] copies [n] into the serializer's internal buffer in
val uint64 : t -> int64 -> unit
(** [uint64 t n] copies [n] into the serializer's internal buffer in
big-endian byte order. *)
val write_float : t -> float -> unit
(** [write_float t n] copies the lower 32 bits of [n] into the serializer's
val float : t -> float -> unit
(** [float t n] copies the lower 32 bits of [n] into the serializer's
internal buffer in big-endian byte order. *)
val write_double : t -> float -> unit
(** [write_double t n] copies [n] into the serializer's internal buffer in
val double : t -> float -> unit
(** [double t n] copies [n] into the serializer's internal buffer in
big-endian byte order. *)
end
(** Little endian serializers *)
module LE : sig
val write_uint16 : t -> int -> unit
(** [write_uint16 t n] copies the lower 16 bits of [n] into the
val uint16 : t -> int -> unit
(** [uint16 t n] copies the lower 16 bits of [n] into the
serializer's internal buffer in little-endian byte order. *)
val write_uint32 : t -> int32 -> unit
(** [write_uint32 t n] copies [n] into the serializer's internal buffer in
val uint32 : t -> int32 -> unit
(** [uint32 t n] copies [n] into the serializer's internal buffer in
little-endian byte order. *)
val write_uint48 : t -> int64 -> unit
(** [write_uint48 t n] copies the lower 48 bits of [n] into the serializer's
val uint48 : t -> int64 -> unit
(** [uint48 t n] copies the lower 48 bits of [n] into the serializer's
internal buffer in little-endian byte order. *)
val write_uint64 : t -> int64 -> unit
(** [write_uint64 t n] copies [n] into the serializer's internal buffer in
val uint64 : t -> int64 -> unit
(** [uint64 t n] copies [n] into the serializer's internal buffer in
little-endian byte order. *)
val write_float : t -> float -> unit
(** [write_float t n] copies the lower 32 bits of [n] into the serializer's
val float : t -> float -> unit
(** [float t n] copies the lower 32 bits of [n] into the serializer's
internal buffer in little-endian byte order. *)
val write_double : t -> float -> unit
(** [write_double t n] copies [n] into the serializer's internal buffer in
val double : t -> float -> unit
(** [double t n] copies [n] into the serializer's internal buffer in
little-endian byte order. *)
end
(** {2 Unbuffered Writes}
Unbuffered writes do not involve copying bytes to the serializers internal
Unbuffered writes do not involve copying bytes to the serializer's internal
buffer. *)
val schedule_bigstring : t -> ?off:int -> ?len:int -> bigstring -> unit
(** [schedule_bigstring t ?off ?len bigstring] schedules [bigstring] to
be written the next time the serializer surfaces writes to the user.
[bigstring] is not copied in this process, so [bigstring] should only be
modified after [t] has been {!flush}ed. *)
val schedule_cstruct : t -> Cstruct.t -> unit
(** [schedule_cstruct t cs] schedules [cs] to be written.
[cs] is not copied in this process,
so [cs] should only be modified after [t] has been {!flush}ed. *)
(** {2 Querying A Serializer's State} *)
val free_bytes_in_buffer : t -> int
(** [free_bytes_in_buffer t] returns the free space, in bytes, of the
serializer's write buffer. If a {write_*} call has a length that exceeds
serializer's write buffer. If a write call has a length that exceeds
this value, the serializer will allocate a new buffer that will replace the
serializer's internal buffer for that and subsequent calls. *)
@ -195,92 +220,70 @@ val has_pending_output : t -> bool
val pending_bytes : t -> int
(** [pending_bytes t] is the size of the next write, in bytes, that [t] will
surface to the caller as a [`Writev]. *)
surface to the caller via {!await_batch}. *)
(** {2 Control Operations} *)
val yield : t -> unit
(** [yield t] causes [t] to delay surfacing writes to the user, instead
returning a [`Yield]. This gives the serializer an opportunity to collect
additional writes before sending them to the underlying device, which will
increase the write batch size.
val pause : t -> unit
(** [pause t] causes [t] to stop surfacing writes to the user.
This gives the serializer an opportunity to collect additional writes
before sending them to the underlying device, which will increase the write
batch size.
As one example, code may want to call this function if it's about to
release the OCaml lock and perform a blocking system call, but would like
to batch output across that system call. To hint to the thread of control
that is performing the writes on behalf of the serializer, the code might
call [yield t] before releasing the lock. *)
to batch output across that system call.
val flush : t -> (unit -> unit) -> unit
(** [flush t f] registers [f] to be called when all prior writes have been
successfully completed. If [t] has no pending writes, then [f] will be
called immediately. If {!yield} was recently called on [t], then the effect
of the [yield] will be ignored so that client code has an opportunity to
write pending output, regardless of how it handles [`Yield] operations. *)
Call {!unpause} to resume writing later.
Note that calling {!flush} or {!close} will automatically call {!unpause} too. *)
val unpause : t -> unit
(** [unpause t] resumes writing data after a previous call to {!pause}. *)
val flush : t -> unit
(** [flush t] waits until all prior writes have been successfully completed.
If [t] has no pending writes, [flush] returns immediately.
If [t] is paused then it is unpaused first. *)
val close : t -> unit
(** [close t] closes [t]. All subsequent write calls will raise, and any
pending or subsequent {!yield} calls will be ignored. If the serializer has
subsequent {!pause} calls will be ignored. If the serializer has
any pending writes, user code will have an opportunity to service them
before it receives the [Close] operation. Flush callbacks will continue to
before receiving [End_of_file]. Flush callbacks will continue to
be invoked while output is {!shift}ed out of [t] as needed. *)
val is_closed : t -> bool
(** [is_closed t] is [true] if [close] has been called on [t] and [false]
otherwise. A closed [t] may still have pending output. *)
(** {2 Low-level API}
Low-level operations for running a serializer. *)
val create : int -> t
(** [create len] creates a serializer with a fixed-length internal buffer of
length [len]. See the Buffered writes section for details about what happens
when [len] is not large enough to support a write. *)
val of_buffer : Cstruct.buffer -> t
(** [of_buffer buf] creates a serializer, using [buf] as its internal
buffer. The serializer takes ownership of [buf] until the serializer has
been closed and flushed of all output. *)
val await_batch : t -> Cstruct.t list
(** [await_batch t] returns a list of buffers that should be written.
If no data is currently available, it waits until some is.
After performing a write, call {!shift} with the number of bytes written.
You must accurately report the number of bytes written. Failure to do so
will result in the same bytes being surfaced multiple times.
@raises End_of_file [t] is closed and there is nothing left to write. *)
val shift : t -> int -> unit
(** [shift t n] removes the first [n] bytes in [t]'s write queue. Any flush
callbacks registered with [t] within this span of the write queue will be
called. *)
val drain : t -> int
(** [drain t] removes all pending writes from [t], returning the number of
bytes that were enqueued to be written and freeing any scheduled
buffers in the process. *)
(** {2 Running}
Low-level operations for runing a serializer. For production use-cases,
consider the Async and Lwt support that this library includes before
attempting to use this these operations directly. *)
type operation = [
| `Writev of Cstruct.t list
| `Yield
| `Close ]
(** The type of operations that the serialier may wish to perform.
{ul
{li [`Writev iovecs]: Write the bytes in {!iovecs}s reporting the actual
number of bytes written by calling {!shift}. You must accurately report the
number of bytes written. Failure to do so will result in the same bytes being
surfaced in a [`Writev] operation multiple times.}
{li [`Yield]: Yield to other threads of control, waiting for additional
output before procedding. The method for achieving this is
application-specific, but once complete, the caller can proceed with
serialization by simply making another call to {!val:operation} or
{!serialize}.}
{li [`Close]: Serialization is complete. No further output will generated.
The action to take as a result, if any, is application-specific.}} *)
val operation : t -> operation
(** [operation t] is the next operation that the caller must perform on behalf
of the serializer [t]. Users should consider using {!serialize} before this
function. See the documentation for the {!type:operation} type for details
on how callers should handle these operations. *)
val serialize : t -> (Cstruct.t list -> [`Ok of int | `Closed]) -> [`Yield | `Close]
(** [serialize t writev] sufaces the next operation of [t] to the caller,
handling a [`Writev] operation with [writev] function and performing an
additional bookkeeping on the caller's behalf. In the event that [writev]
indicates a partial write, {!serialize} will call {!yield} on the
serializer rather than attempting successive [writev] calls. *)
operations called within this span of the write queue will be scheduled
to resume. *)
(** {2 Convenience Functions}
@ -289,12 +292,21 @@ val serialize : t -> (Cstruct.t list -> [`Ok of int | `Closed]) -> [`Yield | `Cl
development. They are not the suggested way of driving a serializer in a
production setting. *)
val serialize : t -> (Cstruct.t list -> (int, [`Closed]) result) -> (unit, [> `Closed]) result
(** [serialize t writev] calls [writev bufs] each time [t] is ready to write.
In the event that [writev] indicates a partial write, {!serialize} will
call {!Fiber.yield} before continuing. *)
val serialize_to_string : t -> string
(** [serialize_to_string t] runs [t], collecting the output into a string and
returning it. [serialzie_to_string t] immediately closes [t] and ignores
any calls to {!yield} on [t]. *)
returning it. [serializie_to_string t] immediately closes [t]. *)
val serialize_to_bigstring : t -> bigstring
(** [serialize_to_string t] runs [t], collecting the output into a bigstring
and returning it. [serialzie_to_bigstring t] immediately closes [t] and
ignores any calls to {!yield} on [t]. *)
val serialize_to_cstruct : t -> Cstruct.t
(** [serialize_to_cstruct t] runs [t], collecting the output into a cstruct
and returning it. [serialize_to_cstruct t] immediately closes [t]. *)
val drain : t -> int
(** [drain t] removes all pending writes from [t], returning the number of
bytes that were enqueued to be written and freeing any scheduled
buffers in the process. Note that this does not close [t] itself,
and does not return until [t] has been closed. *)

338
tests/buf_write.md Normal file
View File

@ -0,0 +1,338 @@
```ocaml
# #require "eio";;
# #require "eio.mock";;
```
```ocaml
open Eio.Std
module Write = Eio.Buf_write
let flow = Eio_mock.Flow.make "flow"
let flow_rsb = Eio_mock.Flow.make "flow"
let () = Eio_mock.Flow.set_copy_method flow_rsb `Read_source_buffer
```
## A simple run-through
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun w ->
Write.string w "Hello"; Write.char w ' '; Write.string w "world";;
+flow: wrote "Hello world"
- : unit = ()
```
## Auto-commit
If we yield then we flush the data so far:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun w ->
Write.string w "Hello"; Write.char w ' ';
Fiber.yield ();
Write.string w "world";;
+flow: wrote "Hello "
+flow: wrote "world"
- : unit = ()
```
## Read source buffer
If supported by the flow, we can avoid copying:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow_rsb @@ fun w ->
Write.string w "Hello";
Write.char w ' ';
Write.schedule_cstruct w (Cstruct.of_string "world");
Write.char w '!';;
+flow: wrote (rsb) ["Hello "; "world"; "!"]
- : unit = ()
```
## Pausing
Without pausing:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun w ->
Write.string w "Hello... ";
Fiber.yield ();
Write.string w "world";;
+flow: wrote "Hello... "
+flow: wrote "world"
- : unit = ()
```
With pausing
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun w ->
Write.string w "Hello... ";
Write.pause w;
Fiber.yield ();
Write.unpause w;
Write.string w "world";;
+flow: wrote "Hello... world"
- : unit = ()
```
## Empty writes
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun t ->
Write.string t "";
Write.bytes t (Bytes.make 0 '\000');
Write.cstruct t Cstruct.empty;
Write.schedule_cstruct t Cstruct.empty;;
- : unit = ()
```
## Endianness
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun t ->
Write.LE.uint16 t 5;
Fiber.yield ();
Write.BE.uint16 t 5;;
+flow: wrote "\005\000"
+flow: wrote "\000\005"
- : unit = ()
```
## Writes
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun t ->
Write.string t "test";
Fiber.yield ();
Write.bytes t (Bytes.of_string "test");
Fiber.yield ();
Write.cstruct t (Cstruct.of_string ~off:1 ~len:4 "!test!");
Fiber.yield ();
Write.char t 'A';;;
+flow: wrote "test"
+flow: wrote "test"
+flow: wrote "test"
+flow: wrote "A"
- : unit = ()
```
## Multiple writes
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let f t =
Write.string t "te";
Write.string t "st";
Write.string t "te";
Write.string t "st";
Write.char t 't';
Write.char t 'e'
in
traceln "With room:";
Write.with_flow flow_rsb f;
traceln "Without room:";
Write.with_flow ~initial_size:1 flow_rsb f;;
+With room:
+flow: wrote (rsb) ["testtestte"]
+Without room:
+flow: wrote (rsb) ["te"; "st"; "te"; "st"; "te"]
- : unit = ()
```
## Flushing
```ocaml
let p1, r2 = Promise.create ();;
Eio_mock.Flow.on_copy_bytes flow [
`Await p1;
]
```
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun t ->
Fiber.both
(fun () ->
Write.flush t;
Write.string t "Hello";
traceln "Flushing...";
Write.flush t;
traceln "Flushed"
)
(fun () ->
traceln "Write now completes...";
Promise.resolve_ok r2 3
);;
+Flushing...
+Write now completes...
+flow: wrote "Hel"
+flow: wrote "lo"
+Flushed
- : unit = ()
```
Multiple flushes:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Flow.on_copy_bytes flow_rsb [
`Yield_then (`Return 1);
`Yield_then (`Return 2);
`Yield_then (`Return 2);
`Yield_then (`Return 2);
];
Write.with_flow flow_rsb @@ fun t ->
Fiber.all [
(fun () -> Write.string t "ab"; Write.flush t; traceln "1st flush");
(fun () -> Write.string t "cd"; Write.flush t; traceln "2nd flush");
(fun () -> Write.string t "ef"; Write.flush t; traceln "3rd flush");
];
traceln "Done";;
+flow: wrote (rsb) ["a"]
+flow: wrote (rsb) ["b"; "c"]
+1st flush
+flow: wrote (rsb) ["d"; "e"]
+2nd flush
+flow: wrote (rsb) ["f"]
+3rd flush
+Done
- : unit = ()
```
## Scheduling
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun t ->
Write.schedule_cstruct t (Cstruct.of_string "one");
Write.string t "two";
Fiber.yield ();
Write.string t "one";
Write.schedule_cstruct t (Cstruct.of_string "two");
Fiber.yield ();
Write.schedule_cstruct t (Cstruct.of_string "end");
Fiber.yield ();
traceln "Should all be flushed by now.";;;
+flow: wrote "onetwo"
+flow: wrote "onetwo"
+flow: wrote "end"
+Should all be flushed by now.
- : unit = ()
```
## Cancellation
Cancelled while waiting for the underlying flow to perform the write:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let flow = Eio_mock.Flow.make "flow" in
Eio_mock.Flow.on_copy_bytes flow [`Run Fiber.await_cancel];
Write.with_flow flow @@ fun t ->
Fiber.both
(fun () -> Write.string t "Hello"; traceln "Did write")
(fun () -> Fiber.yield (); failwith "Simulated error");;
+Did write
Exception: Failure "Simulated error".
```
Cancelled while waiting for some data:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let t = Write.create 100 in
Fiber.both
(fun () -> ignore (Write.await_batch t); assert false)
(fun () -> failwith "Simulated error");;
Exception: Failure "Simulated error".
```
## Invalid offset
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun t ->
try Write.string t "hi" ~off:100; assert false
with Invalid_argument _ -> ();;
- : unit = ()
```
## Serialize
```ocaml
let foobar () =
let t = Write.create 100 in
Write.string t "foo";
Write.cstruct t (Cstruct.of_string "bar");
Write.close t;
t
```
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.serialize (foobar ()) @@ fun bufs ->
traceln "Write %a" Fmt.(Dump.list (using Cstruct.to_string Dump.string)) bufs;
Ok (Cstruct.lenv bufs);;
+Write ["foobar"]
- : (unit, [> `Closed ]) result = Ok ()
```
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.serialize (foobar ()) @@ fun bufs ->
assert (bufs <> []);
traceln "Write %a" Fmt.(Dump.list (using Cstruct.to_string Dump.string)) bufs;
Error `Closed;;
+Write ["foobar"]
- : (unit, [> `Closed ]) result = Error `Closed
```
```ocaml
# Write.serialize_to_string (foobar ());;
- : string = "foobar"
```
```ocaml
# Write.serialize_to_cstruct (foobar ()) |> Cstruct.to_string;;
- : string = "foobar"
```
## Exceptions
We still flush the output on error:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.with_flow flow @@ fun t ->
Write.string t "foo";
failwith "Simulated error";;
+flow: wrote "foo"
Exception: Failure "Simulated error".
```
But we don't flush if cancelled:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let flow = Eio_mock.Flow.make "flow" in
Eio_mock.Flow.on_copy_bytes flow [`Run Fiber.await_cancel];
Fiber.both
(fun () ->
Write.with_flow flow @@ fun t ->
Write.string t "foo";
Fiber.await_cancel ()
)
(fun () -> failwith "Simulated error");;
Exception: Failure "Simulated error".
```