Compare commits

..

3 Commits

Author SHA1 Message Date
Thomas Leonard
9e1872c10c
Merge pull request #634 from talex5/rename-ctf
Rename Ctf to Trace and tidy
2023-10-24 11:36:26 +01:00
Thomas Leonard
ddcf89983d Rename trace functions
e.g. `Ctf.note_read` becomes `Trace.read`.

Also, remove some unused functions.
2023-10-23 11:57:47 +01:00
Thomas Leonard
5a11ea6df2 Rename Ctf to Trace
This is to minimise the diff when switching to runtime events.
2023-10-23 11:23:09 +01:00
31 changed files with 207 additions and 309 deletions

View File

@ -226,7 +226,7 @@ We can run the previous code with tracing enabled (writing to a new `trace.ctf`
```ocaml
# let () =
Eio_unix.Ctf.with_tracing "trace.ctf" @@ fun () ->
Eio_unix.Trace.with_tracing "trace.ctf" @@ fun () ->
Eio_main.run main;;
+x = 1
+y = 1

View File

@ -22,7 +22,7 @@ type t = {
domain : Domain.id; (* Prevent access from other domains *)
}
and fiber_context = {
tid : Ctf.id;
tid : Trace.id;
mutable cancel_context : t;
mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
mutable cancel_fn : exn -> unit; (* Encourage the current operation to finish *)
@ -194,8 +194,8 @@ module Fiber_context = struct
t.cancel_fn <- ignore
let make ~cc ~vars =
let tid = Ctf.mint_id () in
Ctf.note_created tid Ctf.Task;
let tid = Trace.mint_id () in
Trace.create tid Fiber;
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
t

View File

@ -1,110 +0,0 @@
(** This library is used to write event traces in mirage-profile's CTF format. *)
type id = private int
(** Each thread/fiber/promise is identified by a unique ID. *)
(** {2 Recording events}
Libraries and applications can use these functions to make the traces more useful. *)
val label : string -> unit
(** [label msg] attaches text [msg] to the current thread. *)
val note_increase : string -> int -> unit
(** [note_increase counter delta] records that [counter] increased by [delta].
If [delta] is negative, this records a decrease. *)
val note_counter_value : string -> int -> unit
(** [note_counter_value counter value] records that [counter] is now [value]. *)
val should_resolve : id -> unit
(** [should_resolve id] records that [id] is expected to resolve, and should be highlighted if it doesn't. *)
(** {2 Recording system events}
These are normally only called by the scheduler. *)
type hiatus_reason =
| Wait_for_work
| Suspend
| Hibernate
type event =
| Wait
| Task
| Bind
| Try
| Choose
| Pick
| Join
| Map
| Condition
| On_success
| On_failure
| On_termination
| On_any
| Ignore_result
| Async
| Promise
| Semaphore
| Switch
| Stream
| Mutex
(** Types of threads or other recorded objects. *)
val mint_id : unit -> id
(** [mint_id ()] is a fresh unique [id]. *)
val note_created : ?label:string -> id -> event -> unit
(** [note_created t id ty] records the creation of [id]. *)
val note_read : ?reader:id -> id -> unit
(** [note_read src] records that promise [src]'s value was read.
@param reader The thread doing the read (default is the current thread). *)
val note_try_read : id -> unit
(** [note_try_read src] records that the current thread wants to read from [src] (which is not currently ready). *)
val note_switch : id -> unit
(** [note_switch id] records that [id] is now the current thread. *)
val note_hiatus : hiatus_reason -> unit
(** [note_hiatus r] records that the system will sleep for reason [r]. *)
val note_resume : id -> unit
(** [note_resume id] records that the system has resumed (used after {!note_hiatus}),
and is now running [id]. *)
val note_fork : unit -> id
(** [note_fork ()] records that a new thread has been forked and returns a fresh ID for it. *)
val note_resolved : id -> ex:exn option -> unit
(** [note_resolved id ~ex] records that [id] is now resolved.
If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *)
val note_signal : ?src:id -> id -> unit
(** [note_signal ~src dst] records that [dst] was signalled.
@param src The thread sending the signal (default is the current thread). *)
(** {2 Controlling tracing} *)
type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
module Control : sig
type t
val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t
(** [make ~timestamper b] is a trace buffer that record events in [b].
In most cases, the {!Ctf_unix} module provides a simpler interface. *)
val start : t -> unit
(** [start t] begins recording events in [t]. *)
val stop : t -> unit
(** [stop t] stops recording to [t] (which must be the current trace buffer). *)
end
(**/**)
module BS : sig
val set_int8 : Cstruct.buffer -> int -> int -> unit
val set_int64_le : Cstruct.buffer -> int -> int64 -> unit
end

View File

@ -15,7 +15,7 @@ let default_traceln ?__POS__:pos fmt =
Format.pp_close_box f ();
Format.pp_print_flush f ();
let msg = Buffer.contents b in
Ctf.label msg;
Trace.label msg;
let lines = String.split_on_char '\n' msg in
Mutex.lock traceln_mutex;
Fun.protect ~finally:(fun () -> Mutex.unlock traceln_mutex) @@ fun () ->

View File

@ -7,7 +7,7 @@ module Private = struct
module Suspend = Suspend
module Cells = Cells
module Broadcast = Broadcast
module Ctf = Ctf
module Trace = Trace
module Fiber_context = Cancel.Fiber_context
module Debug = Debug

View File

@ -571,7 +571,7 @@ end
(** @canonical Eio.Private *)
module Private : sig
module Ctf = Ctf
module Trace = Trace
module Cells = Cells
module Broadcast = Broadcast
@ -586,7 +586,7 @@ module Private : sig
val destroy : t -> unit
(** [destroy t] removes [t] from its cancellation context. *)
val tid : t -> Ctf.id
val tid : t -> Trace.id
(** {2 Cancellation}

View File

@ -19,10 +19,10 @@ let fork ~sw f =
Switch.with_op sw @@ fun () ->
match f () with
| () ->
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
| exception ex ->
Switch.fail sw ex; (* The [with_op] ensures this will succeed *)
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
let fork_daemon ~sw f =
@ -35,13 +35,13 @@ let fork_daemon ~sw f =
match f () with
| `Stop_daemon ->
(* The daemon asked to stop. *)
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
| exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) ->
(* The daemon was cancelled because all non-daemon fibers are finished. *)
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
| exception ex ->
Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *)
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
let fork_promise ~sw f =

