From 3d8dfd9a2ab0d8693715669ab877c8fdad4146c8 Mon Sep 17 00:00:00 2001 From: Bikal Lem Date: Wed, 12 Jan 2022 12:30:31 +0000 Subject: [PATCH] lib_eio: add Stream.is_empty and length Implements `length` and uses it in `is_empty`. --- lib_eio/eio.mli | 10 ++++++++-- lib_eio/stream.ml | 8 ++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/lib_eio/eio.mli b/lib_eio/eio.mli index 3124c27..507d466 100644 --- a/lib_eio/eio.mli +++ b/lib_eio/eio.mli @@ -308,7 +308,13 @@ module Stream : sig it returns [None] if the stream is empty rather than waiting. Note that if another domain may add to the stream then a [None] result may already be out-of-date by the time this returns. *) -end + + val length : 'a t -> int + (** [length t] returns the number of items currently in [t]. *) + + val is_empty : 'a t -> bool + (** [is_empty t] is [length t = 0]. *) +end (** Cancelling other fibres when an exception occurs. *) module Cancel : sig @@ -734,7 +740,7 @@ module Private : sig type 'a enqueue = ('a, exn) result -> unit (** A function provided by the scheduler to reschedule a previously-suspended thread. *) - type _ eff += + type _ eff += | Suspend : (Fibre_context.t -> 'a enqueue -> unit) -> 'a eff (** [Suspend fn] is performed when a fibre must be suspended (e.g. because it called {!Promise.await} on an unresolved promise). diff --git a/lib_eio/stream.ml b/lib_eio/stream.ml index 11a43a2..de19806 100644 --- a/lib_eio/stream.ml +++ b/lib_eio/stream.ml @@ -118,3 +118,11 @@ let take_nonblocking t = end; Mutex.unlock t.mutex; Some v + +let length t = + Mutex.lock t.mutex; + let len = Queue.length t.items in + Mutex.unlock t.mutex; + len + +let is_empty t = (length t = 0)