mirror of
https://github.com/ocaml-multicore/eio.git
synced 2025-07-21 00:01:30 -04:00
Compare commits
3 Commits
60e67472e3
...
9e1872c10c
Author | SHA1 | Date | |
---|---|---|---|
|
9e1872c10c | ||
|
ddcf89983d | ||
|
5a11ea6df2 |
@ -226,7 +226,7 @@ We can run the previous code with tracing enabled (writing to a new `trace.ctf`
|
||||
|
||||
```ocaml
|
||||
# let () =
|
||||
Eio_unix.Ctf.with_tracing "trace.ctf" @@ fun () ->
|
||||
Eio_unix.Trace.with_tracing "trace.ctf" @@ fun () ->
|
||||
Eio_main.run main;;
|
||||
+x = 1
|
||||
+y = 1
|
||||
|
@ -22,7 +22,7 @@ type t = {
|
||||
domain : Domain.id; (* Prevent access from other domains *)
|
||||
}
|
||||
and fiber_context = {
|
||||
tid : Ctf.id;
|
||||
tid : Trace.id;
|
||||
mutable cancel_context : t;
|
||||
mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
|
||||
mutable cancel_fn : exn -> unit; (* Encourage the current operation to finish *)
|
||||
@ -194,8 +194,8 @@ module Fiber_context = struct
|
||||
t.cancel_fn <- ignore
|
||||
|
||||
let make ~cc ~vars =
|
||||
let tid = Ctf.mint_id () in
|
||||
Ctf.note_created tid Ctf.Task;
|
||||
let tid = Trace.mint_id () in
|
||||
Trace.create tid Fiber;
|
||||
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
|
||||
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
|
||||
t
|
||||
|
@ -1,110 +0,0 @@
|
||||
(** This library is used to write event traces in mirage-profile's CTF format. *)
|
||||
|
||||
type id = private int
|
||||
(** Each thread/fiber/promise is identified by a unique ID. *)
|
||||
|
||||
(** {2 Recording events}
|
||||
Libraries and applications can use these functions to make the traces more useful. *)
|
||||
|
||||
val label : string -> unit
|
||||
(** [label msg] attaches text [msg] to the current thread. *)
|
||||
|
||||
val note_increase : string -> int -> unit
|
||||
(** [note_increase counter delta] records that [counter] increased by [delta].
|
||||
If [delta] is negative, this records a decrease. *)
|
||||
|
||||
val note_counter_value : string -> int -> unit
|
||||
(** [note_counter_value counter value] records that [counter] is now [value]. *)
|
||||
|
||||
val should_resolve : id -> unit
|
||||
(** [should_resolve id] records that [id] is expected to resolve, and should be highlighted if it doesn't. *)
|
||||
|
||||
(** {2 Recording system events}
|
||||
These are normally only called by the scheduler. *)
|
||||
|
||||
type hiatus_reason =
|
||||
| Wait_for_work
|
||||
| Suspend
|
||||
| Hibernate
|
||||
|
||||
type event =
|
||||
| Wait
|
||||
| Task
|
||||
| Bind
|
||||
| Try
|
||||
| Choose
|
||||
| Pick
|
||||
| Join
|
||||
| Map
|
||||
| Condition
|
||||
| On_success
|
||||
| On_failure
|
||||
| On_termination
|
||||
| On_any
|
||||
| Ignore_result
|
||||
| Async
|
||||
| Promise
|
||||
| Semaphore
|
||||
| Switch
|
||||
| Stream
|
||||
| Mutex
|
||||
(** Types of threads or other recorded objects. *)
|
||||
|
||||
val mint_id : unit -> id
|
||||
(** [mint_id ()] is a fresh unique [id]. *)
|
||||
|
||||
val note_created : ?label:string -> id -> event -> unit
|
||||
(** [note_created t id ty] records the creation of [id]. *)
|
||||
|
||||
val note_read : ?reader:id -> id -> unit
|
||||
(** [note_read src] records that promise [src]'s value was read.
|
||||
@param reader The thread doing the read (default is the current thread). *)
|
||||
|
||||
val note_try_read : id -> unit
|
||||
(** [note_try_read src] records that the current thread wants to read from [src] (which is not currently ready). *)
|
||||
|
||||
val note_switch : id -> unit
|
||||
(** [note_switch id] records that [id] is now the current thread. *)
|
||||
|
||||
val note_hiatus : hiatus_reason -> unit
|
||||
(** [note_hiatus r] records that the system will sleep for reason [r]. *)
|
||||
|
||||
val note_resume : id -> unit
|
||||
(** [note_resume id] records that the system has resumed (used after {!note_hiatus}),
|
||||
and is now running [id]. *)
|
||||
|
||||
val note_fork : unit -> id
|
||||
(** [note_fork ()] records that a new thread has been forked and returns a fresh ID for it. *)
|
||||
|
||||
val note_resolved : id -> ex:exn option -> unit
|
||||
(** [note_resolved id ~ex] records that [id] is now resolved.
|
||||
If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *)
|
||||
|
||||
val note_signal : ?src:id -> id -> unit
|
||||
(** [note_signal ~src dst] records that [dst] was signalled.
|
||||
@param src The thread sending the signal (default is the current thread). *)
|
||||
|
||||
(** {2 Controlling tracing} *)
|
||||
|
||||
type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
|
||||
|
||||
module Control : sig
|
||||
type t
|
||||
|
||||
val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t
|
||||
(** [make ~timestamper b] is a trace buffer that record events in [b].
|
||||
In most cases, the {!Ctf_unix} module provides a simpler interface. *)
|
||||
|
||||
val start : t -> unit
|
||||
(** [start t] begins recording events in [t]. *)
|
||||
|
||||
val stop : t -> unit
|
||||
(** [stop t] stops recording to [t] (which must be the current trace buffer). *)
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
||||
module BS : sig
|
||||
val set_int8 : Cstruct.buffer -> int -> int -> unit
|
||||
val set_int64_le : Cstruct.buffer -> int -> int64 -> unit
|
||||
end
|
@ -15,7 +15,7 @@ let default_traceln ?__POS__:pos fmt =
|
||||
Format.pp_close_box f ();
|
||||
Format.pp_print_flush f ();
|
||||
let msg = Buffer.contents b in
|
||||
Ctf.label msg;
|
||||
Trace.label msg;
|
||||
let lines = String.split_on_char '\n' msg in
|
||||
Mutex.lock traceln_mutex;
|
||||
Fun.protect ~finally:(fun () -> Mutex.unlock traceln_mutex) @@ fun () ->
|
||||
|
@ -7,7 +7,7 @@ module Private = struct
|
||||
module Suspend = Suspend
|
||||
module Cells = Cells
|
||||
module Broadcast = Broadcast
|
||||
module Ctf = Ctf
|
||||
module Trace = Trace
|
||||
module Fiber_context = Cancel.Fiber_context
|
||||
module Debug = Debug
|
||||
|
||||
|
@ -571,7 +571,7 @@ end
|
||||
|
||||
(** @canonical Eio.Private *)
|
||||
module Private : sig
|
||||
module Ctf = Ctf
|
||||
module Trace = Trace
|
||||
|
||||
module Cells = Cells
|
||||
module Broadcast = Broadcast
|
||||
@ -586,7 +586,7 @@ module Private : sig
|
||||
val destroy : t -> unit
|
||||
(** [destroy t] removes [t] from its cancellation context. *)
|
||||
|
||||
val tid : t -> Ctf.id
|
||||
val tid : t -> Trace.id
|
||||
|
||||
(** {2 Cancellation}
|
||||
|
||||
|
@ -19,10 +19,10 @@ let fork ~sw f =
|
||||
Switch.with_op sw @@ fun () ->
|
||||
match f () with
|
||||
| () ->
|
||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||
| exception ex ->
|
||||
Switch.fail sw ex; (* The [with_op] ensures this will succeed *)
|
||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
||||
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
||||
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
|
||||
|
||||
let fork_daemon ~sw f =
|
||||
@ -35,13 +35,13 @@ let fork_daemon ~sw f =
|
||||
match f () with
|
||||
| `Stop_daemon ->
|
||||
(* The daemon asked to stop. *)
|
||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||
| exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) ->
|
||||
(* The daemon was cancelled because all non-daemon fibers are finished. *)
|
||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||
| exception ex ->
|
||||
Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *)
|
||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
||||
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
||||
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
|
||||
|
||||
let fork_promise ~sw f =
|
||||
|
@ -3,7 +3,7 @@ type 'a state =
|
||||
| Unresolved of Broadcast.t
|
||||
|
||||
type !'a promise = {
|
||||
id : Ctf.id;
|
||||
id : Trace.id;
|
||||
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
|
||||
}
|
||||
|
||||
@ -25,20 +25,20 @@ let create_with_id id =
|
||||
to_public_promise t, to_public_resolver t
|
||||
|
||||
let create ?label () =
|
||||
let id = Ctf.mint_id () in
|
||||
Ctf.note_created ?label id Ctf.Promise;
|
||||
let id = Trace.mint_id () in
|
||||
Trace.create ?label id Promise;
|
||||
create_with_id id
|
||||
|
||||
let create_resolved x =
|
||||
let id = Ctf.mint_id () in
|
||||
Ctf.note_created id Ctf.Promise;
|
||||
let id = Trace.mint_id () in
|
||||
Trace.create id Promise;
|
||||
to_public_promise { id; state = Atomic.make (Resolved x) }
|
||||
|
||||
let await t =
|
||||
let t = of_public_promise t in
|
||||
match Atomic.get t.state with
|
||||
| Resolved x ->
|
||||
Ctf.note_read t.id;
|
||||
Trace.read t.id;
|
||||
x
|
||||
| Unresolved b ->
|
||||
Suspend.enter (fun ctx enqueue ->
|
||||
@ -53,7 +53,7 @@ let await t =
|
||||
| Unresolved _ ->
|
||||
(* We observed the promise to be still unresolved after registering a waiter.
|
||||
Therefore any resolution must happen after we were registered and we will be notified. *)
|
||||
Ctf.note_try_read t.id;
|
||||
Trace.try_read t.id;
|
||||
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
|
||||
if Broadcast.cancel request then enqueue (Error ex)
|
||||
(* else already resumed *)
|
||||
@ -61,7 +61,7 @@ let await t =
|
||||
);
|
||||
match Atomic.get t.state with
|
||||
| Resolved x ->
|
||||
Ctf.note_read t.id;
|
||||
Trace.read t.id;
|
||||
x
|
||||
| Unresolved _ -> assert false
|
||||
|
||||
@ -76,7 +76,7 @@ let resolve t v =
|
||||
| Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
|
||||
| Unresolved b as prev ->
|
||||
if Atomic.compare_and_set t.state prev (Resolved v) then (
|
||||
Ctf.note_resolved t.id ~ex:None;
|
||||
Trace.resolve t.id ~ex:None;
|
||||
Broadcast.resume_all b
|
||||
) else (
|
||||
(* Otherwise, the promise was already resolved. Retry (to get the error). *)
|
||||
|
@ -19,6 +19,6 @@ let await t id =
|
||||
t.wake <- (fun x ->
|
||||
Cancel.Fiber_context.clear_cancel_fn ctx;
|
||||
t.wake <- ignore;
|
||||
Ctf.note_read ~reader:id ctx.tid;
|
||||
Trace.read ~reader:id ctx.tid;
|
||||
enqueue x
|
||||
)
|
||||
|
@ -1,5 +1,5 @@
|
||||
type t = {
|
||||
id : Ctf.id;
|
||||
id : Trace.id;
|
||||
mutable fibers : int; (* Total, including daemon_fibers and the main function *)
|
||||
mutable daemon_fibers : int;
|
||||
mutable exs : (exn * Printexc.raw_backtrace) option;
|
||||
@ -51,7 +51,7 @@ let combine_exn ex = function
|
||||
let fail ?(bt=Printexc.get_raw_backtrace ()) t ex =
|
||||
check_our_domain t;
|
||||
if t.exs = None then
|
||||
Ctf.note_resolved t.id ~ex:(Some ex);
|
||||
Trace.resolve t.id ~ex:(Some ex);
|
||||
t.exs <- Some (combine_exn (ex, bt) t.exs);
|
||||
try
|
||||
Cancel.cancel t.cancel ex
|
||||
@ -91,7 +91,7 @@ let or_raise = function
|
||||
let rec await_idle t =
|
||||
(* Wait for fibers to finish: *)
|
||||
while t.fibers > 0 do
|
||||
Ctf.note_try_read t.id;
|
||||
Trace.try_read t.id;
|
||||
Single_waiter.await t.waiter t.id
|
||||
done;
|
||||
(* Call on_release handlers: *)
|
||||
@ -118,8 +118,8 @@ let maybe_raise_exs t =
|
||||
| Some (ex, bt) -> Printexc.raise_with_backtrace ex bt
|
||||
|
||||
let create cancel =
|
||||
let id = Ctf.mint_id () in
|
||||
Ctf.note_created id Ctf.Switch;
|
||||
let id = Trace.mint_id () in
|
||||
Trace.create id Switch;
|
||||
{
|
||||
id;
|
||||
fibers = 1; (* The main function counts as a fiber *)
|
||||
@ -135,7 +135,7 @@ let run_internal t fn =
|
||||
| v ->
|
||||
dec_fibers t;
|
||||
await_idle t;
|
||||
Ctf.note_read t.id;
|
||||
Trace.read t.id;
|
||||
maybe_raise_exs t; (* Check for failure while finishing *)
|
||||
(* Success. *)
|
||||
v
|
||||
@ -146,7 +146,7 @@ let run_internal t fn =
|
||||
dec_fibers t;
|
||||
fail ~bt t ex;
|
||||
await_idle t;
|
||||
Ctf.note_read t.id;
|
||||
Trace.read t.id;
|
||||
maybe_raise_exs t;
|
||||
assert false
|
||||
|
||||
|
@ -47,27 +47,8 @@ let mint_id () =
|
||||
Domain.DLS.set next_id_key next_id_local_succ;
|
||||
next_id_local
|
||||
|
||||
type hiatus_reason =
|
||||
| Wait_for_work
|
||||
| Suspend
|
||||
| Hibernate
|
||||
|
||||
type event =
|
||||
| Wait
|
||||
| Task
|
||||
| Bind
|
||||
| Try
|
||||
| Choose
|
||||
| Pick
|
||||
| Join
|
||||
| Map
|
||||
| Condition
|
||||
| On_success
|
||||
| On_failure
|
||||
| On_termination
|
||||
| On_any
|
||||
| Ignore_result
|
||||
| Async
|
||||
type ty =
|
||||
| Fiber
|
||||
| Promise
|
||||
| Semaphore
|
||||
| Switch
|
||||
@ -80,21 +61,7 @@ let current_thread = ref (-1)
|
||||
|
||||
let int_of_thread_type t =
|
||||
match t with
|
||||
| Wait -> 0
|
||||
| Task -> 1
|
||||
| Bind -> 2
|
||||
| Try -> 3
|
||||
| Choose -> 4
|
||||
| Pick -> 5
|
||||
| Join -> 6
|
||||
| Map -> 7
|
||||
| Condition -> 8
|
||||
| On_success -> 9
|
||||
| On_failure -> 10
|
||||
| On_termination -> 11
|
||||
| On_any -> 12
|
||||
| Ignore_result -> 13
|
||||
| Async -> 14
|
||||
| Fiber -> 1
|
||||
| Promise -> 15
|
||||
| Semaphore -> 16
|
||||
| Switch -> 17
|
||||
@ -207,12 +174,12 @@ module Control = struct
|
||||
let op_fails = 3
|
||||
(* let op_becomes = 4 *)
|
||||
let op_label = 5
|
||||
let op_increase = 6
|
||||
(* let op_increase = 6 *)
|
||||
let op_switch = 7
|
||||
(* let op_gc = 8 *)
|
||||
(* let op_old_signal = 9 *)
|
||||
let op_try_read = 10
|
||||
let op_counter_value = 11
|
||||
(* let op_counter_value = 11 *)
|
||||
let op_read_later = 12
|
||||
let op_signal = 13
|
||||
|
||||
@ -329,20 +296,6 @@ module Control = struct
|
||||
|> write_string log.log msg
|
||||
|> end_event
|
||||
|
||||
let note_increase log counter amount =
|
||||
add_event log op_increase (17 + String.length counter)
|
||||
|> write_tid log.log !current_thread
|
||||
|> write64 log.log (Int64.of_int amount)
|
||||
|> write_string log.log counter
|
||||
|> end_event
|
||||
|
||||
let note_counter_value log counter value =
|
||||
add_event log op_counter_value (17 + String.length counter)
|
||||
|> write_tid log.log !current_thread
|
||||
|> write64 log.log (Int64.of_int value)
|
||||
|> write_string log.log counter
|
||||
|> end_event
|
||||
|
||||
let note_switch log new_current =
|
||||
if new_current <> !current_thread then (
|
||||
current_thread := new_current;
|
||||
@ -397,42 +350,34 @@ let label name =
|
||||
| None -> ()
|
||||
| Some log -> Control.note_label log !current_thread name
|
||||
|
||||
let note_fork () =
|
||||
let child = mint_id () in
|
||||
begin match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_created log child Task
|
||||
end;
|
||||
child
|
||||
|
||||
let note_created ?label id ty =
|
||||
let create ?label id ty =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log ->
|
||||
Control.note_created log id ty;
|
||||
Option.iter (Control.note_label log id) label
|
||||
|
||||
let note_switch new_current =
|
||||
let fiber new_current =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_switch log new_current
|
||||
|
||||
let note_hiatus _reason =
|
||||
let hiatus () =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_suspend log ()
|
||||
|
||||
let note_resume new_current =
|
||||
let resume new_current =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_switch log new_current
|
||||
|
||||
let note_try_read input =
|
||||
let try_read input =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_try_read log !current_thread input
|
||||
|
||||
let note_read ?reader input =
|
||||
let read ?reader input =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log ->
|
||||
@ -443,12 +388,12 @@ let note_read ?reader input =
|
||||
in
|
||||
Control.note_read log ~reader input
|
||||
|
||||
let note_resolved id ~ex =
|
||||
let resolve id ~ex =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_resolved log id ~ex
|
||||
|
||||
let note_signal ?src dst =
|
||||
let signal ?src dst =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log ->
|
||||
@ -458,18 +403,3 @@ let note_signal ?src dst =
|
||||
| Some x -> x
|
||||
in
|
||||
Control.note_signal ~src log dst
|
||||
|
||||
let note_increase counter amount =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_increase log counter amount
|
||||
|
||||
let note_counter_value counter value =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_counter_value log counter value
|
||||
|
||||
let should_resolve thread =
|
||||
match !Control.event_log with
|
||||
| None -> ()
|
||||
| Some log -> Control.note_label log thread "__should_resolve" (* Hack! *)
|
78
lib_eio/core/trace.mli
Normal file
78
lib_eio/core/trace.mli
Normal file
@ -0,0 +1,78 @@
|
||||
(** This library is used to write event traces in mirage-profile's CTF format. *)
|
||||
|
||||
type id = private int
|
||||
(** Each thread/fiber/promise is identified by a unique ID. *)
|
||||
|
||||
(** {2 Recording events}
|
||||
Libraries and applications can use these functions to make the traces more useful. *)
|
||||
|
||||
val label : string -> unit
|
||||
(** [label msg] attaches text [msg] to the current thread. *)
|
||||
|
||||
(** {2 Recording system events}
|
||||
These are normally only called by the scheduler. *)
|
||||
|
||||
type ty =
|
||||
| Fiber
|
||||
| Promise
|
||||
| Semaphore
|
||||
| Switch
|
||||
| Stream
|
||||
| Mutex
|
||||
(** Types of recorded objects. *)
|
||||
|
||||
val mint_id : unit -> id
|
||||
(** [mint_id ()] is a fresh unique [id]. *)
|
||||
|
||||
val create : ?label:string -> id -> ty -> unit
|
||||
(** [create t id ty] records the creation of [id]. *)
|
||||
|
||||
val read : ?reader:id -> id -> unit
|
||||
(** [read src] records that promise [src]'s value was read.
|
||||
@param reader The thread doing the read (default is the current thread). *)
|
||||
|
||||
val try_read : id -> unit
|
||||
(** [try_read src] records that the current thread wants to read from [src] (which is not currently ready). *)
|
||||
|
||||
val fiber : id -> unit
|
||||
(** [fiber id] records that fiber [id] is now running. *)
|
||||
|
||||
val hiatus : unit -> unit
|
||||
(** [hiatus ()] records that the system will sleep for reason [r]. *)
|
||||
|
||||
val resume : id -> unit
|
||||
(** [resume id] records that the system has resumed (used after {!hiatus}),
|
||||
and is now running [id]. *)
|
||||
|
||||
val resolve : id -> ex:exn option -> unit
|
||||
(** [resolve id ~ex] records that [id] is now resolved.
|
||||
If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *)
|
||||
|
||||
val signal : ?src:id -> id -> unit
|
||||
(** [signal ~src dst] records that [dst] was signalled.
|
||||
@param src The thread sending the signal (default is the current thread). *)
|
||||
|
||||
(** {2 Controlling tracing} *)
|
||||
|
||||
type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
|
||||
|
||||
module Control : sig
|
||||
type t
|
||||
|
||||
val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t
|
||||
(** [make ~timestamper b] is a trace buffer that record events in [b].
|
||||
In most cases, the {!Eio_unix.Trace} module provides a simpler interface. *)
|
||||
|
||||
val start : t -> unit
|
||||
(** [start t] begins recording events in [t]. *)
|
||||
|
||||
val stop : t -> unit
|
||||
(** [stop t] stops recording to [t] (which must be the current trace buffer). *)
|
||||
end
|
||||
|
||||
(**/**)
|
||||
|
||||
module BS : sig
|
||||
val set_int8 : Cstruct.buffer -> int -> int -> unit
|
||||
val set_int64_le : Cstruct.buffer -> int -> int64 -> unit
|
||||
end
|
@ -6,7 +6,7 @@ type state =
|
||||
exception Poisoned of exn
|
||||
|
||||
type t = {
|
||||
id : Ctf.id;
|
||||
id : Trace.id;
|
||||
mutex : Mutex.t;
|
||||
mutable state : state; (* Owned by [t.mutex] *)
|
||||
waiters : [`Take | `Error of exn] Waiters.t; (* Owned by [t.mutex] *)
|
||||
@ -19,8 +19,8 @@ type t = {
|
||||
|
||||
(* {R} t = create () {mutex t R} *)
|
||||
let create () =
|
||||
let id = Ctf.mint_id () in
|
||||
Ctf.note_created id Ctf.Mutex;
|
||||
let id = Trace.mint_id () in
|
||||
Trace.create id Mutex;
|
||||
{
|
||||
id;
|
||||
mutex = Mutex.create ();
|
||||
@ -33,7 +33,7 @@ let create () =
|
||||
let unlock t =
|
||||
Mutex.lock t.mutex;
|
||||
(* We now have ownership of [t.state] and [t.waiters]. *)
|
||||
Ctf.note_signal t.id;
|
||||
Trace.signal t.id;
|
||||
match t.state with
|
||||
| Unlocked ->
|
||||
Mutex.unlock t.mutex;
|
||||
@ -55,7 +55,7 @@ let lock t =
|
||||
Mutex.lock t.mutex;
|
||||
match t.state with
|
||||
| Locked ->
|
||||
Ctf.note_try_read t.id;
|
||||
Trace.try_read t.id;
|
||||
begin match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with
|
||||
| `Error ex -> raise ex (* Poisoned; stop waiting *)
|
||||
| `Take ->
|
||||
@ -64,7 +64,7 @@ let lock t =
|
||||
()
|
||||
end
|
||||
| Unlocked ->
|
||||
Ctf.note_read t.id;
|
||||
Trace.read t.id;
|
||||
t.state <- Locked; (* We transfer R from the state to our caller. *)
|
||||
(* {locked t * R} *)
|
||||
Mutex.unlock t.mutex
|
||||
@ -77,11 +77,11 @@ let try_lock t =
|
||||
Mutex.lock t.mutex;
|
||||
match t.state with
|
||||
| Locked ->
|
||||
Ctf.note_try_read t.id;
|
||||
Trace.try_read t.id;
|
||||
Mutex.unlock t.mutex;
|
||||
false
|
||||
| Unlocked ->
|
||||
Ctf.note_read t.id;
|
||||
Trace.read t.id;
|
||||
t.state <- Locked; (* We transfer R from the state to our caller. *)
|
||||
Mutex.unlock t.mutex;
|
||||
(* {locked t * R} *)
|
||||
|
@ -1,18 +1,18 @@
|
||||
type t = {
|
||||
id : Ctf.id;
|
||||
id : Trace.id;
|
||||
state : Sem_state.t;
|
||||
}
|
||||
|
||||
let make n =
|
||||
let id = Ctf.mint_id () in
|
||||
Ctf.note_created id Ctf.Semaphore;
|
||||
let id = Trace.mint_id () in
|
||||
Trace.create id Semaphore;
|
||||
{
|
||||
id;
|
||||
state = Sem_state.create n;
|
||||
}
|
||||
|
||||
let release t =
|
||||
Ctf.note_signal t.id;
|
||||
Trace.signal t.id;
|
||||
Sem_state.release t.state
|
||||
|
||||
let acquire t =
|
||||
@ -24,7 +24,7 @@ let acquire t =
|
||||
match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with
|
||||
| None -> () (* Already resumed *)
|
||||
| Some request ->
|
||||
Ctf.note_try_read t.id;
|
||||
Trace.try_read t.id;
|
||||
match Fiber_context.get_error ctx with
|
||||
| Some ex ->
|
||||
if Sem_state.cancel request then enqueue (Error ex);
|
||||
@ -36,7 +36,7 @@ let acquire t =
|
||||
)
|
||||
)
|
||||
);
|
||||
Ctf.note_read t.id
|
||||
Trace.read t.id
|
||||
|
||||
let get_value t =
|
||||
max 0 (Atomic.get t.state.state)
|
||||
|
@ -2,7 +2,7 @@ module Locking = struct
|
||||
type 'a t = {
|
||||
mutex : Mutex.t;
|
||||
|
||||
id : Ctf.id;
|
||||
id : Trace.id;
|
||||
|
||||
capacity : int; (* [capacity > 0] *)
|
||||
items : 'a Queue.t;
|
||||
@ -29,8 +29,8 @@ module Locking = struct
|
||||
|
||||
let create capacity =
|
||||
assert (capacity > 0);
|
||||
let id = Ctf.mint_id () in
|
||||
Ctf.note_created id Ctf.Stream;
|
||||
let id = Trace.mint_id () in
|
||||
Trace.create id Stream;
|
||||
{
|
||||
mutex = Mutex.create ();
|
||||
id;
|
||||
|
@ -4,7 +4,7 @@
|
||||
```ocaml
|
||||
# #require "eio";;
|
||||
# for _ = 1 to 5 do
|
||||
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
|
||||
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
|
||||
done;;
|
||||
1
|
||||
2
|
||||
@ -20,7 +20,7 @@ A new domain gets a new chunk:
|
||||
# Domain.join @@ Domain.spawn
|
||||
(fun () ->
|
||||
for _ = 1 to 5 do
|
||||
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
|
||||
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
|
||||
done);;
|
||||
1024
|
||||
1025
|
||||
@ -34,12 +34,12 @@ When the original domain exhausts its chunk, it jumps to the next free chunk:
|
||||
|
||||
```ocaml
|
||||
# for _ = 1 to 1024 - 9 do
|
||||
Eio.Private.Ctf.mint_id () |> ignore
|
||||
Eio.Private.Trace.mint_id () |> ignore
|
||||
done;;
|
||||
- : unit = ()
|
||||
|
||||
# for _ = 1 to 5 do
|
||||
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
|
||||
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
|
||||
done;;
|
||||
1021
|
||||
1022
|
@ -1,10 +1,10 @@
|
||||
open Bigarray
|
||||
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
|
||||
let timestamper log_buffer ofs =
|
||||
let ns = Mtime.to_uint64_ns @@ Mtime_clock.now () in
|
||||
Ctf.BS.set_int64_le log_buffer ofs ns
|
||||
Trace.BS.set_int64_le log_buffer ofs ns
|
||||
|
||||
let mmap_buffer ~size path =
|
||||
let fd = Unix.(openfile path [O_RDWR; O_CREAT; O_TRUNC] 0o644) in
|
||||
@ -16,6 +16,6 @@ let mmap_buffer ~size path =
|
||||
|
||||
let with_tracing ?(size=0x100000) path fn =
|
||||
let buffer = mmap_buffer ~size path in
|
||||
let trace_config = Ctf.Control.make ~timestamper buffer in
|
||||
Ctf.Control.start trace_config;
|
||||
Fun.protect fn ~finally:(fun () -> Ctf.Control.stop trace_config)
|
||||
let trace_config = Trace.Control.make ~timestamper buffer in
|
||||
Trace.Control.start trace_config;
|
||||
Fun.protect fn ~finally:(fun () -> Trace.Control.stop trace_config)
|
||||
|
@ -1,9 +1,9 @@
|
||||
val timestamper : Eio.Private.Ctf.log_buffer -> int -> unit
|
||||
val timestamper : Eio.Private.Trace.log_buffer -> int -> unit
|
||||
(** Uses [Mtime_clock] to write timestamps. *)
|
||||
|
||||
val mmap_buffer : size:int -> string -> Eio.Private.Ctf.log_buffer
|
||||
val mmap_buffer : size:int -> string -> Eio.Private.Trace.log_buffer
|
||||
(** [mmap_buffer ~size path] initialises file [path] as an empty buffer for tracing. *)
|
||||
|
||||
val with_tracing : ?size:int -> string -> (unit -> 'a) -> 'a
|
||||
(** [with_tracing path fn] is a convenience function that uses {!mmap_buffer} to create a log buffer,
|
||||
calls {!Ctf.Control.start} to start recording, runs [fn], and then stops recording. *)
|
||||
calls {!Trace.Control.start} to start recording, runs [fn], and then stops recording. *)
|
||||
|
@ -26,7 +26,7 @@ let run_in_systhread = Private.run_in_systhread
|
||||
|
||||
module Ipaddr = Net.Ipaddr
|
||||
|
||||
module Ctf = Ctf_unix
|
||||
module Trace = Ctf_unix
|
||||
|
||||
module Process = Process
|
||||
module Net = Net
|
||||
|
@ -97,6 +97,6 @@ module Private : sig
|
||||
module Fork_action = Fork_action
|
||||
end
|
||||
|
||||
module Ctf = Ctf_unix
|
||||
module Trace = Ctf_unix
|
||||
|
||||
module Pi = Pi
|
||||
|
@ -1,7 +1,7 @@
|
||||
(** A suspended fiber with its context. *)
|
||||
|
||||
open Effect.Deep
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
|
||||
type 'a t = {
|
||||
fiber : Eio.Private.Fiber_context.t;
|
||||
@ -11,9 +11,9 @@ type 'a t = {
|
||||
let tid t = Eio.Private.Fiber_context.tid t.fiber
|
||||
|
||||
let continue t v =
|
||||
Ctf.note_switch (tid t);
|
||||
Trace.fiber (tid t);
|
||||
continue t.k v
|
||||
|
||||
let discontinue t ex =
|
||||
Ctf.note_switch (tid t);
|
||||
Trace.fiber (tid t);
|
||||
discontinue t.k ex
|
||||
|
@ -47,7 +47,7 @@ let await_internal ~mutex (t:'a t) id ctx enqueue =
|
||||
let resolved_waiter = ref Hook.null in
|
||||
let finished = Atomic.make false in
|
||||
let enqueue x =
|
||||
Ctf.note_read ~reader:id (Fiber_context.tid ctx);
|
||||
Trace.read ~reader:id (Fiber_context.tid ctx);
|
||||
enqueue x
|
||||
in
|
||||
let cancel ex =
|
||||
|
@ -21,7 +21,7 @@ val is_empty : 'a t -> bool
|
||||
|
||||
val await :
|
||||
mutex:Mutex.t option ->
|
||||
'a t -> Ctf.id -> 'a
|
||||
'a t -> Trace.id -> 'a
|
||||
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
|
||||
When the waiter is woken, the fiber is resumed and returns the result.
|
||||
If [t] can be used from multiple domains:
|
||||
@ -32,7 +32,7 @@ val await :
|
||||
|
||||
val await_internal :
|
||||
mutex:Mutex.t option ->
|
||||
'a t -> Ctf.id -> Fiber_context.t ->
|
||||
'a t -> Trace.id -> Fiber_context.t ->
|
||||
(('a, exn) result -> unit) -> unit
|
||||
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
|
||||
This also allows wrapping the [enqueue] function.
|
||||
|
@ -20,7 +20,7 @@
|
||||
open Eio.Std
|
||||
|
||||
module Fiber_context = Eio.Private.Fiber_context
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
module Fd = Eio_unix.Fd
|
||||
|
||||
module Suspended = Eio_utils.Suspended
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
open Eio.Std
|
||||
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
module Fd = Eio_unix.Fd
|
||||
|
||||
type dir_fd =
|
||||
@ -20,12 +20,12 @@ let file_offset t = function
|
||||
|
||||
let enqueue_read st action (file_offset,fd,buf,len) =
|
||||
let req = { Sched.op=`R; file_offset; len; fd; cur_off = 0; buf; action } in
|
||||
Ctf.label "read";
|
||||
Trace.label "read";
|
||||
Sched.submit_rw_req st req
|
||||
|
||||
let rec enqueue_writev args st action =
|
||||
let (file_offset,fd,bufs) = args in
|
||||
Ctf.label "writev";
|
||||
Trace.label "writev";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.writev st.uring ~file_offset fd bufs (Job action)
|
||||
)
|
||||
@ -35,11 +35,11 @@ let rec enqueue_writev args st action =
|
||||
|
||||
let enqueue_write st action (file_offset,fd,buf,len) =
|
||||
let req = { Sched.op=`W; file_offset; len; fd; cur_off = 0; buf; action } in
|
||||
Ctf.label "write";
|
||||
Trace.label "write";
|
||||
Sched.submit_rw_req st req
|
||||
|
||||
let rec enqueue_splice ~src ~dst ~len st action =
|
||||
Ctf.label "splice";
|
||||
Trace.label "splice";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.splice st.uring (Job action) ~src ~dst ~len
|
||||
)
|
||||
@ -48,7 +48,7 @@ let rec enqueue_splice ~src ~dst ~len st action =
|
||||
Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
|
||||
|
||||
let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action =
|
||||
Ctf.label "openat2";
|
||||
Trace.label "openat2";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action)
|
||||
)
|
||||
@ -57,7 +57,7 @@ let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st ac
|
||||
Queue.push (fun st -> enqueue_openat2 args st action) st.io_q
|
||||
|
||||
let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action =
|
||||
Ctf.label "statx";
|
||||
Trace.label "statx";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.statx st.uring ?fd ~mask path buf flags (Job action)
|
||||
)
|
||||
@ -66,7 +66,7 @@ let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action =
|
||||
Queue.push (fun st -> enqueue_statx args st action) st.io_q
|
||||
|
||||
let rec enqueue_unlink ((dir, fd, path) as args) st action =
|
||||
Ctf.label "unlinkat";
|
||||
Trace.label "unlinkat";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.unlink st.uring ~dir ~fd path (Job action)
|
||||
)
|
||||
@ -75,7 +75,7 @@ let rec enqueue_unlink ((dir, fd, path) as args) st action =
|
||||
Queue.push (fun st -> enqueue_unlink args st action) st.io_q
|
||||
|
||||
let rec enqueue_connect fd addr st action =
|
||||
Ctf.label "connect";
|
||||
Trace.label "connect";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.connect st.uring fd addr (Job action)
|
||||
)
|
||||
@ -84,7 +84,7 @@ let rec enqueue_connect fd addr st action =
|
||||
Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q
|
||||
|
||||
let rec enqueue_send_msg fd ~fds ~dst buf st action =
|
||||
Ctf.label "send_msg";
|
||||
Trace.label "send_msg";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.send_msg st.uring fd ~fds ?dst buf (Job action)
|
||||
)
|
||||
@ -93,7 +93,7 @@ let rec enqueue_send_msg fd ~fds ~dst buf st action =
|
||||
Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q
|
||||
|
||||
let rec enqueue_recv_msg fd msghdr st action =
|
||||
Ctf.label "recv_msg";
|
||||
Trace.label "recv_msg";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.recv_msg st.uring fd msghdr (Job action);
|
||||
)
|
||||
@ -102,7 +102,7 @@ let rec enqueue_recv_msg fd msghdr st action =
|
||||
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
|
||||
|
||||
let rec enqueue_accept fd client_addr st action =
|
||||
Ctf.label "accept";
|
||||
Trace.label "accept";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.accept st.uring fd client_addr (Job action)
|
||||
) in
|
||||
@ -112,7 +112,7 @@ let rec enqueue_accept fd client_addr st action =
|
||||
)
|
||||
|
||||
let rec enqueue_noop t action =
|
||||
Ctf.label "noop";
|
||||
Trace.label "noop";
|
||||
let job = Sched.enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in
|
||||
if job = None then (
|
||||
(* wait until an sqe is available *)
|
||||
@ -147,7 +147,7 @@ let read_upto ?file_offset fd buf len =
|
||||
|
||||
let rec enqueue_readv args st action =
|
||||
let (file_offset,fd,bufs) = args in
|
||||
Ctf.label "readv";
|
||||
Trace.label "readv";
|
||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||
Uring.readv st.uring ~file_offset fd bufs (Job action))
|
||||
in
|
||||
@ -465,7 +465,7 @@ let shutdown socket command =
|
||||
| Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
||||
|
||||
let accept ~sw fd =
|
||||
Ctf.label "accept";
|
||||
Trace.label "accept";
|
||||
Fd.use_exn "accept" fd @@ fun fd ->
|
||||
let client_addr = Uring.Sockaddr.create () in
|
||||
let res = Sched.enter (enqueue_accept fd client_addr) in
|
||||
|
@ -3,13 +3,13 @@
|
||||
open Eio.Std
|
||||
|
||||
module Fiber_context = Eio.Private.Fiber_context
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
|
||||
module Suspended = Eio_utils.Suspended
|
||||
module Zzz = Eio_utils.Zzz
|
||||
module Lf_queue = Eio_utils.Lf_queue
|
||||
|
||||
let system_thread = Ctf.mint_id ()
|
||||
let system_thread = Trace.mint_id ()
|
||||
|
||||
let statx_works = ref false (* Before Linux 5.18, statx is unreliable *)
|
||||
|
||||
@ -119,7 +119,7 @@ let rec enqueue_job t fn =
|
||||
|
||||
(* Cancellations always come from the same domain, so no need to send wake events here. *)
|
||||
let rec enqueue_cancel job t =
|
||||
Ctf.label "cancel";
|
||||
Trace.label "cancel";
|
||||
match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with
|
||||
| None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q
|
||||
| Some _ -> ()
|
||||
@ -162,7 +162,7 @@ let submit_pending_io st =
|
||||
match Queue.take_opt st.io_q with
|
||||
| None -> ()
|
||||
| Some fn ->
|
||||
Ctf.label "submit_pending_io";
|
||||
Trace.label "submit_pending_io";
|
||||
fn st
|
||||
|
||||
let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) =
|
||||
@ -183,7 +183,7 @@ let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as re
|
||||
)
|
||||
in
|
||||
if retry then (
|
||||
Ctf.label "await-sqe";
|
||||
Trace.label "await-sqe";
|
||||
(* wait until an sqe is available *)
|
||||
Queue.push (fun st -> submit_rw_req st req) io_q
|
||||
)
|
||||
@ -244,9 +244,9 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
|
||||
(* At this point we're not going to check [run_q] again before sleeping.
|
||||
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
||||
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
||||
Ctf.(note_hiatus Wait_for_work);
|
||||
Trace.hiatus ();
|
||||
let result = Uring.wait ?timeout uring in
|
||||
Ctf.note_resume system_thread;
|
||||
Trace.resume system_thread;
|
||||
Atomic.set st.need_wakeup false;
|
||||
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
|
||||
match result with
|
||||
@ -338,7 +338,7 @@ let free_buf st buf =
|
||||
| Some k -> enqueue_thread st k buf
|
||||
|
||||
let rec enqueue_poll_add fd poll_mask st action =
|
||||
Ctf.label "poll_add";
|
||||
Trace.label "poll_add";
|
||||
let retry = with_cancel_hook ~action st (fun () ->
|
||||
Uring.poll_add st.uring fd poll_mask (Job action)
|
||||
)
|
||||
@ -347,7 +347,7 @@ let rec enqueue_poll_add fd poll_mask st action =
|
||||
Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
|
||||
|
||||
let rec enqueue_poll_add_unix fd poll_mask st action cb =
|
||||
Ctf.label "poll_add";
|
||||
Trace.label "poll_add";
|
||||
let retry = with_cancel_hook ~action st (fun () ->
|
||||
Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb))
|
||||
)
|
||||
@ -357,7 +357,7 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb =
|
||||
|
||||
let rec enqueue_readv args st action =
|
||||
let (file_offset,fd,bufs) = args in
|
||||
Ctf.label "readv";
|
||||
Trace.label "readv";
|
||||
let retry = with_cancel_hook ~action st (fun () ->
|
||||
Uring.readv st.uring ~file_offset fd bufs (Job action))
|
||||
in
|
||||
@ -388,7 +388,7 @@ let monitor_event_fd t =
|
||||
let run ~extra_effects st main arg =
|
||||
let rec fork ~new_fiber:fiber fn =
|
||||
let open Effect.Deep in
|
||||
Ctf.note_switch (Fiber_context.tid fiber);
|
||||
Trace.fiber (Fiber_context.tid fiber);
|
||||
match_with fn ()
|
||||
{ retc = (fun () -> Fiber_context.destroy fiber; schedule st);
|
||||
exnc = (fun ex ->
|
||||
|
@ -1,6 +1,6 @@
|
||||
open Eio.Std
|
||||
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
|
||||
let () =
|
||||
Logs.(set_level ~all:true (Some Debug));
|
||||
@ -78,8 +78,8 @@ let test_direct_copy () =
|
||||
let buffer = Buffer.create 20 in
|
||||
let to_output = Eio.Flow.buffer_sink buffer in
|
||||
Switch.run (fun sw ->
|
||||
Fiber.fork ~sw (fun () -> Ctf.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2);
|
||||
Fiber.fork ~sw (fun () -> Ctf.label "copy2"; Eio.Flow.copy from_pipe2 to_output);
|
||||
Fiber.fork ~sw (fun () -> Trace.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2);
|
||||
Fiber.fork ~sw (fun () -> Trace.label "copy2"; Eio.Flow.copy from_pipe2 to_output);
|
||||
Eio.Flow.copy (Eio.Flow.string_source msg) to_pipe1;
|
||||
Eio.Flow.close to_pipe1;
|
||||
);
|
||||
|
@ -18,13 +18,13 @@ module Suspended = Eio_utils.Suspended
|
||||
module Zzz = Eio_utils.Zzz
|
||||
module Lf_queue = Eio_utils.Lf_queue
|
||||
module Fiber_context = Eio.Private.Fiber_context
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
module Rcfd = Eio_unix.Private.Rcfd
|
||||
module Poll = Iomux.Poll
|
||||
|
||||
type exit = [`Exit_scheduler]
|
||||
|
||||
let system_thread = Ctf.mint_id ()
|
||||
let system_thread = Trace.mint_id ()
|
||||
|
||||
(* The type of items in the run queue. *)
|
||||
type runnable =
|
||||
@ -209,12 +209,12 @@ let rec next t : [`Exit_scheduler] =
|
||||
(* At this point we're not going to check [run_q] again before sleeping.
|
||||
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
||||
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
||||
Ctf.(note_hiatus Wait_for_work);
|
||||
Trace.hiatus ();
|
||||
let nready =
|
||||
try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
|
||||
with Unix.Unix_error (Unix.EINTR, _, "") -> 0
|
||||
in
|
||||
Ctf.note_resume system_thread;
|
||||
Trace.fiber system_thread;
|
||||
Atomic.set t.need_wakeup false;
|
||||
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
|
||||
Poll.iter_ready t.poll nready (ready t);
|
||||
@ -325,7 +325,7 @@ let enter fn = Effect.perform (Enter fn)
|
||||
let run ~extra_effects t main x =
|
||||
let rec fork ~new_fiber:fiber fn =
|
||||
let open Effect.Deep in
|
||||
Ctf.note_switch (Fiber_context.tid fiber);
|
||||
Trace.fiber (Fiber_context.tid fiber);
|
||||
match_with fn ()
|
||||
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
|
||||
exnc = (fun ex ->
|
||||
|
@ -18,12 +18,12 @@ module Suspended = Eio_utils.Suspended
|
||||
module Zzz = Eio_utils.Zzz
|
||||
module Lf_queue = Eio_utils.Lf_queue
|
||||
module Fiber_context = Eio.Private.Fiber_context
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
module Rcfd = Eio_unix.Private.Rcfd
|
||||
|
||||
type exit = [`Exit_scheduler]
|
||||
|
||||
let system_thread = Ctf.mint_id ()
|
||||
let system_thread = Trace.mint_id ()
|
||||
|
||||
(* The type of items in the run queue. *)
|
||||
type runnable =
|
||||
@ -204,14 +204,14 @@ let rec next t : [`Exit_scheduler] =
|
||||
(* At this point we're not going to check [run_q] again before sleeping.
|
||||
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
||||
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
||||
Ctf.(note_hiatus Wait_for_work);
|
||||
Trace.hiatus ();
|
||||
let cons fd acc = fd :: acc in
|
||||
let read = FdSet.fold cons t.poll.to_read [] in
|
||||
let write = FdSet.fold cons t.poll.to_write [] in
|
||||
match Unix.select read write [] timeout with
|
||||
| exception Unix.(Unix_error (EINTR, _, _)) -> next t
|
||||
| readable, writeable, _ ->
|
||||
Ctf.note_resume system_thread;
|
||||
Trace.resume system_thread;
|
||||
Atomic.set t.need_wakeup false;
|
||||
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
|
||||
List.iter (ready t [ `W ]) writeable;
|
||||
@ -317,7 +317,7 @@ let enter fn = Effect.perform (Enter fn)
|
||||
let run ~extra_effects t main x =
|
||||
let rec fork ~new_fiber:fiber fn =
|
||||
let open Effect.Deep in
|
||||
Ctf.note_switch (Fiber_context.tid fiber);
|
||||
Trace.fiber (Fiber_context.tid fiber);
|
||||
match_with fn ()
|
||||
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
|
||||
exnc = (fun ex ->
|
||||
|
@ -1,9 +1,9 @@
|
||||
(* This module also checks that Eio doesn't pull in a dependency on Unix.
|
||||
See the [dune] file. *)
|
||||
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
|
||||
let () =
|
||||
let bs = Cstruct.create 8 in
|
||||
Ctf.BS.set_int64_le bs.buffer 0 1234L;
|
||||
Trace.BS.set_int64_le bs.buffer 0 1234L;
|
||||
assert (Cstruct.LE.get_uint64 bs 0 = 1234L)
|
||||
|
@ -7,7 +7,7 @@
|
||||
```ocaml
|
||||
open Eio.Std
|
||||
|
||||
module Ctf = Eio.Private.Ctf
|
||||
module Trace = Eio.Private.Trace
|
||||
|
||||
let pp_promise pp f x =
|
||||
match Promise.peek x with
|
||||
@ -96,10 +96,10 @@ Basic semaphore tests:
|
||||
let running = ref 0 in
|
||||
let sem = Semaphore.make 2 in
|
||||
let fork = Fiber.fork_promise ~sw in
|
||||
let a = fork (fun () -> Ctf.label "a"; Semaphore.acquire sem; incr running) in
|
||||
let b = fork (fun () -> Ctf.label "b"; Semaphore.acquire sem; incr running) in
|
||||
let c = fork (fun () -> Ctf.label "c"; Semaphore.acquire sem; incr running) in
|
||||
let d = fork (fun () -> Ctf.label "d"; Semaphore.acquire sem; incr running) in
|
||||
let a = fork (fun () -> Trace.label "a"; Semaphore.acquire sem; incr running) in
|
||||
let b = fork (fun () -> Trace.label "b"; Semaphore.acquire sem; incr running) in
|
||||
let c = fork (fun () -> Trace.label "c"; Semaphore.acquire sem; incr running) in
|
||||
let d = fork (fun () -> Trace.label "d"; Semaphore.acquire sem; incr running) in
|
||||
traceln "Semaphore means that only %d threads are running" !running;
|
||||
Promise.await_exn a;
|
||||
Promise.await_exn b;
|
||||
@ -132,8 +132,8 @@ Releasing a semaphore when no-one is waiting for it:
|
||||
let sem = Semaphore.make 0 in
|
||||
Semaphore.release sem; (* Release with free-counter *)
|
||||
traceln "Initial config: %d" (Semaphore.get_value sem);
|
||||
Fiber.fork ~sw (fun () -> Ctf.label "a"; Semaphore.acquire sem);
|
||||
Fiber.fork ~sw (fun () -> Ctf.label "b"; Semaphore.acquire sem);
|
||||
Fiber.fork ~sw (fun () -> Trace.label "a"; Semaphore.acquire sem);
|
||||
Fiber.fork ~sw (fun () -> Trace.label "b"; Semaphore.acquire sem);
|
||||
traceln "A running: %d" (Semaphore.get_value sem);
|
||||
Semaphore.release sem; (* Release with a non-empty wait-queue *)
|
||||
traceln "Now b running: %d" (Semaphore.get_value sem);
|
||||
|
Loading…
x
Reference in New Issue
Block a user