Import of Faraday 0.7.2 as Buf_write

This commit is contained in:
Thomas Leonard 2022-06-21 15:56:05 +01:00
parent 62e2964fc9
commit 61dd5d2a73
8 changed files with 833 additions and 1 deletions

View File

@ -33,3 +33,38 @@ ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
The `Eio.Buf_write` module is based on Faraday by Inhabited Type LLC,
which has the following license (BSD-3-clause):
Copyright (c) 2016, Inhabited Type LLC
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

View File

@ -14,6 +14,7 @@
(depends
(ocaml (>= 4.12.0))
base-domains
(bigstringaf (>= 0.9.0))
(cstruct (>= 6.0.1))
lwt-dllist
(optint (>= 0.1.0))

View File

@ -12,6 +12,7 @@ depends: [
"dune" {>= "2.9"}
"ocaml" {>= "4.12.0"}
"base-domains"
"bigstringaf" {>= "0.9.0"}
"cstruct" {>= "6.0.1"}
"lwt-dllist"
"optint" {>= "0.1.0"}

484
lib_eio/buf_write.ml Normal file
View File

@ -0,0 +1,484 @@
(*----------------------------------------------------------------------------
Copyright (c) 2016 Inhabited Type LLC.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)
type bigstring = Bigstringaf.t
type 'a iovec =
{ buffer : 'a
; off : int
; len : int }
exception Dequeue_empty
module Deque(T:sig type t val sentinel : t end) : sig
type elem = T.t
type t
val create : int -> t
val is_empty : t -> bool
val enqueue : elem -> t -> unit
val dequeue_exn : t -> elem
val enqueue_front : elem -> t -> unit
val map_to_list : t -> f:(elem -> 'b) -> 'b list
end = struct
type elem = T.t
type t =
{ mutable elements : elem array
; mutable front : int
; mutable back : int }
let sentinel = T.sentinel
let create size =
{ elements = Array.make size sentinel; front = 0; back = 0 }
let is_empty t =
t.front = t.back
let ensure_space t =
if t.back = Array.length t.elements - 1 then begin
let len = t.back - t.front in
if t.front > 0 then begin
(* Shift everything to the front of the array and then clear out
* dangling pointers to elements from their previous locations. *)
Array.blit t.elements t.front t.elements 0 len;
Array.fill t.elements len t.front sentinel
end else begin
let old = t.elements in
let new_ = Array.(make (2 * length old) sentinel) in
Array.blit old t.front new_ 0 len;
t.elements <- new_
end;
t.front <- 0;
t.back <- len
end
let enqueue e t =
ensure_space t;
t.elements.(t.back) <- e;
t.back <- t.back + 1
let dequeue_exn t =
if is_empty t then
raise Dequeue_empty
else
let result = Array.unsafe_get t.elements t.front in
Array.unsafe_set t.elements t.front sentinel;
t.front <- t.front + 1;
result
let enqueue_front e t =
(* This is in general not true for Deque data structures, but the usage
* below ensures that there is always space to push an element back on the
* front. An [enqueue_front] is always preceded by a [dequeue], with no
* intervening operations. *)
assert (t.front > 0);
t.front <- t.front - 1;
t.elements.(t.front) <- e
let map_to_list t ~f =
let result = ref [] in
for i = t.back - 1 downto t.front do
result := f t.elements.(i) :: !result
done;
!result
end
module IOVec = struct
let create buffer ~off ~len =
{ buffer; off; len }
let length t =
t.len
let shift { buffer; off; len } n =
assert (n < len);
{ buffer; off = off + n; len = len - n }
let lengthv ts =
let rec loop ts acc =
match ts with
| [] -> acc
| iovec::ts -> loop ts (length iovec + acc)
in
loop ts 0
end
module Buffers = Deque(struct
type t = bigstring iovec
let sentinel =
let deadbeef = "\222\173\190\239" in
let len = String.length deadbeef in
let buffer = Bigstringaf.create len in
String.iteri (Bigstringaf.unsafe_set buffer) deadbeef;
{ buffer; off = 0; len }
end)
module Flushes = Deque(struct
type t = int * (unit -> unit)
let sentinel = 0, fun () -> ()
end)
type t =
{ mutable buffer : bigstring
; mutable scheduled_pos : int
; mutable write_pos : int
; scheduled : Buffers.t
; flushed : Flushes.t
; mutable bytes_received : int
; mutable bytes_written : int
; mutable closed : bool
; mutable yield : bool
}
type operation = [
| `Writev of bigstring iovec list
| `Yield
| `Close
]
let of_bigstring buffer =
{ buffer
; write_pos = 0
; scheduled_pos = 0
; scheduled = Buffers.create 4
; flushed = Flushes.create 1
; bytes_received = 0
; bytes_written = 0
; closed = false
; yield = false }
let create size =
of_bigstring (Bigstringaf.create size)
let writable_exn t =
if t.closed then
failwith "cannot write to closed writer"
let schedule_iovec t ?(off=0) ~len buffer =
t.bytes_received <- t.bytes_received + len;
Buffers.enqueue (IOVec.create buffer ~off ~len) t.scheduled
let flush_buffer t =
let len = t.write_pos - t.scheduled_pos in
if len > 0 then begin
let off = t.scheduled_pos in
schedule_iovec t ~off ~len t.buffer;
t.scheduled_pos <- t.write_pos
end
let flush t f =
t.yield <- false;
flush_buffer t;
if Buffers.is_empty t.scheduled then f ()
else Flushes.enqueue (t.bytes_received, f) t.flushed
let free_bytes_in_buffer t =
let buf_len = Bigstringaf.length t.buffer in
buf_len - t.write_pos
let schedule_bigstring t ?(off=0) ?len a =
writable_exn t;
flush_buffer t;
let len =
match len with
| None -> Bigstringaf.length a - off
| Some len -> len
in
if len > 0 then schedule_iovec t ~off ~len a
let ensure_space t len =
if free_bytes_in_buffer t < len then begin
flush_buffer t;
t.buffer <- Bigstringaf.create (max (Bigstringaf.length t.buffer) len);
t.write_pos <- 0;
t.scheduled_pos <- 0
end
let write_gen t ~length ~blit ?(off=0) ?len a =
writable_exn t;
let len =
match len with
| None -> length a - off
| Some len -> len
in
ensure_space t len;
blit a ~src_off:off t.buffer ~dst_off:t.write_pos ~len;
t.write_pos <- t.write_pos + len
let write_string =
let length = String.length in
let blit = Bigstringaf.unsafe_blit_from_string in
fun t ?off ?len a -> write_gen t ~length ~blit ?off ?len a
let write_bytes =
let length = Bytes.length in
let blit = Bigstringaf.unsafe_blit_from_bytes in
fun t ?off ?len a -> write_gen t ~length ~blit ?off ?len a
let write_bigstring =
let length = Bigstringaf.length in
let blit = Bigstringaf.unsafe_blit in
fun t ?off ?len a -> write_gen t ~length ~blit ?off ?len a
let write_char t c =
writable_exn t;
ensure_space t 1;
Bigstringaf.unsafe_set t.buffer t.write_pos c;
t.write_pos <- t.write_pos + 1
let write_uint8 t b =
writable_exn t;
ensure_space t 1;
Bigstringaf.unsafe_set t.buffer t.write_pos (Char.unsafe_chr b);
t.write_pos <- t.write_pos + 1
module BE = struct
let write_uint16 t i =
writable_exn t;
ensure_space t 2;
Bigstringaf.unsafe_set_int16_be t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 2
let write_uint32 t i =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_be t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 4
let write_uint48 t i =
writable_exn t;
ensure_space t 6;
Bigstringaf.unsafe_set_int32_be t.buffer t.write_pos
Int64.(to_int32 (shift_right_logical i 4));
Bigstringaf.unsafe_set_int16_be t.buffer (t.write_pos + 2)
Int64.(to_int i);
t.write_pos <- t.write_pos + 6
let write_uint64 t i =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_be t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 8
let write_float t f =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_be t.buffer t.write_pos (Int32.bits_of_float f);
t.write_pos <- t.write_pos + 4
let write_double t d =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_be t.buffer t.write_pos (Int64.bits_of_float d);
t.write_pos <- t.write_pos + 8
end
module LE = struct
let write_uint16 t i =
writable_exn t;
ensure_space t 2;
Bigstringaf.unsafe_set_int16_le t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 2
let write_uint32 t i =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_le t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 4
let write_uint48 t i =
writable_exn t;
ensure_space t 6;
Bigstringaf.unsafe_set_int16_le t.buffer t.write_pos
Int64.(to_int i);
Bigstringaf.unsafe_set_int32_le t.buffer (t.write_pos + 2)
Int64.(to_int32 (shift_right_logical i 2));
t.write_pos <- t.write_pos + 6
let write_uint64 t i =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_le t.buffer t.write_pos i;
t.write_pos <- t.write_pos + 8
let write_float t f =
writable_exn t;
ensure_space t 4;
Bigstringaf.unsafe_set_int32_le t.buffer t.write_pos (Int32.bits_of_float f);
t.write_pos <- t.write_pos + 4
let write_double t d =
writable_exn t;
ensure_space t 8;
Bigstringaf.unsafe_set_int64_le t.buffer t.write_pos (Int64.bits_of_float d);
t.write_pos <- t.write_pos + 8
end
let close t =
t.closed <- true;
flush_buffer t
let is_closed t =
t.closed
let pending_bytes t =
(t.write_pos - t.scheduled_pos) + (t.bytes_received - t.bytes_written)
let has_pending_output t =
pending_bytes t <> 0
let yield t =
t.yield <- true
let rec shift_buffers t written =
try
let { len; _ } as iovec = Buffers.dequeue_exn t.scheduled in
if len <= written then begin
shift_buffers t (written - len)
end else
Buffers.enqueue_front (IOVec.shift iovec written) t.scheduled
with Dequeue_empty ->
assert (written = 0);
if t.scheduled_pos = t.write_pos then begin
t.scheduled_pos <- 0;
t.write_pos <- 0
end
let rec shift_flushes t =
try
let (threshold, f) as flush = Flushes.dequeue_exn t.flushed in
(* Edited notes from @dinosaure:
*
* The quantities [t.bytes_written] and [threshold] are always going to be
* positive integers. Therefore, we can treat them as unsinged integers for
* the purposes of comparision. Doing so allows us to handle overflows in
* either quantity as long as they're both within one overflow of each other.
* We can accomplish this by subracting [min_int] from both quantities before
* comparision. This shift a quantity that has not overflowed into the
* negative integer range while shifting a quantity that has overflow into
* the positive integer range.
*
* This effectively restablishes the relative difference when an overflow
* has occurred, and otherwise just compares numbers that haven't
* overflowed as similarly, just shifted down a bit.
*)
if t.bytes_written - min_int >= threshold - min_int
then begin f (); shift_flushes t end
else Flushes.enqueue_front flush t.flushed
with Dequeue_empty ->
()
let shift t written =
shift_buffers t written;
t.bytes_written <- t.bytes_written + written;
shift_flushes t
let operation t =
if t.closed then begin
t.yield <- false
end;
flush_buffer t;
let nothing_to_do = not (has_pending_output t) in
if t.closed && nothing_to_do then
`Close
else if t.yield || nothing_to_do then begin
t.yield <- false;
`Yield
end else begin
let iovecs = Buffers.map_to_list t.scheduled ~f:(fun x -> x) in
`Writev iovecs
end
let rec serialize t writev =
match operation t with
| `Writev iovecs ->
begin match writev iovecs with
| `Ok n -> shift t n; if not (Buffers.is_empty t.scheduled) then yield t
| `Closed -> close t
end;
serialize t writev
| (`Close|`Yield) as next -> next
let serialize_to_string t =
close t;
match operation t with
| `Writev iovecs ->
let len = IOVec.lengthv iovecs in
let bytes = Bytes.create len in
let pos = ref 0 in
List.iter (function
| { buffer; off; len } ->
Bigstringaf.unsafe_blit_to_bytes buffer ~src_off:off bytes ~dst_off:!pos ~len;
pos := !pos + len)
iovecs;
shift t len;
assert (operation t = `Close);
Bytes.unsafe_to_string bytes
| `Close -> ""
| `Yield -> assert false
let serialize_to_bigstring t =
close t;
match operation t with
| `Writev iovecs ->
let len = IOVec.lengthv iovecs in
let bs = Bigstringaf.create len in
let pos = ref 0 in
List.iter (function
| { buffer; off; len } ->
Bigstringaf.unsafe_blit buffer ~src_off:off bs ~dst_off:!pos ~len;
pos := !pos + len)
iovecs;
shift t len;
assert (operation t = `Close);
bs
| `Close -> Bigstringaf.create 0
| `Yield -> assert false
let drain =
let rec loop t acc =
match operation t with
| `Writev iovecs ->
let len = IOVec.lengthv iovecs in
shift t len;
loop t (len + acc)
| `Close -> acc
| `Yield -> loop t acc
in
fun t -> loop t 0

307
lib_eio/buf_write.mli Normal file
View File

@ -0,0 +1,307 @@
(*----------------------------------------------------------------------------
Copyright (c) 2016 Inhabited Type LLC.
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
3. Neither the name of the author nor the names of his contributors
may be used to endorse or promote products derived from this software
without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE CONTRIBUTORS ``AS IS'' AND ANY EXPRESS
OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
----------------------------------------------------------------------------*)
(** Serialization primitives built for speed an memory-efficiency.
Faraday is a library for writing fast and memory-efficient serializers. Its
core type and related operation gives the user fine-grained control over
copying and allocation behavior while serializing user-defined types, and
presents the output in a form that makes it possible to use vectorized
write operations, such as the [writev][] system call, or any other platform
or application-specific output APIs.
A Faraday serializer manages an internal buffer and a queue of output
buffers. The output bufferes may be a sub range of the serializer's
internal buffer or one that is user-provided. Buffered writes such as
{!write_string}, {!write_char}, {!write_bigstring}, etc., copy the source
bytes into the serializer's internal buffer. Unbuffered writes such as
{!schedule_string}, {!schedule_bigstring}, etc., on the other hand perform
no copying. Instead, they enqueue the source bytes into the serializer's
write queue directly. *)
type bigstring =
(char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
type t
(** The type of a serializer. *)
(** {2 Constructors} *)
val create : int -> t
(** [create len] creates a serializer with a fixed-length internal buffer of
length [len]. See the Buffered writes section for details about what happens
when [len] is not large enough to support a write. *)
val of_bigstring : bigstring -> t
(** [of_bigstring buf] creates a serializer, using [buf] as its internal
buffer. The serializer takes ownership of [buf] until the serializer has
been closed and flushed of all output. *)
(** {2 Buffered Writes}
A serializer manages an internal buffer for coalescing small writes. The
size of this buffer is determined when the serializer is created. If the
buffer does not contain sufficient space to service a caller's buffered
write, the serializer will allocate a new buffer of the sufficient size and
use it for the current and subsequent writes. The old buffer will be
garbage collected once all of its contents have been {!flush}ed. *)
val write_string : t -> ?off:int -> ?len:int -> string -> unit
(** [write_string t ?off ?len str] copies [str] into the serializer's
internal buffer. *)
val write_bytes : t -> ?off:int -> ?len:int -> Bytes.t -> unit
(** [write_bytes t ?off ?len bytes] copies [bytes] into the serializer's
internal buffer. It is safe to modify [bytes] after this call returns. *)
val write_bigstring : t -> ?off:int -> ?len:int -> bigstring -> unit
(** [write_bigstring t ?off ?len bigstring] copies [bigstring] into the
serializer's internal buffer. It is safe to modify [bigstring] after this
call returns. *)
val write_gen
: t
-> length:('a -> int)
-> blit:('a -> src_off:int -> bigstring -> dst_off:int -> len:int -> unit)
-> ?off:int
-> ?len:int
-> 'a -> unit
(** [write_gen t ~length ~blit ?off ?len x] copies [x] into the serializer's
internal buffer using the provided [length] and [blit] operations.
See {!Bigstring.blit} for documentation of the arguments. *)
val write_char : t -> char -> unit
(** [write_char t char] copies [char] into the serializer's internal buffer. *)
val write_uint8 : t -> int -> unit
(** [write_uint8 t n] copies the lower 8 bits of [n] into the serializer's
internal buffer. *)
(** Big endian serializers *)
module BE : sig
val write_uint16 : t -> int -> unit
(** [write_uint16 t n] copies the lower 16 bits of [n] into the serializer's
internal buffer in big-endian byte order. *)
val write_uint32 : t -> int32 -> unit
(** [write_uint32 t n] copies [n] into the serializer's internal buffer in
big-endian byte order. *)
val write_uint48 : t -> int64 -> unit
(** [write_uint48 t n] copies the lower 48 bits of [n] into the serializer's
internal buffer in big-endian byte order. *)
val write_uint64 : t -> int64 -> unit
(** [write_uint64 t n] copies [n] into the serializer's internal buffer in
big-endian byte order. *)
val write_float : t -> float -> unit
(** [write_float t n] copies the lower 32 bits of [n] into the serializer's
internal buffer in big-endian byte order. *)
val write_double : t -> float -> unit
(** [write_double t n] copies [n] into the serializer's internal buffer in
big-endian byte order. *)
end
(** Little endian serializers *)
module LE : sig
val write_uint16 : t -> int -> unit
(** [write_uint16 t n] copies the lower 16 bits of [n] into the
serializer's internal buffer in little-endian byte order. *)
val write_uint32 : t -> int32 -> unit
(** [write_uint32 t n] copies [n] into the serializer's internal buffer in
little-endian byte order. *)
val write_uint48 : t -> int64 -> unit
(** [write_uint48 t n] copies the lower 48 bits of [n] into the serializer's
internal buffer in little-endian byte order. *)
val write_uint64 : t -> int64 -> unit
(** [write_uint64 t n] copies [n] into the serializer's internal buffer in
little-endian byte order. *)
val write_float : t -> float -> unit
(** [write_float t n] copies the lower 32 bits of [n] into the serializer's
internal buffer in little-endian byte order. *)
val write_double : t -> float -> unit
(** [write_double t n] copies [n] into the serializer's internal buffer in
little-endian byte order. *)
end
(** {2 Unbuffered Writes}
Unbuffered writes do not involve copying bytes to the serializers internal
buffer. *)
val schedule_bigstring : t -> ?off:int -> ?len:int -> bigstring -> unit
(** [schedule_bigstring t ?off ?len bigstring] schedules [bigstring] to
be written the next time the serializer surfaces writes to the user.
[bigstring] is not copied in this process, so [bigstring] should only be
modified after [t] has been {!flush}ed. *)
(** {2 Querying A Serializer's State} *)
val free_bytes_in_buffer : t -> int
(** [free_bytes_in_buffer t] returns the free space, in bytes, of the
serializer's write buffer. If a {write_*} call has a length that exceeds
this value, the serializer will allocate a new buffer that will replace the
serializer's internal buffer for that and subsequent calls. *)
val has_pending_output : t -> bool
(** [has_pending_output t] is [true] if [t]'s output queue is non-empty. It may
be the case that [t]'s queued output is being serviced by some other thread
of control, but has not yet completed. *)
val pending_bytes : t -> int
(** [pending_bytes t] is the size of the next write, in bytes, that [t] will
surface to the caller as a [`Writev]. *)
(** {2 Control Operations} *)
val yield : t -> unit
(** [yield t] causes [t] to delay surfacing writes to the user, instead
returning a [`Yield]. This gives the serializer an opportunity to collect
additional writes before sending them to the underlying device, which will
increase the write batch size.
As one example, code may want to call this function if it's about to
release the OCaml lock and perform a blocking system call, but would like
to batch output across that system call. To hint to the thread of control
that is performing the writes on behalf of the serializer, the code might
call [yield t] before releasing the lock. *)
val flush : t -> (unit -> unit) -> unit
(** [flush t f] registers [f] to be called when all prior writes have been
successfully completed. If [t] has no pending writes, then [f] will be
called immediately. If {!yield} was recently called on [t], then the effect
of the [yield] will be ignored so that client code has an opportunity to
write pending output, regardless of how it handles [`Yield] operations. *)
val close : t -> unit
(** [close t] closes [t]. All subsequent write calls will raise, and any
pending or subsequent {!yield} calls will be ignored. If the serializer has
any pending writes, user code will have an opportunity to service them
before it receives the [Close] operation. Flush callbacks will continue to
be invoked while output is {!shift}ed out of [t] as needed. *)
val is_closed : t -> bool
(** [is_closed t] is [true] if [close] has been called on [t] and [false]
otherwise. A closed [t] may still have pending output. *)
val shift : t -> int -> unit
(** [shift t n] removes the first [n] bytes in [t]'s write queue. Any flush
callbacks registered with [t] within this span of the write queue will be
called. *)
val drain : t -> int
(** [drain t] removes all pending writes from [t], returning the number of
bytes that were enqueued to be written and freeing any scheduled
buffers in the process. *)
(** {2 Running}
Low-level operations for runing a serializer. For production use-cases,
consider the Async and Lwt support that this library includes before
attempting to use this these operations directly. *)
type 'a iovec =
{ buffer : 'a
; off : int
; len : int }
(** A view into {!iovec.buffer} starting at {!iovec.off} and with length
{!iovec.len}. *)
type operation = [
| `Writev of bigstring iovec list
| `Yield
| `Close ]
(** The type of operations that the serialier may wish to perform.
{ul
{li [`Writev iovecs]: Write the bytes in {!iovecs}s reporting the actual
number of bytes written by calling {!shift}. You must accurately report the
number of bytes written. Failure to do so will result in the same bytes being
surfaced in a [`Writev] operation multiple times.}
{li [`Yield]: Yield to other threads of control, waiting for additional
output before procedding. The method for achieving this is
application-specific, but once complete, the caller can proceed with
serialization by simply making another call to {!val:operation} or
{!serialize}.}
{li [`Close]: Serialization is complete. No further output will generated.
The action to take as a result, if any, is application-specific.}} *)
val operation : t -> operation
(** [operation t] is the next operation that the caller must perform on behalf
of the serializer [t]. Users should consider using {!serialize} before this
function. See the documentation for the {!type:operation} type for details
on how callers should handle these operations. *)
val serialize : t -> (bigstring iovec list -> [`Ok of int | `Closed]) -> [`Yield | `Close]
(** [serialize t writev] sufaces the next operation of [t] to the caller,
handling a [`Writev] operation with [writev] function and performing an
additional bookkeeping on the caller's behalf. In the event that [writev]
indicates a partial write, {!serialize} will call {!yield} on the
serializer rather than attempting successive [writev] calls. *)
(** {2 Convenience Functions}
These functions are included for testing, debugging, and general
development. They are not the suggested way of driving a serializer in a
production setting. *)
val serialize_to_string : t -> string
(** [serialize_to_string t] runs [t], collecting the output into a string and
returning it. [serialzie_to_string t] immediately closes [t] and ignores
any calls to {!yield} on [t]. *)
val serialize_to_bigstring : t -> bigstring
(** [serialize_to_string t] runs [t], collecting the output into a bigstring
and returning it. [serialzie_to_bigstring t] immediately closes [t] and
ignores any calls to {!yield} on [t]. *)

View File

@ -2,4 +2,4 @@
(name eio)
(public_name eio)
(flags (:standard -open Eio__core -open Eio__core.Private))
(libraries eio__core cstruct lwt-dllist fmt))
(libraries eio__core cstruct lwt-dllist fmt bigstringaf))

View File

@ -23,6 +23,7 @@ module Exn = Exn
module Generic = Generic
module Flow = Flow
module Buf_read = Buf_read
module Buf_write = Buf_write
module Net = Net
module Domain_manager = Domain_manager
module Time = Time

View File

@ -75,6 +75,9 @@ module Flow = Flow
(** Buffered input and parsing *)
module Buf_read = Buf_read
(** Buffered output *)
module Buf_write = Buf_write
(** Networking. *)
module Net = Net