Ensure Buf_write doesn't leak fibers

A writer could be released while a fiber was waiting for a flush to
complete. Now `Buf_write.create` takes a switch and aborts any waiting
fibers when the switch finishes.
This commit is contained in:
Thomas Leonard 2022-06-28 09:41:12 +01:00
parent bdfb3faa64
commit 9188b31cde
3 changed files with 109 additions and 20 deletions

View File

@ -141,9 +141,12 @@ module Buffers = Deque(struct
end)
module Flushes = Deque(struct
type t = int * (unit -> unit)
let sentinel = 0, fun () -> ()
end)
type t = int * ((unit, exn) result Promise.u)
let sentinel =
let _, r = Promise.create () in
Promise.resolve_ok r ();
0, r
end)
type state =
| Active
@ -177,8 +180,7 @@ let of_buffer buffer =
; id = Ctf.mint_id ()
}
let create size =
of_buffer (Bigstringaf.create size)
exception Released
let writable_exn t =
match t.state with
@ -368,6 +370,22 @@ let is_closed t =
| Closed -> true
| Active | Paused -> false
let abort t =
close t;
let rec aux () =
match Flushes.dequeue_exn t.flushed with
| exception Dequeue_empty -> ()
| (_threshold, r) ->
Promise.resolve_error r Released;
aux ()
in
aux ()
let create ~sw size =
let t = of_buffer (Bigstringaf.create size) in
Switch.on_release sw (fun () -> abort t);
t
let pending_bytes t =
(t.write_pos - t.scheduled_pos) + (t.bytes_received - t.bytes_written)
@ -392,8 +410,8 @@ let flush t =
unpause t;
if not (Buffers.is_empty t.scheduled) then (
let p, r = Promise.create () in
Flushes.enqueue (t.bytes_received, Promise.resolve r) t.flushed;
Promise.await p
Flushes.enqueue (t.bytes_received, r) t.flushed;
Promise.await_exn p
)
let rec shift_buffers t written =
@ -414,12 +432,12 @@ let rec shift_buffers t written =
let rec shift_flushes t =
match Flushes.dequeue_exn t.flushed with
| exception Dequeue_empty -> ()
| (threshold, f) as flush ->
| (threshold, r) as flush ->
(* Be careful: [bytes_written] and [threshold] both wrap, so subtract first. *)
if t.bytes_written - threshold >= 0 then (
(* We have written at least up to [threshold]
(or we're more than [max_int] behind, which we assume won't happen). *)
f ();
Promise.resolve_ok r ();
shift_flushes t
) else (
Flushes.enqueue_front flush t.flushed
@ -467,8 +485,8 @@ let as_flow t =
end
let with_flow ?(initial_size=0x1000) flow fn =
let t = create initial_size in
Switch.run @@ fun sw ->
let t = create ~sw initial_size in
Fiber.fork ~sw (fun () -> Flow.copy (as_flow t) flow);
Fun.protect ~finally:(fun () -> close t) (fun () -> fn t)

View File

@ -262,10 +262,11 @@ val is_closed : t -> bool
Low-level operations for running a serializer. *)
val create : int -> t
(** [create len] creates a serializer with a fixed-length internal buffer of
val create : sw:Switch.t -> int -> t
(** [create ~sw len] creates a serializer with a fixed-length internal buffer of
length [len]. See the Buffered writes section for details about what happens
when [len] is not large enough to support a write. *)
when [len] is not large enough to support a write.
When [sw] is finished, any pending flush operations immediately fail. *)
val of_buffer : Cstruct.buffer -> t
(** [of_buffer buf] creates a serializer, using [buf] as its internal

View File

@ -251,7 +251,8 @@ Cancelled while waiting for some data:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let t = Write.create 100 in
Switch.run @@ fun sw ->
let t = Write.create ~sw 100 in
Fiber.both
(fun () -> ignore (Write.await_batch t); assert false)
(fun () -> failwith "Simulated error");;
@ -271,8 +272,8 @@ Exception: Failure "Simulated error".
## Serialize
```ocaml
let foobar () =
let t = Write.create 100 in
let foobar ~sw =
let t = Write.create ~sw 100 in
Write.string t "foo";
Write.cstruct t (Cstruct.of_string "bar");
Write.close t;
@ -281,7 +282,8 @@ let foobar () =
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.serialize (foobar ()) @@ fun bufs ->
Switch.run @@ fun sw ->
Write.serialize (foobar ~sw) @@ fun bufs ->
traceln "Write %a" Fmt.(Dump.list (using Cstruct.to_string Dump.string)) bufs;
Ok (Cstruct.lenv bufs);;
+Write ["foobar"]
@ -290,7 +292,8 @@ let foobar () =
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Write.serialize (foobar ()) @@ fun bufs ->
Switch.run @@ fun sw ->
Write.serialize (foobar ~sw) @@ fun bufs ->
assert (bufs <> []);
traceln "Write %a" Fmt.(Dump.list (using Cstruct.to_string Dump.string)) bufs;
Error `Closed;;
@ -299,12 +302,16 @@ let foobar () =
```
```ocaml
# Write.serialize_to_string (foobar ());;
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
Write.serialize_to_string (foobar ~sw);;
- : string = "foobar"
```
```ocaml
# Write.serialize_to_cstruct (foobar ()) |> Cstruct.to_string;;
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun sw ->
Write.serialize_to_cstruct (foobar ~sw) |> Cstruct.to_string;;
- : string = "foobar"
```
@ -336,3 +343,66 @@ But we don't flush if cancelled:
(fun () -> failwith "Simulated error");;
Exception: Failure "Simulated error".
```
## Cleanup
Ensure that we don't lose flushing fibers if the writer is aborted:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Switch.run @@ fun main_sw ->
Switch.run (fun sw ->
let t = Write.create ~sw 100 in
Fiber.fork ~sw:main_sw
(fun () ->
Write.string t "foo";
try Write.flush t; assert false
with ex -> traceln "Flush failed: %a" Fmt.exn ex
);
traceln "Finishing writer switch"
);
Fiber.yield ();
traceln "Finishing main switch";;
+Finishing writer switch
+Flush failed: Eio__Buf_write.Released
+Finishing main switch
- : unit = ()
```
And with `with_flow`:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Flow.on_copy_bytes flow [`Raise (Failure "Simulated IO error")];
Switch.run @@ fun sw ->
Write.with_flow flow @@ fun t ->
Fiber.fork ~sw (fun () ->
Write.string t "foo";
try Write.flush t; assert false
with ex -> traceln "Flush failed: %a" Fmt.exn ex
);
traceln "with_flow returning; t will be closed";;
+with_flow returning; t will be closed
+Flush failed: Eio__Buf_write.Released
Exception: Failure "Simulated IO error".
```
But the flush does succeed in the normal case:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
Eio_mock.Flow.on_copy_bytes flow [`Yield_then (`Return 2); `Return 1];
Switch.run @@ fun sw ->
Write.with_flow flow @@ fun t ->
Fiber.fork ~sw (fun () ->
Write.string t "foo";
Write.flush t;
traceln "Flush succeeded"
);
traceln "with_flow returning; t should be closed but not aborted";;
+with_flow returning; t should be closed but not aborted
+flow: wrote "fo"
+flow: wrote "o"
+Flush succeeded
- : unit = ()
```