Merge pull request #161 from talex5/buffers

Add more functions to Buf_read
This commit is contained in:
Thomas Leonard 2022-01-28 14:44:20 +00:00 committed by GitHub
commit f16c440cb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 560 additions and 24 deletions

View File

@ -22,6 +22,7 @@ This is an unreleased repository, as it's very much a work-in-progress.
* [Performance](#performance)
* [Networking](#networking)
* [Design Note: Object Capabilities](#design-note-object-capabilities)
* [Buffering and Parsing](#buffering-and-parsing)
* [Filesystem Access](#filesystem-access)
* [Time](#time)
* [Multicore Support](#multicore-support)
@ -481,6 +482,84 @@ However, it still makes non-malicious code easier to understand and test
and may allow for an Ocap extension to the language in the future.
See [Emily][] for a previous attempt at this.
## Buffering and Parsing
Reading from an Eio flow directly may give you more or less data than you wanted.
For example, if you want to read a line of text from a TCP stream,
the flow will tend to give you the data in packet-sized chunks, not lines.
To solve this, you can wrap the flow with a buffer and read from that.
Here's a simple command-line interface that reads `stdin` one line at a time:
```ocaml
let cli ~stdin ~stdout =
let buf = Eio.Buf_read.of_flow stdin ~initial_size:100 ~max_size:1_000_000 in
while true do
let line = Eio.Buf_read.line buf in
traceln "> %s" line;
match line with
| "h" | "help" -> Eio.Flow.copy_string "It's just an example\n" stdout
| x -> Eio.Flow.copy_string (Fmt.str "Unknown command %S\n" x) stdout
done
```
Let's try it with some test data (you could use the real stdin if you prefer):
```ocaml
# Eio_main.run @@ fun env ->
cli
~stdin:(Eio.Flow.string_source "help\nexit\nquit\nbye\nstop\n")
~stdout:(Eio.Stdenv.stdout env);;
+> help
It's just an example
+> exit
Unknown command "exit"
+> quit
Unknown command "quit"
+> bye
Unknown command "bye"
+> stop
Unknown command "stop"
Exception: End_of_file.
```
`Buf_read.of_flow` allocates an internal buffer (with the given `initial_size`).
When you try to read a line from it, it will take a whole line from the buffer if possible.
If not, it will ask the underlying flow for the next chunk of data, until it has enough.
For high performance applications, you should use a larger initial buffer
so that fewer reads on the underlying flow are needed.
If the user enters a line that doesn't fit in the buffer then the buffer will be enlarged as needed.
However, it will raise an exception if the buffer would need to grow above `max_size`.
This is useful when handling untrusted input, since otherwise when you try to read one line an
attacker could just keep sending e.g. 'x' characters until your service ran out of memory and crashed.
As well as calling individual parsers (like `line`) directly,
you can also build larger parsers from smaller ones.
For example:
```ocaml
open Eio.Buf_read.Syntax
type message = { src : string; body : string }
let message =
let+ src = Eio.Buf_read.(string "FROM:" *> line)
and+ body = Eio.Buf_read.take_all in
{ src; body }
```
```ocaml
# Eio_main.run @@ fun _ ->
let flow = Eio.Flow.string_source "FROM:Alice\nHello!\n" in
match Eio.Buf_read.parse message flow ~max_size:1024 with
| Ok { src; body } -> traceln "%s sent %S" src body
| Error (`Msg err) -> traceln "Parse failed: %s" err;;
+Alice sent "Hello!\n"
- : unit = ()
```
## Filesystem Access
Access to the filesystem is also controlled by capabilities, and `env` provides two:

View File

@ -18,6 +18,8 @@
(optint (>= 0.1.0))
(psq (>= 0.2.0))
(fmt (>= 0.8.9))
(astring (and (>= 0.8.5) :with-test))
(crowbar (and (>= 0.2) :with-test))
(alcotest (and (>= 1.4.0) :with-test))))
(package
(name eio_linux)

View File

@ -16,6 +16,8 @@ depends: [
"optint" {>= "0.1.0"}
"psq" {>= "0.2.0"}
"fmt" {>= "0.8.9"}
"astring" {>= "0.8.5" & with-test}
"crowbar" {>= "0.2" & with-test}
"alcotest" {>= "1.4.0" & with-test}
"odoc" {with-doc}
]

4
fuzz/dune Normal file
View File

@ -0,0 +1,4 @@
(test
(package eio)
(libraries cstruct crowbar fmt astring eio)
(name test))

170
fuzz/test.ml Normal file
View File

@ -0,0 +1,170 @@
(* This file contains a simple model of `Buf_read`, using a single string.
It runs random operations on both the model and the real buffer and
checks they always give the same result. *)
open Astring
let debug = false
module Buf_read = Eio.Buf_read
exception Buffer_limit_exceeded = Buf_read.Buffer_limit_exceeded
let initial_size = 10
let max_size = 100
let mock_flow next = object (self : #Eio.Flow.read)
val mutable next = next
method read_methods = []
method read_into buf =
match next with
| [] ->
raise End_of_file
| "" :: xs ->
next <- xs;
self#read_into buf
| x :: xs ->
let len = min (Cstruct.length buf) (String.length x) in
Cstruct.blit_from_string x 0 buf 0 len;
let x' = String.with_index_range x ~first:len in
next <- (if x' = "" then xs else x' :: xs);
len
end
module Model = struct
type t = string ref
let of_chunks chunks = ref (String.concat chunks)
let take_all t =
let old = !t in
if String.length old >= max_size then raise Buffer_limit_exceeded;
t := "";
old
let line t =
match String.cut ~sep:"\n" !t with
| Some (line, rest) ->
if String.length line >= max_size then raise Buffer_limit_exceeded;
t := rest;
if String.is_suffix ~affix:"\r" line then String.with_index_range line ~last:(String.length line - 2)
else line
| None when !t = "" -> raise End_of_file
| None when String.length !t >= max_size -> raise Buffer_limit_exceeded
| None -> take_all t
let any_char t =
match !t with
| "" -> raise End_of_file
| s ->
t := String.with_index_range s ~first:1;
String.get_head s
let peek_char t = String.head !t
let consume t n =
t := String.with_index_range !t ~first:n
let char c t =
match peek_char t with
| Some c2 when c = c2 -> consume t 1
| Some _ -> failwith "char"
| None -> raise End_of_file
let string s t =
if debug then Fmt.pr "string %S@." s;
let len_t = String.length !t in
if not (String.is_prefix ~affix:(String.with_range s ~len:len_t) !t) then failwith "string";
if String.length s > max_size then raise Buffer_limit_exceeded;
if String.is_prefix ~affix:s !t then consume t (String.length s)
else raise End_of_file
let take n t =
if n < 0 then invalid_arg "neg";
if n > max_size then raise Buffer_limit_exceeded
else if String.length !t >= n then (
let data = String.with_range !t ~len:n in
t := String.with_range !t ~first:n;
data
) else raise End_of_file
let take_while p t =
match String.find (Fun.negate p) !t with
| Some i when i >= max_size -> raise Buffer_limit_exceeded
| Some i ->
let data = String.with_range !t ~len:i in
consume t i;
data
| None -> take_all t
let skip_while p t =
match String.find (Fun.negate p) !t with
| Some i -> consume t i
| None -> t := ""
let skip n t =
if n < 0 then invalid_arg "skip";
if n > String.length !t then (
t := "";
raise End_of_file;
);
consume t n
let eof t =
if !t <> "" then failwith "not eof"
end
type op = Op : 'a Crowbar.printer * 'a Buf_read.parser * (Model.t -> 'a) -> op
let unit = Fmt.(const string) "()"
let dump_char f c = Fmt.pf f "%C" c
let digit = function
| '0'..'9' -> true
| _ -> false
let op =
let label (name, gen) = Crowbar.with_printer Fmt.(const string name) gen in
Crowbar.choose @@ List.map label [
"line", Crowbar.const @@ Op (Fmt.Dump.string, Buf_read.line, Model.line);
"char 'x'", Crowbar.const @@ Op (unit, Buf_read.char 'x', Model.char 'x');
"any_char", Crowbar.const @@ Op (dump_char, Buf_read.any_char, Model.any_char);
"peek_char", Crowbar.const @@ Op (Fmt.Dump.option dump_char, Buf_read.peek_char, Model.peek_char);
"string", Crowbar.(map [bytes]) (fun s -> Op (unit, Buf_read.string s, Model.string s));
"take", Crowbar.(map [int]) (fun n -> Op (Fmt.Dump.string, Buf_read.take n, Model.take n));
"take_all", Crowbar.const @@ Op (Fmt.Dump.string, Buf_read.take_all, Model.take_all);
"take_while digit", Crowbar.const @@ Op (Fmt.Dump.string, Buf_read.take_while digit, Model.take_while digit);
"skip_while digit", Crowbar.const @@ Op (unit, Buf_read.skip_while digit, Model.skip_while digit);
"skip", Crowbar.(map [int]) (fun n -> Op (unit, Buf_read.skip n, Model.skip n));
"eof", Crowbar.const @@ Op (unit, Buf_read.eof, Model.eof);
]
let catch f x =
match f x with
| y -> Ok y
| exception End_of_file -> Error "EOF"
| exception Invalid_argument _ -> Error "Invalid"
| exception Failure _ -> Error "Failure"
| exception Buffer_limit_exceeded -> Error "TooBig"
let random chunks ops =
let model = Model.of_chunks chunks in
let chunks_len = String.length !model in
let r = Buf_read.of_flow (mock_flow chunks) ~initial_size ~max_size in
if debug then print_endline "*** start ***";
let check_eq (Op (pp, a, b)) =
if debug then (
Fmt.pr "---@.";
Fmt.pr "real :%S@." (Cstruct.to_string (Buf_read.peek r));
Fmt.pr "model:%S@." !model;
);
let x = catch a r in
let y = catch b model in
Crowbar.check_eq ~pp:Fmt.(result ~ok:pp ~error:string) x y
in
List.iter check_eq ops;
Crowbar.check_eq ~pp:Fmt.int (Buf_read.consumed_bytes r) (chunks_len - String.length !model)
let () =
Crowbar.(add_test ~name:"random ops" [list bytes; list op] random)

0
fuzz/test.mli Normal file
View File

View File

@ -5,11 +5,39 @@ type t = {
mutable pos : int;
mutable len : int;
mutable flow : Flow.read option; (* None if we've seen eof *)
mutable consumed : int; (* Total bytes consumed so far *)
max_size : int;
}
type 'a parser = t -> 'a
let map f x r = f (x r)
let pair x y r =
let a = x r in
let b = y r in
a, b
let bind x f r = f (x r) r
module Syntax = struct
let ( let+ ) x f r = f (x r)
let ( let* ) = bind
let ( and* ) = pair
let ( and+ ) = pair
let ( <* ) a b t =
let x = a t in
ignore (b t);
x
let ( *> ) a b t =
ignore (a t);
b t
end
open Syntax
let capacity t = Bigarray.Array1.dim t.buf
let of_flow ?initial_size ~max_size flow =
@ -17,7 +45,7 @@ let of_flow ?initial_size ~max_size flow =
if max_size <= 0 then Fmt.invalid_arg "Max size %d should be positive!" max_size;
let initial_size = Option.value initial_size ~default:(min 4096 max_size) in
let buf = Bigarray.(Array1.create char c_layout initial_size) in
{ buf; pos = 0; len = 0; flow = Some flow; max_size }
{ buf; pos = 0; len = 0; flow = Some flow; max_size; consumed = 0 }
let peek t =
Cstruct.of_bigarray ~off:t.pos ~len:t.len t.buf
@ -25,15 +53,23 @@ let peek t =
let consume t n =
if n < 0 || n > t.len then Fmt.invalid_arg "Can't consume %d bytes of a %d byte buffer!" n t.len;
t.pos <- t.pos + n;
t.len <- t.len - n
t.len <- t.len - n;
t.consumed <- t.consumed + n
let consume_all t =
t.consumed <- t.consumed + t.len;
t.len <- 0
let buffered_bytes t = t.len
let consumed_bytes t = t.consumed
let eof_seen t = t.flow = None
let ensure t n =
assert (n >= 0);
if t.len < n then (
if n > t.max_size then raise Buffer_limit_exceeded;
(* We don't have enough data yet, so we'll need to do a read. *)
match t.flow with
| None -> raise End_of_file
@ -44,7 +80,6 @@ let ensure t n =
let cap = capacity t in
if n > cap then (
(* [n] bytes won't fit. We need to resize the buffer. *)
if n > t.max_size then raise Buffer_limit_exceeded;
let new_size = max n (min t.max_size (cap * 2)) in
let new_buf = Bigarray.(Array1.create char c_layout new_size) in
Cstruct.blit
@ -102,7 +137,13 @@ let any_char t =
consume t 1;
c
let peek_char t =
match ensure t 1 with
| () -> Some (get t 0)
| exception End_of_file -> None
let take len t =
if len < 0 then Fmt.invalid_arg "take: %d is negative!" len;
ensure t len;
let data = Cstruct.to_string (Cstruct.of_bigarray t.buf ~off:t.pos ~len) in
consume t len;
@ -110,22 +151,23 @@ let take len t =
let string s t =
let rec aux i =
if i = String.length s then true
else if i < t.len then (
if i = String.length s then (
consume t i
) else if i < t.len then (
if get t i = s.[i] then aux (i + 1)
else false
else (
let buf = peek t in
let len = min (String.length s) (Cstruct.length buf) in
Fmt.failwith "Expected %S but got %S"
s
(Cstruct.to_string buf ~off:0 ~len)
)
) else (
ensure t (t.len + 1);
aux i
)
in
if not (aux 0) then (
let buf = peek t in
let len = min (String.length s) (Cstruct.length buf) in
Fmt.failwith "Expected %S but got %S"
s
(Cstruct.to_string buf ~off:0 ~len)
)
aux 0
let take_all t =
try
@ -156,8 +198,36 @@ let take_while p t =
data
let skip_while p t =
let len = count_while p t in
consume t len
let rec aux i =
if i < t.len then (
if p (get t i) then aux (i + 1)
else consume t i
) else (
consume t t.len;
ensure t 1;
aux 0
)
in
try aux 0
with End_of_file -> ()
let rec skip n t =
if n <= t.len then (
consume t n
) else (
let n = n - t.len in
consume_all t;
ensure t (min n (capacity t));
skip n t
)
let skip n t =
if n < 0 then Fmt.invalid_arg "skip: %d is negative!" n;
try skip n t
with End_of_file ->
(* Skip isn't atomic, so discard everything in this case for consistency. *)
consume t t.len;
raise End_of_file
let line t =
(* Return the index of the first '\n', reading more data as needed. *)
@ -181,3 +251,25 @@ let line t =
let line = Cstruct.to_string (Cstruct.of_bigarray t.buf ~off:t.pos ~len) in
consume t (nl + 1);
line
let eof t =
if t.len = 0 && eof_seen t then ()
else (
match ensure t 1 with
| () -> failwith "Unexpected data after parsing"
| exception End_of_file -> ()
)
let pp_pos f t =
Fmt.pf f "at offset %d" (consumed_bytes t)
let format_errors p t =
match p t with
| v -> Ok v
| exception Failure msg -> Fmt.error_msg "%s (%a)" msg pp_pos t
| exception End_of_file -> Fmt.error_msg "Unexpected end-of-file at offset %d" (t.consumed + t.len)
| exception Buffer_limit_exceeded -> Fmt.error_msg "Buffer size limit exceeded when reading %a" pp_pos t
let parse ?initial_size ~max_size p flow =
let buf = of_flow flow ?initial_size ~max_size in
format_errors (p <* eof) buf

View File

@ -486,14 +486,37 @@ end
(** Buffered input and parsing *)
module Buf_read : sig
(** This module provides fairly efficient non-backtracking parsers.
It is modelled on Angstrom's API, and you should use that if
backtracking is needed. *)
type t
exception Buffer_limit_exceeded
type 'a parser = t -> 'a
(** An ['a parser] is a function that consumes and returns a value of type ['a].
@raise Failure The flow can't be parsed as a value of type ['a].
@raise End_of_file The flow ended without enough data to parse an ['a].
@raise Buffer_limit_exceeded The value was larger than the requested maximum buffer size. *)
val parse : ?initial_size:int -> max_size:int -> 'a parser -> #Flow.read -> ('a, [> `Msg of string]) result
(** [parse p flow ~max_size] uses [p] to parse everything in [flow].
It is a convenience function that does
[let buf = of_flow flow ~max_size in format_errors (p <* eof) buf]
@param initial_size see {!of_flow}. *)
val of_flow : ?initial_size:int -> max_size:int -> #Flow.read -> t
(** [of_flow ~max_size flow] is a buffered reader backed by [flow].
@param initial_size The initial amount of memory to allocate for the buffer.
@param max_size The maximum size to which the buffer may grow. *)
@param max_size The maximum size to which the buffer may grow.
This must be large enough to hold the largest single item
you want to parse (e.g. the longest line, if using
{!line}), plus any terminator needed to know the value is
complete (e.g. the newline character(s)). This is just to
prevent a run-away input from consuming all memory, and
you can usually just set it much larger than you expect
to need. *)
val as_flow : t -> Flow.read
(** [as_flow t] is a buffered flow. Reading from it will return data from the buffer,
@ -501,12 +524,6 @@ module Buf_read : sig
(** {2 Reading data} *)
type 'a parser = t -> 'a
(** An ['a parser] is a function that consumes and returns a value of type ['a].
@raise Failure The flow can't be parsed as a value of type ['a].
@raise End_of_file The flow ended without enough data to parse an ['a].
@raise Buffer_limit_exceeded The value was larger than the maximum requested buffer size. *)
val line : string parser
(** [line t] parses one line.
Lines can be terminated by either LF or CRLF.
@ -521,6 +538,10 @@ module Buf_read : sig
val any_char : char parser
(** [any_char] parses one character. *)
val peek_char : char option parser
(** [peek_char] returns [Some c] where [c] is the next character, but does not consume it.
Returns [None] at the end of the input stream rather than raising [End_of_file]. *)
val string : string -> unit parser
(** [string s] checks that [s] is the next string in the stream and consumes it.
@raise Failure if [s] is not a prefix of the stream. *)
@ -531,7 +552,8 @@ module Buf_read : sig
val take_all : string parser
(** [take_all] takes all remaining data until end-of-file.
Returns [""] if already at end-of-file.
@raise Buffer_limit_exceeded if the remaining data exceeds the buffer limit *)
@raise Buffer_limit_exceeded if the remaining data exceeds or equals the buffer limit
(it needs one extra byte to confirm it has reached end-of-file). *)
val take_while : (char -> bool) -> string parser
(** [take_while p] finds the first byte for which [p] is false
@ -542,7 +564,60 @@ module Buf_read : sig
val skip_while : (char -> bool) -> unit parser
(** [skip_while p] skips zero or more bytes for which [p] is [true].
[skip_while p t] does the same thing as [ignore (take_while p t)]. *)
[skip_while p t] does the same thing as [ignore (take_while p t)],
except that it is not limited by the buffer size. *)
val skip : int -> unit parser
(** [skip n] discards the next [n] bytes.
[skip n] = [map ignore (take n)],
except that the number of skipped bytes may be larger than the buffer (it will not grow).
Note: if [End_of_file] is raised, all bytes in the stream will have been consumed. *)
val eof : unit parser
(** [eof] checks that there are no further bytes in the stream.
@raise Failure if there are further bytes *)
(** {2 Combinators} *)
val pair : 'a parser -> 'b parser -> ('a * 'b) parser
(** [pair a b] is a parser that first uses [a] to parse a value [x],
then uses [b] to parse a value [y], then returns [(x, y)].
Note that this module does not support backtracking, so if [b] fails
then the bytes consumed by [a] are lost. *)
val map : ('a -> 'b) -> ('a parser -> 'b parser)
(** [map f a] is a parser that parses the stream with [a] to get [v],
and then returns [f v]. *)
val bind : 'a parser -> ('a -> 'b parser) -> 'b parser
(** [bind a f] is a parser that first uses [a] to parse a value [v],
then uses [f v] to select the next parser, and then uses that. *)
val format_errors : 'a parser -> ('a, [> `Msg of string]) result parser
(** [format_errors p] catches [Failure], [End_of_file] and
[Buffer_limit_exceeded] exceptions and returns them as a formatted error message. *)
module Syntax : sig
val ( let+ ) : 'a parser -> ('a -> 'b) -> 'b parser
(** Syntax for {!map}. *)
val ( let* ) : 'a parser -> ('a -> 'b parser) -> 'b parser
(** Syntax for {!bind} *)
val ( and+ ) : 'a parser -> 'b parser -> ('a * 'b) parser
(** Syntax for {!pair} *)
val ( and* ) : 'a parser -> 'b parser -> ('a * 'b) parser
(** Syntax for {!pair} (same as [and+]). *)
val ( <* ) : 'a parser -> 'b parser -> 'a parser
(** [a <* b] is [map fst (pair a b)].
It parses two things and keeps only the first. *)
val ( *> ) : 'a parser -> 'b parser -> 'b parser
(** [a *> b] is [map snd (pair a b)].
It parses two things and keeps only the second. *)
end
(** {2 Low-level API} *)
@ -567,6 +642,10 @@ module Buf_read : sig
(** [consume t n] discards the first [n] bytes from [t].
[buffered_bytes t' = buffered_bytes t - n] *)
val consumed_bytes : t -> int
(** [consumed_bytes t] is the total number of bytes consumed.
i.e. it is the offset into the stream of the next byte to be parsed. *)
val eof_seen : t -> bool
(** [eof_seen t] indicates whether we've received [End_of_file] from the underlying flow.
If so, there will never be any further data beyond what [peek] already returns. *)

View File

@ -3,6 +3,7 @@
```
```ocaml
module R = Eio.Buf_read;;
open R.Syntax;;
let traceln fmt = Fmt.pr ("+" ^^ fmt ^^ "@.")
@ -43,6 +44,11 @@ let read flow n =
let is_digit = function
| '0'..'9' -> true
| _ -> false
let test ?(max_size=10) input p =
next := input;
let i = R.of_flow mock_flow ~max_size in
p i
```
@ -250,17 +256,27 @@ Exception: End_of_file.
```ocaml
# let i = R.of_flow mock_flow ~max_size:100;;
val i : R.t = <abstr>
# next := ["ab"; "c"]; R.any_char i;;
+mock_flow returning 2 bytes
- : char = 'a'
# R.peek_char i;;
- : char option = Some 'b'
# R.any_char i;;
- : char = 'b'
# R.any_char i;;
+mock_flow returning 1 bytes
- : char = 'c'
# R.any_char i;;
+mock_flow returning Eof
Exception: End_of_file.
# R.peek_char i;;
- : char option = None
```
## Fixed-length strings
@ -301,6 +317,8 @@ Exception: End_of_file.
Exception: End_of_file.
# R.string "bc" i;;
- : unit = ()
# peek i;;
- : string = ""
```
## Scanning
@ -308,16 +326,33 @@ Exception: End_of_file.
```ocaml
# let i = R.of_flow mock_flow ~max_size:100;;
val i : R.t = <abstr>
# next := ["aa"; "a0"; "123de"]; R.skip_while ((=) 'a') i;;
+mock_flow returning 2 bytes
+mock_flow returning 2 bytes
- : unit = ()
# R.take_while is_digit i;;
+mock_flow returning 5 bytes
- : string = "0123"
# R.take_while (Fun.negate is_digit) i;;
+mock_flow returning Eof
- : string = "de"
# test ["abc"; "def"; "ghi"] (R.skip 5 *> R.take_all);;
+mock_flow returning 3 bytes
+mock_flow returning 3 bytes
+mock_flow returning 3 bytes
+mock_flow returning Eof
- : string = "fghi"
# test ~max_size:3 ["abcdefg"] (R.skip 5 *> R.take_all);;
+mock_flow returning 3 bytes
+mock_flow returning 3 bytes
+mock_flow returning 1 bytes
+mock_flow returning Eof
- : string = "fg"
```
## Take all
@ -347,3 +382,76 @@ Exception: Eio__Buf_read.Buffer_limit_exceeded.
# R.take 3 i;;
- : string = "abc"
```
## Combinators
Parsers can be combined in the usual ways:
```ocaml
# test ["abc"] (R.map String.uppercase_ascii (R.take 2));;
+mock_flow returning 3 bytes
- : string = "AB"
# test ["abc"] (R.pair R.any_char R.take_all);;
+mock_flow returning 3 bytes
+mock_flow returning Eof
- : char * string = ('a', "bc")
# test ["abc"] (R.bind R.any_char R.char);;
+mock_flow returning 3 bytes
Exception: Failure "Expected 'a' but got 'b'".
```
Syntax:
```ocaml
# test ["abc"] (let+ x = R.take 2 in String.uppercase_ascii x);;
+mock_flow returning 3 bytes
- : string = "AB"
# test ["abc"] (let+ x = R.any_char and+ y = R.take_all in (x, y));;
+mock_flow returning 3 bytes
+mock_flow returning Eof
- : char * string = ('a', "bc")
# test ["abc"] (let* x = R.any_char in R.char x);;
+mock_flow returning 3 bytes
Exception: Failure "Expected 'a' but got 'b'".
# test ["aac"] (let* x = R.any_char in R.char x *> R.take_all);;
+mock_flow returning 3 bytes
+mock_flow returning Eof
- : string = "c"
# test ["ab"] (R.any_char <* R.any_char);;
+mock_flow returning 2 bytes
- : char = 'a'
# test ["ab"] (R.any_char *> R.any_char);;
+mock_flow returning 2 bytes
- : char = 'b'
```
## Error handling
```ocaml
# test ["abc"] R.(format_errors (take 3));;
+mock_flow returning 3 bytes
- : (string, [> `Msg of string ]) result = Ok "abc"
# test ["abc"] R.(format_errors (take 2 <* eof));;
+mock_flow returning 3 bytes
- : (string, [> `Msg of string ]) result =
Error (`Msg "Unexpected data after parsing (at offset 2)")
# test ["abc"] R.(format_errors (take 4 <* eof));;
+mock_flow returning 3 bytes
+mock_flow returning Eof
- : (string, [> `Msg of string ]) result =
Error (`Msg "Unexpected end-of-file at offset 3")
# test ~max_size:2 ["abc"] R.(format_errors line);;
+mock_flow returning 2 bytes
- : (string, [> `Msg of string ]) result =
Error (`Msg "Buffer size limit exceeded when reading at offset 0")
```