View File

@ -3,7 +3,7 @@ type 'a state =
| Unresolved of Broadcast.t
type !'a promise = {
id : Ctf.id;
id : Trace.id;
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
}
@ -25,20 +25,20 @@ let create_with_id id =
to_public_promise t, to_public_resolver t
let create ?label () =
let id = Ctf.mint_id () in
Ctf.note_created ?label id Ctf.Promise;
let id = Trace.mint_id () in
Trace.create ?label id Promise;
create_with_id id
let create_resolved x =
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Promise;
let id = Trace.mint_id () in
Trace.create id Promise;
to_public_promise { id; state = Atomic.make (Resolved x) }
let await t =
let t = of_public_promise t in
match Atomic.get t.state with
| Resolved x ->
Ctf.note_read t.id;
Trace.read t.id;
x
| Unresolved b ->
Suspend.enter (fun ctx enqueue ->
@ -53,7 +53,7 @@ let await t =
| Unresolved _ ->
(* We observed the promise to be still unresolved after registering a waiter.
Therefore any resolution must happen after we were registered and we will be notified. *)
Ctf.note_try_read t.id;
Trace.try_read t.id;
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
if Broadcast.cancel request then enqueue (Error ex)
(* else already resumed *)
@ -61,7 +61,7 @@ let await t =
);
match Atomic.get t.state with
| Resolved x ->
Ctf.note_read t.id;
Trace.read t.id;
x
| Unresolved _ -> assert false
@ -76,7 +76,7 @@ let resolve t v =
| Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
| Unresolved b as prev ->
if Atomic.compare_and_set t.state prev (Resolved v) then (
Ctf.note_resolved t.id ~ex:None;
Trace.resolve t.id ~ex:None;
Broadcast.resume_all b
) else (
(* Otherwise, the promise was already resolved. Retry (to get the error). *)

View File

@ -19,6 +19,6 @@ let await t id =
t.wake <- (fun x ->
Cancel.Fiber_context.clear_cancel_fn ctx;
t.wake <- ignore;
Ctf.note_read ~reader:id ctx.tid;
Trace.read ~reader:id ctx.tid;
enqueue x
)

View File

@ -1,5 +1,5 @@
type t = {
id : Ctf.id;
id : Trace.id;
mutable fibers : int; (* Total, including daemon_fibers and the main function *)
mutable daemon_fibers : int;
mutable exs : (exn * Printexc.raw_backtrace) option;
@ -51,7 +51,7 @@ let combine_exn ex = function
let fail ?(bt=Printexc.get_raw_backtrace ()) t ex =
check_our_domain t;
if t.exs = None then
Ctf.note_resolved t.id ~ex:(Some ex);
Trace.resolve t.id ~ex:(Some ex);
t.exs <- Some (combine_exn (ex, bt) t.exs);
try
Cancel.cancel t.cancel ex
@ -91,7 +91,7 @@ let or_raise = function
let rec await_idle t =
(* Wait for fibers to finish: *)
while t.fibers > 0 do
Ctf.note_try_read t.id;
Trace.try_read t.id;
Single_waiter.await t.waiter t.id
done;
(* Call on_release handlers: *)
@ -118,8 +118,8 @@ let maybe_raise_exs t =
| Some (ex, bt) -> Printexc.raise_with_backtrace ex bt
let create cancel =
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Switch;
let id = Trace.mint_id () in
Trace.create id Switch;
{
id;
fibers = 1; (* The main function counts as a fiber *)
@ -135,7 +135,7 @@ let run_internal t fn =
| v ->
dec_fibers t;
await_idle t;
Ctf.note_read t.id;
Trace.read t.id;
maybe_raise_exs t; (* Check for failure while finishing *)
(* Success. *)
v
@ -146,7 +146,7 @@ let run_internal t fn =
dec_fibers t;
fail ~bt t ex;
await_idle t;
Ctf.note_read t.id;
Trace.read t.id;
maybe_raise_exs t;
assert false

View File

@ -47,27 +47,8 @@ let mint_id () =
Domain.DLS.set next_id_key next_id_local_succ;
next_id_local
type hiatus_reason =
| Wait_for_work
| Suspend
| Hibernate
type event =
| Wait
| Task
| Bind
| Try
| Choose
| Pick
| Join
| Map
| Condition
| On_success
| On_failure
| On_termination
| On_any
| Ignore_result
| Async
type ty =
| Fiber
| Promise
| Semaphore
| Switch
@ -80,21 +61,7 @@ let current_thread = ref (-1)
let int_of_thread_type t =
match t with
| Wait -> 0
| Task -> 1
| Bind -> 2
| Try -> 3
| Choose -> 4
| Pick -> 5
| Join -> 6
| Map -> 7
| Condition -> 8
| On_success -> 9
| On_failure -> 10
| On_termination -> 11
| On_any -> 12
| Ignore_result -> 13
| Async -> 14
| Fiber -> 1
| Promise -> 15
| Semaphore -> 16
| Switch -> 17
@ -207,12 +174,12 @@ module Control = struct
let op_fails = 3
(* let op_becomes = 4 *)
let op_label = 5
let op_increase = 6
(* let op_increase = 6 *)
let op_switch = 7
(* let op_gc = 8 *)
(* let op_old_signal = 9 *)
let op_try_read = 10
let op_counter_value = 11
(* let op_counter_value = 11 *)
let op_read_later = 12
let op_signal = 13
@ -329,20 +296,6 @@ module Control = struct
|> write_string log.log msg
|> end_event
let note_increase log counter amount =
add_event log op_increase (17 + String.length counter)
|> write_tid log.log !current_thread
|> write64 log.log (Int64.of_int amount)
|> write_string log.log counter
|> end_event
let note_counter_value log counter value =
add_event log op_counter_value (17 + String.length counter)
|> write_tid log.log !current_thread
|> write64 log.log (Int64.of_int value)
|> write_string log.log counter
|> end_event
let note_switch log new_current =
if new_current <> !current_thread then (
current_thread := new_current;
@ -397,42 +350,34 @@ let label name =
| None -> ()
| Some log -> Control.note_label log !current_thread name
let note_fork () =
let child = mint_id () in
begin match !Control.event_log with
| None -> ()
| Some log -> Control.note_created log child Task
end;
child
let note_created ?label id ty =
let create ?label id ty =
match !Control.event_log with
| None -> ()
| Some log ->
Control.note_created log id ty;
Option.iter (Control.note_label log id) label
let note_switch new_current =
let fiber new_current =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_switch log new_current
let note_hiatus _reason =
let hiatus () =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_suspend log ()
let note_resume new_current =
let resume new_current =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_switch log new_current
let note_try_read input =
let try_read input =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_try_read log !current_thread input
let note_read ?reader input =
let read ?reader input =
match !Control.event_log with
| None -> ()
| Some log ->
@ -443,12 +388,12 @@ let note_read ?reader input =
in
Control.note_read log ~reader input
let note_resolved id ~ex =
let resolve id ~ex =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_resolved log id ~ex
let note_signal ?src dst =
let signal ?src dst =
match !Control.event_log with
| None -> ()
| Some log ->
@ -458,18 +403,3 @@ let note_signal ?src dst =
| Some x -> x
in
Control.note_signal ~src log dst
let note_increase counter amount =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_increase log counter amount
let note_counter_value counter value =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_counter_value log counter value
let should_resolve thread =
match !Control.event_log with
| None -> ()
| Some log -> Control.note_label log thread "__should_resolve" (* Hack! *)

78
lib_eio/core/trace.mli Normal file
View File

@ -0,0 +1,78 @@
(** This library is used to write event traces in mirage-profile's CTF format. *)
type id = private int
(** Each thread/fiber/promise is identified by a unique ID. *)
(** {2 Recording events}
Libraries and applications can use these functions to make the traces more useful. *)
val label : string -> unit
(** [label msg] attaches text [msg] to the current thread. *)
(** {2 Recording system events}
These are normally only called by the scheduler. *)
type ty =
| Fiber
| Promise
| Semaphore
| Switch
| Stream
| Mutex
(** Types of recorded objects. *)
val mint_id : unit -> id
(** [mint_id ()] is a fresh unique [id]. *)
val create : ?label:string -> id -> ty -> unit
(** [create t id ty] records the creation of [id]. *)
val read : ?reader:id -> id -> unit
(** [read src] records that promise [src]'s value was read.
@param reader The thread doing the read (default is the current thread). *)
val try_read : id -> unit
(** [try_read src] records that the current thread wants to read from [src] (which is not currently ready). *)
val fiber : id -> unit
(** [fiber id] records that fiber [id] is now running. *)
val hiatus : unit -> unit
(** [hiatus ()] records that the system will sleep for reason [r]. *)
val resume : id -> unit
(** [resume id] records that the system has resumed (used after {!hiatus}),
and is now running [id]. *)
val resolve : id -> ex:exn option -> unit
(** [resolve id ~ex] records that [id] is now resolved.
If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *)
val signal : ?src:id -> id -> unit
(** [signal ~src dst] records that [dst] was signalled.
@param src The thread sending the signal (default is the current thread). *)
(** {2 Controlling tracing} *)
type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
module Control : sig
type t
val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t
(** [make ~timestamper b] is a trace buffer that record events in [b].
In most cases, the {!Eio_unix.Trace} module provides a simpler interface. *)
val start : t -> unit
(** [start t] begins recording events in [t]. *)
val stop : t -> unit
(** [stop t] stops recording to [t] (which must be the current trace buffer). *)
end
(**/**)
module BS : sig
val set_int8 : Cstruct.buffer -> int -> int -> unit
val set_int64_le : Cstruct.buffer -> int -> int64 -> unit
end

View File

@ -6,7 +6,7 @@ type state =
exception Poisoned of exn
type t = {
id : Ctf.id;
id : Trace.id;
mutex : Mutex.t;
mutable state : state; (* Owned by [t.mutex] *)
waiters : [`Take | `Error of exn] Waiters.t; (* Owned by [t.mutex] *)
@ -19,8 +19,8 @@ type t = {
(* {R} t = create () {mutex t R} *)
let create () =
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Mutex;
let id = Trace.mint_id () in
Trace.create id Mutex;
{
id;
mutex = Mutex.create ();
@ -33,7 +33,7 @@ let create () =
let unlock t =
Mutex.lock t.mutex;
(* We now have ownership of [t.state] and [t.waiters]. *)
Ctf.note_signal t.id;
Trace.signal t.id;
match t.state with
| Unlocked ->
Mutex.unlock t.mutex;
@ -55,7 +55,7 @@ let lock t =
Mutex.lock t.mutex;
match t.state with
| Locked ->
Ctf.note_try_read t.id;
Trace.try_read t.id;
begin match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with
| `Error ex -> raise ex (* Poisoned; stop waiting *)
| `Take ->
@ -64,7 +64,7 @@ let lock t =
()
end
| Unlocked ->
Ctf.note_read t.id;
Trace.read t.id;
t.state <- Locked; (* We transfer R from the state to our caller. *)
(* {locked t * R} *)
Mutex.unlock t.mutex
@ -77,11 +77,11 @@ let try_lock t =
Mutex.lock t.mutex;
match t.state with
| Locked ->
Ctf.note_try_read t.id;
Trace.try_read t.id;
Mutex.unlock t.mutex;
false
| Unlocked ->
Ctf.note_read t.id;
Trace.read t.id;
t.state <- Locked; (* We transfer R from the state to our caller. *)
Mutex.unlock t.mutex;
(* {locked t * R} *)

View File

@ -1,18 +1,18 @@
type t = {
id : Ctf.id;
id : Trace.id;
state : Sem_state.t;
}
let make n =
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Semaphore;
let id = Trace.mint_id () in
Trace.create id Semaphore;
{
id;
state = Sem_state.create n;
}
let release t =
Ctf.note_signal t.id;
Trace.signal t.id;
Sem_state.release t.state
let acquire t =
@ -24,7 +24,7 @@ let acquire t =
match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with
| None -> () (* Already resumed *)
| Some request ->
Ctf.note_try_read t.id;
Trace.try_read t.id;
match Fiber_context.get_error ctx with
| Some ex ->
if Sem_state.cancel request then enqueue (Error ex);
@ -36,7 +36,7 @@ let acquire t =
)
)
);
Ctf.note_read t.id
Trace.read t.id
let get_value t =
max 0 (Atomic.get t.state.state)

View File

@ -2,7 +2,7 @@ module Locking = struct
type 'a t = {
mutex : Mutex.t;
id : Ctf.id;
id : Trace.id;
capacity : int; (* [capacity > 0] *)
items : 'a Queue.t;
@ -29,8 +29,8 @@ module Locking = struct
let create capacity =
assert (capacity > 0);
let id = Ctf.mint_id () in
Ctf.note_created id Ctf.Stream;
let id = Trace.mint_id () in
Trace.create id Stream;
{
mutex = Mutex.create ();
id;

View File

@ -4,7 +4,7 @@
```ocaml
# #require "eio";;
# for _ = 1 to 5 do
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
done;;
1
2
@ -20,7 +20,7 @@ A new domain gets a new chunk:
# Domain.join @@ Domain.spawn
(fun () ->
for _ = 1 to 5 do
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
done);;
1024
1025
@ -34,12 +34,12 @@ When the original domain exhausts its chunk, it jumps to the next free chunk:
```ocaml
# for _ = 1 to 1024 - 9 do
Eio.Private.Ctf.mint_id () |> ignore
Eio.Private.Trace.mint_id () |> ignore
done;;
- : unit = ()
# for _ = 1 to 5 do
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
done;;
1021
1022

View File

@ -1,10 +1,10 @@
open Bigarray
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
let timestamper log_buffer ofs =
let ns = Mtime.to_uint64_ns @@ Mtime_clock.now () in
Ctf.BS.set_int64_le log_buffer ofs ns
Trace.BS.set_int64_le log_buffer ofs ns
let mmap_buffer ~size path =
let fd = Unix.(openfile path [O_RDWR; O_CREAT; O_TRUNC] 0o644) in
@ -16,6 +16,6 @@ let mmap_buffer ~size path =
let with_tracing ?(size=0x100000) path fn =
let buffer = mmap_buffer ~size path in
let trace_config = Ctf.Control.make ~timestamper buffer in
Ctf.Control.start trace_config;
Fun.protect fn ~finally:(fun () -> Ctf.Control.stop trace_config)
let trace_config = Trace.Control.make ~timestamper buffer in
Trace.Control.start trace_config;
Fun.protect fn ~finally:(fun () -> Trace.Control.stop trace_config)

View File

@ -1,9 +1,9 @@
val timestamper : Eio.Private.Ctf.log_buffer -> int -> unit
val timestamper : Eio.Private.Trace.log_buffer -> int -> unit
(** Uses [Mtime_clock] to write timestamps. *)
val mmap_buffer : size:int -> string -> Eio.Private.Ctf.log_buffer
val mmap_buffer : size:int -> string -> Eio.Private.Trace.log_buffer
(** [mmap_buffer ~size path] initialises file [path] as an empty buffer for tracing. *)
val with_tracing : ?size:int -> string -> (unit -> 'a) -> 'a
(** [with_tracing path fn] is a convenience function that uses {!mmap_buffer} to create a log buffer,
calls {!Ctf.Control.start} to start recording, runs [fn], and then stops recording. *)
calls {!Trace.Control.start} to start recording, runs [fn], and then stops recording. *)

View File

@ -26,7 +26,7 @@ let run_in_systhread = Private.run_in_systhread
module Ipaddr = Net.Ipaddr
module Ctf = Ctf_unix
module Trace = Ctf_unix
module Process = Process
module Net = Net

View File

@ -97,6 +97,6 @@ module Private : sig
module Fork_action = Fork_action
end
module Ctf = Ctf_unix
module Trace = Ctf_unix
module Pi = Pi

View File

@ -1,7 +1,7 @@
(** A suspended fiber with its context. *)
open Effect.Deep
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
type 'a t = {
fiber : Eio.Private.Fiber_context.t;
@ -11,9 +11,9 @@ type 'a t = {
let tid t = Eio.Private.Fiber_context.tid t.fiber
let continue t v =
Ctf.note_switch (tid t);
Trace.fiber (tid t);
continue t.k v
let discontinue t ex =
Ctf.note_switch (tid t);
Trace.fiber (tid t);
discontinue t.k ex

View File

@ -47,7 +47,7 @@ let await_internal ~mutex (t:'a t) id ctx enqueue =
let resolved_waiter = ref Hook.null in
let finished = Atomic.make false in
let enqueue x =
Ctf.note_read ~reader:id (Fiber_context.tid ctx);
Trace.read ~reader:id (Fiber_context.tid ctx);
enqueue x
in
let cancel ex =

View File

@ -21,7 +21,7 @@ val is_empty : 'a t -> bool
val await :
mutex:Mutex.t option ->
'a t -> Ctf.id -> 'a
'a t -> Trace.id -> 'a
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
When the waiter is woken, the fiber is resumed and returns the result.
If [t] can be used from multiple domains:
@ -32,7 +32,7 @@ val await :
val await_internal :
mutex:Mutex.t option ->
'a t -> Ctf.id -> Fiber_context.t ->
'a t -> Trace.id -> Fiber_context.t ->
(('a, exn) result -> unit) -> unit
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
This also allows wrapping the [enqueue] function.

View File

@ -20,7 +20,7 @@
open Eio.Std
module Fiber_context = Eio.Private.Fiber_context
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
module Fd = Eio_unix.Fd
module Suspended = Eio_utils.Suspended

View File

@ -2,7 +2,7 @@
open Eio.Std
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
module Fd = Eio_unix.Fd
type dir_fd =
@ -20,12 +20,12 @@ let file_offset t = function
let enqueue_read st action (file_offset,fd,buf,len) =
let req = { Sched.op=`R; file_offset; len; fd; cur_off = 0; buf; action } in
Ctf.label "read";
Trace.label "read";
Sched.submit_rw_req st req
let rec enqueue_writev args st action =
let (file_offset,fd,bufs) = args in
Ctf.label "writev";
Trace.label "writev";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.writev st.uring ~file_offset fd bufs (Job action)
)
@ -35,11 +35,11 @@ let rec enqueue_writev args st action =
let enqueue_write st action (file_offset,fd,buf,len) =
let req = { Sched.op=`W; file_offset; len; fd; cur_off = 0; buf; action } in
Ctf.label "write";
Trace.label "write";
Sched.submit_rw_req st req
let rec enqueue_splice ~src ~dst ~len st action =
Ctf.label "splice";
Trace.label "splice";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.splice st.uring (Job action) ~src ~dst ~len
)
@ -48,7 +48,7 @@ let rec enqueue_splice ~src ~dst ~len st action =
Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action =
Ctf.label "openat2";
Trace.label "openat2";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action)
)
@ -57,7 +57,7 @@ let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st ac
Queue.push (fun st -> enqueue_openat2 args st action) st.io_q
let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action =
Ctf.label "statx";
Trace.label "statx";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.statx st.uring ?fd ~mask path buf flags (Job action)
)
@ -66,7 +66,7 @@ let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action =
Queue.push (fun st -> enqueue_statx args st action) st.io_q
let rec enqueue_unlink ((dir, fd, path) as args) st action =
Ctf.label "unlinkat";
Trace.label "unlinkat";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.unlink st.uring ~dir ~fd path (Job action)
)
@ -75,7 +75,7 @@ let rec enqueue_unlink ((dir, fd, path) as args) st action =
Queue.push (fun st -> enqueue_unlink args st action) st.io_q
let rec enqueue_connect fd addr st action =
Ctf.label "connect";
Trace.label "connect";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.connect st.uring fd addr (Job action)
)
@ -84,7 +84,7 @@ let rec enqueue_connect fd addr st action =
Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q
let rec enqueue_send_msg fd ~fds ~dst buf st action =
Ctf.label "send_msg";
Trace.label "send_msg";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.send_msg st.uring fd ~fds ?dst buf (Job action)
)
@ -93,7 +93,7 @@ let rec enqueue_send_msg fd ~fds ~dst buf st action =
Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q
let rec enqueue_recv_msg fd msghdr st action =
Ctf.label "recv_msg";
Trace.label "recv_msg";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.recv_msg st.uring fd msghdr (Job action);
)
@ -102,7 +102,7 @@ let rec enqueue_recv_msg fd msghdr st action =
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
let rec enqueue_accept fd client_addr st action =
Ctf.label "accept";
Trace.label "accept";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.accept st.uring fd client_addr (Job action)
) in
@ -112,7 +112,7 @@ let rec enqueue_accept fd client_addr st action =
)
let rec enqueue_noop t action =
Ctf.label "noop";
Trace.label "noop";
let job = Sched.enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in
if job = None then (
(* wait until an sqe is available *)
@ -147,7 +147,7 @@ let read_upto ?file_offset fd buf len =
let rec enqueue_readv args st action =
let (file_offset,fd,bufs) = args in
Ctf.label "readv";
Trace.label "readv";
let retry = Sched.with_cancel_hook ~action st (fun () ->
Uring.readv st.uring ~file_offset fd bufs (Job action))
in
@ -465,7 +465,7 @@ let shutdown socket command =
| Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
let accept ~sw fd =
Ctf.label "accept";
Trace.label "accept";
Fd.use_exn "accept" fd @@ fun fd ->
let client_addr = Uring.Sockaddr.create () in
let res = Sched.enter (enqueue_accept fd client_addr) in

View File

@ -3,13 +3,13 @@
open Eio.Std
module Fiber_context = Eio.Private.Fiber_context
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
module Suspended = Eio_utils.Suspended
module Zzz = Eio_utils.Zzz
module Lf_queue = Eio_utils.Lf_queue
let system_thread = Ctf.mint_id ()
let system_thread = Trace.mint_id ()
let statx_works = ref false (* Before Linux 5.18, statx is unreliable *)
@ -119,7 +119,7 @@ let rec enqueue_job t fn =
(* Cancellations always come from the same domain, so no need to send wake events here. *)
let rec enqueue_cancel job t =
Ctf.label "cancel";
Trace.label "cancel";
match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with
| None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q
| Some _ -> ()
@ -162,7 +162,7 @@ let submit_pending_io st =
match Queue.take_opt st.io_q with
| None -> ()
| Some fn ->
Ctf.label "submit_pending_io";
Trace.label "submit_pending_io";
fn st
let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) =
@ -183,7 +183,7 @@ let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as re
)
in
if retry then (
Ctf.label "await-sqe";
Trace.label "await-sqe";
(* wait until an sqe is available *)
Queue.push (fun st -> submit_rw_req st req) io_q
)
@ -244,9 +244,9 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
(* At this point we're not going to check [run_q] again before sleeping.
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Ctf.(note_hiatus Wait_for_work);
Trace.hiatus ();
let result = Uring.wait ?timeout uring in
Ctf.note_resume system_thread;
Trace.resume system_thread;
Atomic.set st.need_wakeup false;
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
match result with
@ -338,7 +338,7 @@ let free_buf st buf =
| Some k -> enqueue_thread st k buf
let rec enqueue_poll_add fd poll_mask st action =
Ctf.label "poll_add";
Trace.label "poll_add";
let retry = with_cancel_hook ~action st (fun () ->
Uring.poll_add st.uring fd poll_mask (Job action)
)
@ -347,7 +347,7 @@ let rec enqueue_poll_add fd poll_mask st action =
Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
let rec enqueue_poll_add_unix fd poll_mask st action cb =
Ctf.label "poll_add";
Trace.label "poll_add";
let retry = with_cancel_hook ~action st (fun () ->
Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb))
)
@ -357,7 +357,7 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb =
let rec enqueue_readv args st action =
let (file_offset,fd,bufs) = args in
Ctf.label "readv";
Trace.label "readv";
let retry = with_cancel_hook ~action st (fun () ->
Uring.readv st.uring ~file_offset fd bufs (Job action))
in
@ -388,7 +388,7 @@ let monitor_event_fd t =
let run ~extra_effects st main arg =
let rec fork ~new_fiber:fiber fn =
let open Effect.Deep in
Ctf.note_switch (Fiber_context.tid fiber);
Trace.fiber (Fiber_context.tid fiber);
match_with fn ()
{ retc = (fun () -> Fiber_context.destroy fiber; schedule st);
exnc = (fun ex ->

View File

@ -1,6 +1,6 @@
open Eio.Std
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
let () =
Logs.(set_level ~all:true (Some Debug));
@ -78,8 +78,8 @@ let test_direct_copy () =
let buffer = Buffer.create 20 in
let to_output = Eio.Flow.buffer_sink buffer in
Switch.run (fun sw ->
Fiber.fork ~sw (fun () -> Ctf.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2);
Fiber.fork ~sw (fun () -> Ctf.label "copy2"; Eio.Flow.copy from_pipe2 to_output);
Fiber.fork ~sw (fun () -> Trace.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2);
Fiber.fork ~sw (fun () -> Trace.label "copy2"; Eio.Flow.copy from_pipe2 to_output);
Eio.Flow.copy (Eio.Flow.string_source msg) to_pipe1;
Eio.Flow.close to_pipe1;
);

View File

@ -18,13 +18,13 @@ module Suspended = Eio_utils.Suspended
module Zzz = Eio_utils.Zzz
module Lf_queue = Eio_utils.Lf_queue
module Fiber_context = Eio.Private.Fiber_context
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
module Rcfd = Eio_unix.Private.Rcfd
module Poll = Iomux.Poll
type exit = [`Exit_scheduler]
let system_thread = Ctf.mint_id ()
let system_thread = Trace.mint_id ()
(* The type of items in the run queue. *)
type runnable =
@ -209,12 +209,12 @@ let rec next t : [`Exit_scheduler] =
(* At this point we're not going to check [run_q] again before sleeping.
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Ctf.(note_hiatus Wait_for_work);
Trace.hiatus ();
let nready =
try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
with Unix.Unix_error (Unix.EINTR, _, "") -> 0
in
Ctf.note_resume system_thread;
Trace.fiber system_thread;
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
Poll.iter_ready t.poll nready (ready t);
@ -325,7 +325,7 @@ let enter fn = Effect.perform (Enter fn)
let run ~extra_effects t main x =
let rec fork ~new_fiber:fiber fn =
let open Effect.Deep in
Ctf.note_switch (Fiber_context.tid fiber);
Trace.fiber (Fiber_context.tid fiber);
match_with fn ()
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
exnc = (fun ex ->

View File

@ -18,12 +18,12 @@ module Suspended = Eio_utils.Suspended
module Zzz = Eio_utils.Zzz
module Lf_queue = Eio_utils.Lf_queue
module Fiber_context = Eio.Private.Fiber_context
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
module Rcfd = Eio_unix.Private.Rcfd
type exit = [`Exit_scheduler]
let system_thread = Ctf.mint_id ()
let system_thread = Trace.mint_id ()
(* The type of items in the run queue. *)
type runnable =
@ -204,14 +204,14 @@ let rec next t : [`Exit_scheduler] =
(* At this point we're not going to check [run_q] again before sleeping.
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
Ctf.(note_hiatus Wait_for_work);
Trace.hiatus ();
let cons fd acc = fd :: acc in
let read = FdSet.fold cons t.poll.to_read [] in
let write = FdSet.fold cons t.poll.to_write [] in
match Unix.select read write [] timeout with
| exception Unix.(Unix_error (EINTR, _, _)) -> next t
| readable, writeable, _ ->
Ctf.note_resume system_thread;
Trace.resume system_thread;
Atomic.set t.need_wakeup false;
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
List.iter (ready t [ `W ]) writeable;
@ -317,7 +317,7 @@ let enter fn = Effect.perform (Enter fn)
let run ~extra_effects t main x =
let rec fork ~new_fiber:fiber fn =
let open Effect.Deep in
Ctf.note_switch (Fiber_context.tid fiber);
Trace.fiber (Fiber_context.tid fiber);
match_with fn ()
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
exnc = (fun ex ->

View File

@ -1,9 +1,9 @@
(* This module also checks that Eio doesn't pull in a dependency on Unix.
See the [dune] file. *)
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
let () =
let bs = Cstruct.create 8 in
Ctf.BS.set_int64_le bs.buffer 0 1234L;
Trace.BS.set_int64_le bs.buffer 0 1234L;
assert (Cstruct.LE.get_uint64 bs 0 = 1234L)

View File

@ -7,7 +7,7 @@
```ocaml
open Eio.Std
module Ctf = Eio.Private.Ctf
module Trace = Eio.Private.Trace
let pp_promise pp f x =
match Promise.peek x with
@ -96,10 +96,10 @@ Basic semaphore tests:
let running = ref 0 in
let sem = Semaphore.make 2 in
let fork = Fiber.fork_promise ~sw in
let a = fork (fun () -> Ctf.label "a"; Semaphore.acquire sem; incr running) in
let b = fork (fun () -> Ctf.label "b"; Semaphore.acquire sem; incr running) in
let c = fork (fun () -> Ctf.label "c"; Semaphore.acquire sem; incr running) in
let d = fork (fun () -> Ctf.label "d"; Semaphore.acquire sem; incr running) in
let a = fork (fun () -> Trace.label "a"; Semaphore.acquire sem; incr running) in
let b = fork (fun () -> Trace.label "b"; Semaphore.acquire sem; incr running) in
let c = fork (fun () -> Trace.label "c"; Semaphore.acquire sem; incr running) in
let d = fork (fun () -> Trace.label "d"; Semaphore.acquire sem; incr running) in
traceln "Semaphore means that only %d threads are running" !running;
Promise.await_exn a;
Promise.await_exn b;
@ -132,8 +132,8 @@ Releasing a semaphore when no-one is waiting for it:
let sem = Semaphore.make 0 in
Semaphore.release sem; (* Release with free-counter *)
traceln "Initial config: %d" (Semaphore.get_value sem);
Fiber.fork ~sw (fun () -> Ctf.label "a"; Semaphore.acquire sem);
Fiber.fork ~sw (fun () -> Ctf.label "b"; Semaphore.acquire sem);
Fiber.fork ~sw (fun () -> Trace.label "a"; Semaphore.acquire sem);
Fiber.fork ~sw (fun () -> Trace.label "b"; Semaphore.acquire sem);
traceln "A running: %d" (Semaphore.get_value sem);
Semaphore.release sem; (* Release with a non-empty wait-queue *)
traceln "Now b running: %d" (Semaphore.get_value sem);