mirror of
https://github.com/ocaml-multicore/eio.git
synced 2025-07-22 00:01:23 -04:00
Compare commits
3 Commits
60e67472e3
...
9e1872c10c
Author | SHA1 | Date | |
---|---|---|---|
|
9e1872c10c | ||
|
ddcf89983d | ||
|
5a11ea6df2 |
@ -226,7 +226,7 @@ We can run the previous code with tracing enabled (writing to a new `trace.ctf`
|
|||||||
|
|
||||||
```ocaml
|
```ocaml
|
||||||
# let () =
|
# let () =
|
||||||
Eio_unix.Ctf.with_tracing "trace.ctf" @@ fun () ->
|
Eio_unix.Trace.with_tracing "trace.ctf" @@ fun () ->
|
||||||
Eio_main.run main;;
|
Eio_main.run main;;
|
||||||
+x = 1
|
+x = 1
|
||||||
+y = 1
|
+y = 1
|
||||||
|
@ -22,7 +22,7 @@ type t = {
|
|||||||
domain : Domain.id; (* Prevent access from other domains *)
|
domain : Domain.id; (* Prevent access from other domains *)
|
||||||
}
|
}
|
||||||
and fiber_context = {
|
and fiber_context = {
|
||||||
tid : Ctf.id;
|
tid : Trace.id;
|
||||||
mutable cancel_context : t;
|
mutable cancel_context : t;
|
||||||
mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
|
mutable cancel_node : fiber_context Lwt_dllist.node option; (* Our entry in [cancel_context.fibers] *)
|
||||||
mutable cancel_fn : exn -> unit; (* Encourage the current operation to finish *)
|
mutable cancel_fn : exn -> unit; (* Encourage the current operation to finish *)
|
||||||
@ -194,8 +194,8 @@ module Fiber_context = struct
|
|||||||
t.cancel_fn <- ignore
|
t.cancel_fn <- ignore
|
||||||
|
|
||||||
let make ~cc ~vars =
|
let make ~cc ~vars =
|
||||||
let tid = Ctf.mint_id () in
|
let tid = Trace.mint_id () in
|
||||||
Ctf.note_created tid Ctf.Task;
|
Trace.create tid Fiber;
|
||||||
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
|
let t = { tid; cancel_context = cc; cancel_node = None; cancel_fn = ignore; vars } in
|
||||||
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
|
t.cancel_node <- Some (Lwt_dllist.add_r t cc.fibers);
|
||||||
t
|
t
|
||||||
|
@ -1,110 +0,0 @@
|
|||||||
(** This library is used to write event traces in mirage-profile's CTF format. *)
|
|
||||||
|
|
||||||
type id = private int
|
|
||||||
(** Each thread/fiber/promise is identified by a unique ID. *)
|
|
||||||
|
|
||||||
(** {2 Recording events}
|
|
||||||
Libraries and applications can use these functions to make the traces more useful. *)
|
|
||||||
|
|
||||||
val label : string -> unit
|
|
||||||
(** [label msg] attaches text [msg] to the current thread. *)
|
|
||||||
|
|
||||||
val note_increase : string -> int -> unit
|
|
||||||
(** [note_increase counter delta] records that [counter] increased by [delta].
|
|
||||||
If [delta] is negative, this records a decrease. *)
|
|
||||||
|
|
||||||
val note_counter_value : string -> int -> unit
|
|
||||||
(** [note_counter_value counter value] records that [counter] is now [value]. *)
|
|
||||||
|
|
||||||
val should_resolve : id -> unit
|
|
||||||
(** [should_resolve id] records that [id] is expected to resolve, and should be highlighted if it doesn't. *)
|
|
||||||
|
|
||||||
(** {2 Recording system events}
|
|
||||||
These are normally only called by the scheduler. *)
|
|
||||||
|
|
||||||
type hiatus_reason =
|
|
||||||
| Wait_for_work
|
|
||||||
| Suspend
|
|
||||||
| Hibernate
|
|
||||||
|
|
||||||
type event =
|
|
||||||
| Wait
|
|
||||||
| Task
|
|
||||||
| Bind
|
|
||||||
| Try
|
|
||||||
| Choose
|
|
||||||
| Pick
|
|
||||||
| Join
|
|
||||||
| Map
|
|
||||||
| Condition
|
|
||||||
| On_success
|
|
||||||
| On_failure
|
|
||||||
| On_termination
|
|
||||||
| On_any
|
|
||||||
| Ignore_result
|
|
||||||
| Async
|
|
||||||
| Promise
|
|
||||||
| Semaphore
|
|
||||||
| Switch
|
|
||||||
| Stream
|
|
||||||
| Mutex
|
|
||||||
(** Types of threads or other recorded objects. *)
|
|
||||||
|
|
||||||
val mint_id : unit -> id
|
|
||||||
(** [mint_id ()] is a fresh unique [id]. *)
|
|
||||||
|
|
||||||
val note_created : ?label:string -> id -> event -> unit
|
|
||||||
(** [note_created t id ty] records the creation of [id]. *)
|
|
||||||
|
|
||||||
val note_read : ?reader:id -> id -> unit
|
|
||||||
(** [note_read src] records that promise [src]'s value was read.
|
|
||||||
@param reader The thread doing the read (default is the current thread). *)
|
|
||||||
|
|
||||||
val note_try_read : id -> unit
|
|
||||||
(** [note_try_read src] records that the current thread wants to read from [src] (which is not currently ready). *)
|
|
||||||
|
|
||||||
val note_switch : id -> unit
|
|
||||||
(** [note_switch id] records that [id] is now the current thread. *)
|
|
||||||
|
|
||||||
val note_hiatus : hiatus_reason -> unit
|
|
||||||
(** [note_hiatus r] records that the system will sleep for reason [r]. *)
|
|
||||||
|
|
||||||
val note_resume : id -> unit
|
|
||||||
(** [note_resume id] records that the system has resumed (used after {!note_hiatus}),
|
|
||||||
and is now running [id]. *)
|
|
||||||
|
|
||||||
val note_fork : unit -> id
|
|
||||||
(** [note_fork ()] records that a new thread has been forked and returns a fresh ID for it. *)
|
|
||||||
|
|
||||||
val note_resolved : id -> ex:exn option -> unit
|
|
||||||
(** [note_resolved id ~ex] records that [id] is now resolved.
|
|
||||||
If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *)
|
|
||||||
|
|
||||||
val note_signal : ?src:id -> id -> unit
|
|
||||||
(** [note_signal ~src dst] records that [dst] was signalled.
|
|
||||||
@param src The thread sending the signal (default is the current thread). *)
|
|
||||||
|
|
||||||
(** {2 Controlling tracing} *)
|
|
||||||
|
|
||||||
type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
|
|
||||||
|
|
||||||
module Control : sig
|
|
||||||
type t
|
|
||||||
|
|
||||||
val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t
|
|
||||||
(** [make ~timestamper b] is a trace buffer that record events in [b].
|
|
||||||
In most cases, the {!Ctf_unix} module provides a simpler interface. *)
|
|
||||||
|
|
||||||
val start : t -> unit
|
|
||||||
(** [start t] begins recording events in [t]. *)
|
|
||||||
|
|
||||||
val stop : t -> unit
|
|
||||||
(** [stop t] stops recording to [t] (which must be the current trace buffer). *)
|
|
||||||
end
|
|
||||||
|
|
||||||
(**/**)
|
|
||||||
|
|
||||||
module BS : sig
|
|
||||||
val set_int8 : Cstruct.buffer -> int -> int -> unit
|
|
||||||
val set_int64_le : Cstruct.buffer -> int -> int64 -> unit
|
|
||||||
end
|
|
@ -15,7 +15,7 @@ let default_traceln ?__POS__:pos fmt =
|
|||||||
Format.pp_close_box f ();
|
Format.pp_close_box f ();
|
||||||
Format.pp_print_flush f ();
|
Format.pp_print_flush f ();
|
||||||
let msg = Buffer.contents b in
|
let msg = Buffer.contents b in
|
||||||
Ctf.label msg;
|
Trace.label msg;
|
||||||
let lines = String.split_on_char '\n' msg in
|
let lines = String.split_on_char '\n' msg in
|
||||||
Mutex.lock traceln_mutex;
|
Mutex.lock traceln_mutex;
|
||||||
Fun.protect ~finally:(fun () -> Mutex.unlock traceln_mutex) @@ fun () ->
|
Fun.protect ~finally:(fun () -> Mutex.unlock traceln_mutex) @@ fun () ->
|
||||||
|
@ -7,7 +7,7 @@ module Private = struct
|
|||||||
module Suspend = Suspend
|
module Suspend = Suspend
|
||||||
module Cells = Cells
|
module Cells = Cells
|
||||||
module Broadcast = Broadcast
|
module Broadcast = Broadcast
|
||||||
module Ctf = Ctf
|
module Trace = Trace
|
||||||
module Fiber_context = Cancel.Fiber_context
|
module Fiber_context = Cancel.Fiber_context
|
||||||
module Debug = Debug
|
module Debug = Debug
|
||||||
|
|
||||||
|
@ -571,7 +571,7 @@ end
|
|||||||
|
|
||||||
(** @canonical Eio.Private *)
|
(** @canonical Eio.Private *)
|
||||||
module Private : sig
|
module Private : sig
|
||||||
module Ctf = Ctf
|
module Trace = Trace
|
||||||
|
|
||||||
module Cells = Cells
|
module Cells = Cells
|
||||||
module Broadcast = Broadcast
|
module Broadcast = Broadcast
|
||||||
@ -586,7 +586,7 @@ module Private : sig
|
|||||||
val destroy : t -> unit
|
val destroy : t -> unit
|
||||||
(** [destroy t] removes [t] from its cancellation context. *)
|
(** [destroy t] removes [t] from its cancellation context. *)
|
||||||
|
|
||||||
val tid : t -> Ctf.id
|
val tid : t -> Trace.id
|
||||||
|
|
||||||
(** {2 Cancellation}
|
(** {2 Cancellation}
|
||||||
|
|
||||||
|
@ -19,10 +19,10 @@ let fork ~sw f =
|
|||||||
Switch.with_op sw @@ fun () ->
|
Switch.with_op sw @@ fun () ->
|
||||||
match f () with
|
match f () with
|
||||||
| () ->
|
| () ->
|
||||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||||
| exception ex ->
|
| exception ex ->
|
||||||
Switch.fail sw ex; (* The [with_op] ensures this will succeed *)
|
Switch.fail sw ex; (* The [with_op] ensures this will succeed *)
|
||||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
||||||
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
|
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
|
||||||
|
|
||||||
let fork_daemon ~sw f =
|
let fork_daemon ~sw f =
|
||||||
@ -35,13 +35,13 @@ let fork_daemon ~sw f =
|
|||||||
match f () with
|
match f () with
|
||||||
| `Stop_daemon ->
|
| `Stop_daemon ->
|
||||||
(* The daemon asked to stop. *)
|
(* The daemon asked to stop. *)
|
||||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||||
| exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) ->
|
| exception Cancel.Cancelled Exit when not (Cancel.is_on sw.cancel) ->
|
||||||
(* The daemon was cancelled because all non-daemon fibers are finished. *)
|
(* The daemon was cancelled because all non-daemon fibers are finished. *)
|
||||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:None
|
||||||
| exception ex ->
|
| exception ex ->
|
||||||
Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *)
|
Switch.fail sw ex; (* The [with_daemon] ensures this will succeed *)
|
||||||
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
Trace.resolve (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
|
||||||
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
|
) (* else the fiber should report the error to [sw], but [sw] is failed anyway *)
|
||||||
|
|
||||||
let fork_promise ~sw f =
|
let fork_promise ~sw f =
|
||||||
|
@ -3,7 +3,7 @@ type 'a state =
|
|||||||
| Unresolved of Broadcast.t
|
| Unresolved of Broadcast.t
|
||||||
|
|
||||||
type !'a promise = {
|
type !'a promise = {
|
||||||
id : Ctf.id;
|
id : Trace.id;
|
||||||
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
|
state : 'a state Atomic.t; (* Note: we always switch to Resolved before broadcasting *)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,20 +25,20 @@ let create_with_id id =
|
|||||||
to_public_promise t, to_public_resolver t
|
to_public_promise t, to_public_resolver t
|
||||||
|
|
||||||
let create ?label () =
|
let create ?label () =
|
||||||
let id = Ctf.mint_id () in
|
let id = Trace.mint_id () in
|
||||||
Ctf.note_created ?label id Ctf.Promise;
|
Trace.create ?label id Promise;
|
||||||
create_with_id id
|
create_with_id id
|
||||||
|
|
||||||
let create_resolved x =
|
let create_resolved x =
|
||||||
let id = Ctf.mint_id () in
|
let id = Trace.mint_id () in
|
||||||
Ctf.note_created id Ctf.Promise;
|
Trace.create id Promise;
|
||||||
to_public_promise { id; state = Atomic.make (Resolved x) }
|
to_public_promise { id; state = Atomic.make (Resolved x) }
|
||||||
|
|
||||||
let await t =
|
let await t =
|
||||||
let t = of_public_promise t in
|
let t = of_public_promise t in
|
||||||
match Atomic.get t.state with
|
match Atomic.get t.state with
|
||||||
| Resolved x ->
|
| Resolved x ->
|
||||||
Ctf.note_read t.id;
|
Trace.read t.id;
|
||||||
x
|
x
|
||||||
| Unresolved b ->
|
| Unresolved b ->
|
||||||
Suspend.enter (fun ctx enqueue ->
|
Suspend.enter (fun ctx enqueue ->
|
||||||
@ -53,7 +53,7 @@ let await t =
|
|||||||
| Unresolved _ ->
|
| Unresolved _ ->
|
||||||
(* We observed the promise to be still unresolved after registering a waiter.
|
(* We observed the promise to be still unresolved after registering a waiter.
|
||||||
Therefore any resolution must happen after we were registered and we will be notified. *)
|
Therefore any resolution must happen after we were registered and we will be notified. *)
|
||||||
Ctf.note_try_read t.id;
|
Trace.try_read t.id;
|
||||||
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
|
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
|
||||||
if Broadcast.cancel request then enqueue (Error ex)
|
if Broadcast.cancel request then enqueue (Error ex)
|
||||||
(* else already resumed *)
|
(* else already resumed *)
|
||||||
@ -61,7 +61,7 @@ let await t =
|
|||||||
);
|
);
|
||||||
match Atomic.get t.state with
|
match Atomic.get t.state with
|
||||||
| Resolved x ->
|
| Resolved x ->
|
||||||
Ctf.note_read t.id;
|
Trace.read t.id;
|
||||||
x
|
x
|
||||||
| Unresolved _ -> assert false
|
| Unresolved _ -> assert false
|
||||||
|
|
||||||
@ -76,7 +76,7 @@ let resolve t v =
|
|||||||
| Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
|
| Resolved _ -> invalid_arg "Can't resolve already-resolved promise"
|
||||||
| Unresolved b as prev ->
|
| Unresolved b as prev ->
|
||||||
if Atomic.compare_and_set t.state prev (Resolved v) then (
|
if Atomic.compare_and_set t.state prev (Resolved v) then (
|
||||||
Ctf.note_resolved t.id ~ex:None;
|
Trace.resolve t.id ~ex:None;
|
||||||
Broadcast.resume_all b
|
Broadcast.resume_all b
|
||||||
) else (
|
) else (
|
||||||
(* Otherwise, the promise was already resolved. Retry (to get the error). *)
|
(* Otherwise, the promise was already resolved. Retry (to get the error). *)
|
||||||
|
@ -19,6 +19,6 @@ let await t id =
|
|||||||
t.wake <- (fun x ->
|
t.wake <- (fun x ->
|
||||||
Cancel.Fiber_context.clear_cancel_fn ctx;
|
Cancel.Fiber_context.clear_cancel_fn ctx;
|
||||||
t.wake <- ignore;
|
t.wake <- ignore;
|
||||||
Ctf.note_read ~reader:id ctx.tid;
|
Trace.read ~reader:id ctx.tid;
|
||||||
enqueue x
|
enqueue x
|
||||||
)
|
)
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
type t = {
|
type t = {
|
||||||
id : Ctf.id;
|
id : Trace.id;
|
||||||
mutable fibers : int; (* Total, including daemon_fibers and the main function *)
|
mutable fibers : int; (* Total, including daemon_fibers and the main function *)
|
||||||
mutable daemon_fibers : int;
|
mutable daemon_fibers : int;
|
||||||
mutable exs : (exn * Printexc.raw_backtrace) option;
|
mutable exs : (exn * Printexc.raw_backtrace) option;
|
||||||
@ -51,7 +51,7 @@ let combine_exn ex = function
|
|||||||
let fail ?(bt=Printexc.get_raw_backtrace ()) t ex =
|
let fail ?(bt=Printexc.get_raw_backtrace ()) t ex =
|
||||||
check_our_domain t;
|
check_our_domain t;
|
||||||
if t.exs = None then
|
if t.exs = None then
|
||||||
Ctf.note_resolved t.id ~ex:(Some ex);
|
Trace.resolve t.id ~ex:(Some ex);
|
||||||
t.exs <- Some (combine_exn (ex, bt) t.exs);
|
t.exs <- Some (combine_exn (ex, bt) t.exs);
|
||||||
try
|
try
|
||||||
Cancel.cancel t.cancel ex
|
Cancel.cancel t.cancel ex
|
||||||
@ -91,7 +91,7 @@ let or_raise = function
|
|||||||
let rec await_idle t =
|
let rec await_idle t =
|
||||||
(* Wait for fibers to finish: *)
|
(* Wait for fibers to finish: *)
|
||||||
while t.fibers > 0 do
|
while t.fibers > 0 do
|
||||||
Ctf.note_try_read t.id;
|
Trace.try_read t.id;
|
||||||
Single_waiter.await t.waiter t.id
|
Single_waiter.await t.waiter t.id
|
||||||
done;
|
done;
|
||||||
(* Call on_release handlers: *)
|
(* Call on_release handlers: *)
|
||||||
@ -118,8 +118,8 @@ let maybe_raise_exs t =
|
|||||||
| Some (ex, bt) -> Printexc.raise_with_backtrace ex bt
|
| Some (ex, bt) -> Printexc.raise_with_backtrace ex bt
|
||||||
|
|
||||||
let create cancel =
|
let create cancel =
|
||||||
let id = Ctf.mint_id () in
|
let id = Trace.mint_id () in
|
||||||
Ctf.note_created id Ctf.Switch;
|
Trace.create id Switch;
|
||||||
{
|
{
|
||||||
id;
|
id;
|
||||||
fibers = 1; (* The main function counts as a fiber *)
|
fibers = 1; (* The main function counts as a fiber *)
|
||||||
@ -135,7 +135,7 @@ let run_internal t fn =
|
|||||||
| v ->
|
| v ->
|
||||||
dec_fibers t;
|
dec_fibers t;
|
||||||
await_idle t;
|
await_idle t;
|
||||||
Ctf.note_read t.id;
|
Trace.read t.id;
|
||||||
maybe_raise_exs t; (* Check for failure while finishing *)
|
maybe_raise_exs t; (* Check for failure while finishing *)
|
||||||
(* Success. *)
|
(* Success. *)
|
||||||
v
|
v
|
||||||
@ -146,7 +146,7 @@ let run_internal t fn =
|
|||||||
dec_fibers t;
|
dec_fibers t;
|
||||||
fail ~bt t ex;
|
fail ~bt t ex;
|
||||||
await_idle t;
|
await_idle t;
|
||||||
Ctf.note_read t.id;
|
Trace.read t.id;
|
||||||
maybe_raise_exs t;
|
maybe_raise_exs t;
|
||||||
assert false
|
assert false
|
||||||
|
|
||||||
|
@ -47,27 +47,8 @@ let mint_id () =
|
|||||||
Domain.DLS.set next_id_key next_id_local_succ;
|
Domain.DLS.set next_id_key next_id_local_succ;
|
||||||
next_id_local
|
next_id_local
|
||||||
|
|
||||||
type hiatus_reason =
|
type ty =
|
||||||
| Wait_for_work
|
| Fiber
|
||||||
| Suspend
|
|
||||||
| Hibernate
|
|
||||||
|
|
||||||
type event =
|
|
||||||
| Wait
|
|
||||||
| Task
|
|
||||||
| Bind
|
|
||||||
| Try
|
|
||||||
| Choose
|
|
||||||
| Pick
|
|
||||||
| Join
|
|
||||||
| Map
|
|
||||||
| Condition
|
|
||||||
| On_success
|
|
||||||
| On_failure
|
|
||||||
| On_termination
|
|
||||||
| On_any
|
|
||||||
| Ignore_result
|
|
||||||
| Async
|
|
||||||
| Promise
|
| Promise
|
||||||
| Semaphore
|
| Semaphore
|
||||||
| Switch
|
| Switch
|
||||||
@ -80,21 +61,7 @@ let current_thread = ref (-1)
|
|||||||
|
|
||||||
let int_of_thread_type t =
|
let int_of_thread_type t =
|
||||||
match t with
|
match t with
|
||||||
| Wait -> 0
|
| Fiber -> 1
|
||||||
| Task -> 1
|
|
||||||
| Bind -> 2
|
|
||||||
| Try -> 3
|
|
||||||
| Choose -> 4
|
|
||||||
| Pick -> 5
|
|
||||||
| Join -> 6
|
|
||||||
| Map -> 7
|
|
||||||
| Condition -> 8
|
|
||||||
| On_success -> 9
|
|
||||||
| On_failure -> 10
|
|
||||||
| On_termination -> 11
|
|
||||||
| On_any -> 12
|
|
||||||
| Ignore_result -> 13
|
|
||||||
| Async -> 14
|
|
||||||
| Promise -> 15
|
| Promise -> 15
|
||||||
| Semaphore -> 16
|
| Semaphore -> 16
|
||||||
| Switch -> 17
|
| Switch -> 17
|
||||||
@ -207,12 +174,12 @@ module Control = struct
|
|||||||
let op_fails = 3
|
let op_fails = 3
|
||||||
(* let op_becomes = 4 *)
|
(* let op_becomes = 4 *)
|
||||||
let op_label = 5
|
let op_label = 5
|
||||||
let op_increase = 6
|
(* let op_increase = 6 *)
|
||||||
let op_switch = 7
|
let op_switch = 7
|
||||||
(* let op_gc = 8 *)
|
(* let op_gc = 8 *)
|
||||||
(* let op_old_signal = 9 *)
|
(* let op_old_signal = 9 *)
|
||||||
let op_try_read = 10
|
let op_try_read = 10
|
||||||
let op_counter_value = 11
|
(* let op_counter_value = 11 *)
|
||||||
let op_read_later = 12
|
let op_read_later = 12
|
||||||
let op_signal = 13
|
let op_signal = 13
|
||||||
|
|
||||||
@ -329,20 +296,6 @@ module Control = struct
|
|||||||
|> write_string log.log msg
|
|> write_string log.log msg
|
||||||
|> end_event
|
|> end_event
|
||||||
|
|
||||||
let note_increase log counter amount =
|
|
||||||
add_event log op_increase (17 + String.length counter)
|
|
||||||
|> write_tid log.log !current_thread
|
|
||||||
|> write64 log.log (Int64.of_int amount)
|
|
||||||
|> write_string log.log counter
|
|
||||||
|> end_event
|
|
||||||
|
|
||||||
let note_counter_value log counter value =
|
|
||||||
add_event log op_counter_value (17 + String.length counter)
|
|
||||||
|> write_tid log.log !current_thread
|
|
||||||
|> write64 log.log (Int64.of_int value)
|
|
||||||
|> write_string log.log counter
|
|
||||||
|> end_event
|
|
||||||
|
|
||||||
let note_switch log new_current =
|
let note_switch log new_current =
|
||||||
if new_current <> !current_thread then (
|
if new_current <> !current_thread then (
|
||||||
current_thread := new_current;
|
current_thread := new_current;
|
||||||
@ -397,42 +350,34 @@ let label name =
|
|||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log -> Control.note_label log !current_thread name
|
| Some log -> Control.note_label log !current_thread name
|
||||||
|
|
||||||
let note_fork () =
|
let create ?label id ty =
|
||||||
let child = mint_id () in
|
|
||||||
begin match !Control.event_log with
|
|
||||||
| None -> ()
|
|
||||||
| Some log -> Control.note_created log child Task
|
|
||||||
end;
|
|
||||||
child
|
|
||||||
|
|
||||||
let note_created ?label id ty =
|
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log ->
|
| Some log ->
|
||||||
Control.note_created log id ty;
|
Control.note_created log id ty;
|
||||||
Option.iter (Control.note_label log id) label
|
Option.iter (Control.note_label log id) label
|
||||||
|
|
||||||
let note_switch new_current =
|
let fiber new_current =
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log -> Control.note_switch log new_current
|
| Some log -> Control.note_switch log new_current
|
||||||
|
|
||||||
let note_hiatus _reason =
|
let hiatus () =
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log -> Control.note_suspend log ()
|
| Some log -> Control.note_suspend log ()
|
||||||
|
|
||||||
let note_resume new_current =
|
let resume new_current =
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log -> Control.note_switch log new_current
|
| Some log -> Control.note_switch log new_current
|
||||||
|
|
||||||
let note_try_read input =
|
let try_read input =
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log -> Control.note_try_read log !current_thread input
|
| Some log -> Control.note_try_read log !current_thread input
|
||||||
|
|
||||||
let note_read ?reader input =
|
let read ?reader input =
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log ->
|
| Some log ->
|
||||||
@ -443,12 +388,12 @@ let note_read ?reader input =
|
|||||||
in
|
in
|
||||||
Control.note_read log ~reader input
|
Control.note_read log ~reader input
|
||||||
|
|
||||||
let note_resolved id ~ex =
|
let resolve id ~ex =
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log -> Control.note_resolved log id ~ex
|
| Some log -> Control.note_resolved log id ~ex
|
||||||
|
|
||||||
let note_signal ?src dst =
|
let signal ?src dst =
|
||||||
match !Control.event_log with
|
match !Control.event_log with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some log ->
|
| Some log ->
|
||||||
@ -458,18 +403,3 @@ let note_signal ?src dst =
|
|||||||
| Some x -> x
|
| Some x -> x
|
||||||
in
|
in
|
||||||
Control.note_signal ~src log dst
|
Control.note_signal ~src log dst
|
||||||
|
|
||||||
let note_increase counter amount =
|
|
||||||
match !Control.event_log with
|
|
||||||
| None -> ()
|
|
||||||
| Some log -> Control.note_increase log counter amount
|
|
||||||
|
|
||||||
let note_counter_value counter value =
|
|
||||||
match !Control.event_log with
|
|
||||||
| None -> ()
|
|
||||||
| Some log -> Control.note_counter_value log counter value
|
|
||||||
|
|
||||||
let should_resolve thread =
|
|
||||||
match !Control.event_log with
|
|
||||||
| None -> ()
|
|
||||||
| Some log -> Control.note_label log thread "__should_resolve" (* Hack! *)
|
|
78
lib_eio/core/trace.mli
Normal file
78
lib_eio/core/trace.mli
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
(** This library is used to write event traces in mirage-profile's CTF format. *)
|
||||||
|
|
||||||
|
type id = private int
|
||||||
|
(** Each thread/fiber/promise is identified by a unique ID. *)
|
||||||
|
|
||||||
|
(** {2 Recording events}
|
||||||
|
Libraries and applications can use these functions to make the traces more useful. *)
|
||||||
|
|
||||||
|
val label : string -> unit
|
||||||
|
(** [label msg] attaches text [msg] to the current thread. *)
|
||||||
|
|
||||||
|
(** {2 Recording system events}
|
||||||
|
These are normally only called by the scheduler. *)
|
||||||
|
|
||||||
|
type ty =
|
||||||
|
| Fiber
|
||||||
|
| Promise
|
||||||
|
| Semaphore
|
||||||
|
| Switch
|
||||||
|
| Stream
|
||||||
|
| Mutex
|
||||||
|
(** Types of recorded objects. *)
|
||||||
|
|
||||||
|
val mint_id : unit -> id
|
||||||
|
(** [mint_id ()] is a fresh unique [id]. *)
|
||||||
|
|
||||||
|
val create : ?label:string -> id -> ty -> unit
|
||||||
|
(** [create t id ty] records the creation of [id]. *)
|
||||||
|
|
||||||
|
val read : ?reader:id -> id -> unit
|
||||||
|
(** [read src] records that promise [src]'s value was read.
|
||||||
|
@param reader The thread doing the read (default is the current thread). *)
|
||||||
|
|
||||||
|
val try_read : id -> unit
|
||||||
|
(** [try_read src] records that the current thread wants to read from [src] (which is not currently ready). *)
|
||||||
|
|
||||||
|
val fiber : id -> unit
|
||||||
|
(** [fiber id] records that fiber [id] is now running. *)
|
||||||
|
|
||||||
|
val hiatus : unit -> unit
|
||||||
|
(** [hiatus ()] records that the system will sleep for reason [r]. *)
|
||||||
|
|
||||||
|
val resume : id -> unit
|
||||||
|
(** [resume id] records that the system has resumed (used after {!hiatus}),
|
||||||
|
and is now running [id]. *)
|
||||||
|
|
||||||
|
val resolve : id -> ex:exn option -> unit
|
||||||
|
(** [resolve id ~ex] records that [id] is now resolved.
|
||||||
|
If [ex = None] then [id] was successful, otherwise it failed with exception [ex]. *)
|
||||||
|
|
||||||
|
val signal : ?src:id -> id -> unit
|
||||||
|
(** [signal ~src dst] records that [dst] was signalled.
|
||||||
|
@param src The thread sending the signal (default is the current thread). *)
|
||||||
|
|
||||||
|
(** {2 Controlling tracing} *)
|
||||||
|
|
||||||
|
type log_buffer = (char, Bigarray.int8_unsigned_elt, Bigarray.c_layout) Bigarray.Array1.t
|
||||||
|
|
||||||
|
module Control : sig
|
||||||
|
type t
|
||||||
|
|
||||||
|
val make : timestamper:(log_buffer -> int -> unit) -> log_buffer -> t
|
||||||
|
(** [make ~timestamper b] is a trace buffer that record events in [b].
|
||||||
|
In most cases, the {!Eio_unix.Trace} module provides a simpler interface. *)
|
||||||
|
|
||||||
|
val start : t -> unit
|
||||||
|
(** [start t] begins recording events in [t]. *)
|
||||||
|
|
||||||
|
val stop : t -> unit
|
||||||
|
(** [stop t] stops recording to [t] (which must be the current trace buffer). *)
|
||||||
|
end
|
||||||
|
|
||||||
|
(**/**)
|
||||||
|
|
||||||
|
module BS : sig
|
||||||
|
val set_int8 : Cstruct.buffer -> int -> int -> unit
|
||||||
|
val set_int64_le : Cstruct.buffer -> int -> int64 -> unit
|
||||||
|
end
|
@ -6,7 +6,7 @@ type state =
|
|||||||
exception Poisoned of exn
|
exception Poisoned of exn
|
||||||
|
|
||||||
type t = {
|
type t = {
|
||||||
id : Ctf.id;
|
id : Trace.id;
|
||||||
mutex : Mutex.t;
|
mutex : Mutex.t;
|
||||||
mutable state : state; (* Owned by [t.mutex] *)
|
mutable state : state; (* Owned by [t.mutex] *)
|
||||||
waiters : [`Take | `Error of exn] Waiters.t; (* Owned by [t.mutex] *)
|
waiters : [`Take | `Error of exn] Waiters.t; (* Owned by [t.mutex] *)
|
||||||
@ -19,8 +19,8 @@ type t = {
|
|||||||
|
|
||||||
(* {R} t = create () {mutex t R} *)
|
(* {R} t = create () {mutex t R} *)
|
||||||
let create () =
|
let create () =
|
||||||
let id = Ctf.mint_id () in
|
let id = Trace.mint_id () in
|
||||||
Ctf.note_created id Ctf.Mutex;
|
Trace.create id Mutex;
|
||||||
{
|
{
|
||||||
id;
|
id;
|
||||||
mutex = Mutex.create ();
|
mutex = Mutex.create ();
|
||||||
@ -33,7 +33,7 @@ let create () =
|
|||||||
let unlock t =
|
let unlock t =
|
||||||
Mutex.lock t.mutex;
|
Mutex.lock t.mutex;
|
||||||
(* We now have ownership of [t.state] and [t.waiters]. *)
|
(* We now have ownership of [t.state] and [t.waiters]. *)
|
||||||
Ctf.note_signal t.id;
|
Trace.signal t.id;
|
||||||
match t.state with
|
match t.state with
|
||||||
| Unlocked ->
|
| Unlocked ->
|
||||||
Mutex.unlock t.mutex;
|
Mutex.unlock t.mutex;
|
||||||
@ -55,7 +55,7 @@ let lock t =
|
|||||||
Mutex.lock t.mutex;
|
Mutex.lock t.mutex;
|
||||||
match t.state with
|
match t.state with
|
||||||
| Locked ->
|
| Locked ->
|
||||||
Ctf.note_try_read t.id;
|
Trace.try_read t.id;
|
||||||
begin match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with
|
begin match Waiters.await ~mutex:(Some t.mutex) t.waiters t.id with
|
||||||
| `Error ex -> raise ex (* Poisoned; stop waiting *)
|
| `Error ex -> raise ex (* Poisoned; stop waiting *)
|
||||||
| `Take ->
|
| `Take ->
|
||||||
@ -64,7 +64,7 @@ let lock t =
|
|||||||
()
|
()
|
||||||
end
|
end
|
||||||
| Unlocked ->
|
| Unlocked ->
|
||||||
Ctf.note_read t.id;
|
Trace.read t.id;
|
||||||
t.state <- Locked; (* We transfer R from the state to our caller. *)
|
t.state <- Locked; (* We transfer R from the state to our caller. *)
|
||||||
(* {locked t * R} *)
|
(* {locked t * R} *)
|
||||||
Mutex.unlock t.mutex
|
Mutex.unlock t.mutex
|
||||||
@ -77,11 +77,11 @@ let try_lock t =
|
|||||||
Mutex.lock t.mutex;
|
Mutex.lock t.mutex;
|
||||||
match t.state with
|
match t.state with
|
||||||
| Locked ->
|
| Locked ->
|
||||||
Ctf.note_try_read t.id;
|
Trace.try_read t.id;
|
||||||
Mutex.unlock t.mutex;
|
Mutex.unlock t.mutex;
|
||||||
false
|
false
|
||||||
| Unlocked ->
|
| Unlocked ->
|
||||||
Ctf.note_read t.id;
|
Trace.read t.id;
|
||||||
t.state <- Locked; (* We transfer R from the state to our caller. *)
|
t.state <- Locked; (* We transfer R from the state to our caller. *)
|
||||||
Mutex.unlock t.mutex;
|
Mutex.unlock t.mutex;
|
||||||
(* {locked t * R} *)
|
(* {locked t * R} *)
|
||||||
|
@ -1,18 +1,18 @@
|
|||||||
type t = {
|
type t = {
|
||||||
id : Ctf.id;
|
id : Trace.id;
|
||||||
state : Sem_state.t;
|
state : Sem_state.t;
|
||||||
}
|
}
|
||||||
|
|
||||||
let make n =
|
let make n =
|
||||||
let id = Ctf.mint_id () in
|
let id = Trace.mint_id () in
|
||||||
Ctf.note_created id Ctf.Semaphore;
|
Trace.create id Semaphore;
|
||||||
{
|
{
|
||||||
id;
|
id;
|
||||||
state = Sem_state.create n;
|
state = Sem_state.create n;
|
||||||
}
|
}
|
||||||
|
|
||||||
let release t =
|
let release t =
|
||||||
Ctf.note_signal t.id;
|
Trace.signal t.id;
|
||||||
Sem_state.release t.state
|
Sem_state.release t.state
|
||||||
|
|
||||||
let acquire t =
|
let acquire t =
|
||||||
@ -24,7 +24,7 @@ let acquire t =
|
|||||||
match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with
|
match Sem_state.suspend t.state (fun () -> enqueue (Ok ())) with
|
||||||
| None -> () (* Already resumed *)
|
| None -> () (* Already resumed *)
|
||||||
| Some request ->
|
| Some request ->
|
||||||
Ctf.note_try_read t.id;
|
Trace.try_read t.id;
|
||||||
match Fiber_context.get_error ctx with
|
match Fiber_context.get_error ctx with
|
||||||
| Some ex ->
|
| Some ex ->
|
||||||
if Sem_state.cancel request then enqueue (Error ex);
|
if Sem_state.cancel request then enqueue (Error ex);
|
||||||
@ -36,7 +36,7 @@ let acquire t =
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
Ctf.note_read t.id
|
Trace.read t.id
|
||||||
|
|
||||||
let get_value t =
|
let get_value t =
|
||||||
max 0 (Atomic.get t.state.state)
|
max 0 (Atomic.get t.state.state)
|
||||||
|
@ -2,7 +2,7 @@ module Locking = struct
|
|||||||
type 'a t = {
|
type 'a t = {
|
||||||
mutex : Mutex.t;
|
mutex : Mutex.t;
|
||||||
|
|
||||||
id : Ctf.id;
|
id : Trace.id;
|
||||||
|
|
||||||
capacity : int; (* [capacity > 0] *)
|
capacity : int; (* [capacity > 0] *)
|
||||||
items : 'a Queue.t;
|
items : 'a Queue.t;
|
||||||
@ -29,8 +29,8 @@ module Locking = struct
|
|||||||
|
|
||||||
let create capacity =
|
let create capacity =
|
||||||
assert (capacity > 0);
|
assert (capacity > 0);
|
||||||
let id = Ctf.mint_id () in
|
let id = Trace.mint_id () in
|
||||||
Ctf.note_created id Ctf.Stream;
|
Trace.create id Stream;
|
||||||
{
|
{
|
||||||
mutex = Mutex.create ();
|
mutex = Mutex.create ();
|
||||||
id;
|
id;
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
```ocaml
|
```ocaml
|
||||||
# #require "eio";;
|
# #require "eio";;
|
||||||
# for _ = 1 to 5 do
|
# for _ = 1 to 5 do
|
||||||
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
|
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
|
||||||
done;;
|
done;;
|
||||||
1
|
1
|
||||||
2
|
2
|
||||||
@ -20,7 +20,7 @@ A new domain gets a new chunk:
|
|||||||
# Domain.join @@ Domain.spawn
|
# Domain.join @@ Domain.spawn
|
||||||
(fun () ->
|
(fun () ->
|
||||||
for _ = 1 to 5 do
|
for _ = 1 to 5 do
|
||||||
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
|
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
|
||||||
done);;
|
done);;
|
||||||
1024
|
1024
|
||||||
1025
|
1025
|
||||||
@ -34,12 +34,12 @@ When the original domain exhausts its chunk, it jumps to the next free chunk:
|
|||||||
|
|
||||||
```ocaml
|
```ocaml
|
||||||
# for _ = 1 to 1024 - 9 do
|
# for _ = 1 to 1024 - 9 do
|
||||||
Eio.Private.Ctf.mint_id () |> ignore
|
Eio.Private.Trace.mint_id () |> ignore
|
||||||
done;;
|
done;;
|
||||||
- : unit = ()
|
- : unit = ()
|
||||||
|
|
||||||
# for _ = 1 to 5 do
|
# for _ = 1 to 5 do
|
||||||
Printf.printf "%d\n%!" (Eio.Private.Ctf.mint_id () :> int)
|
Printf.printf "%d\n%!" (Eio.Private.Trace.mint_id () :> int)
|
||||||
done;;
|
done;;
|
||||||
1021
|
1021
|
||||||
1022
|
1022
|
@ -1,10 +1,10 @@
|
|||||||
open Bigarray
|
open Bigarray
|
||||||
|
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
|
|
||||||
let timestamper log_buffer ofs =
|
let timestamper log_buffer ofs =
|
||||||
let ns = Mtime.to_uint64_ns @@ Mtime_clock.now () in
|
let ns = Mtime.to_uint64_ns @@ Mtime_clock.now () in
|
||||||
Ctf.BS.set_int64_le log_buffer ofs ns
|
Trace.BS.set_int64_le log_buffer ofs ns
|
||||||
|
|
||||||
let mmap_buffer ~size path =
|
let mmap_buffer ~size path =
|
||||||
let fd = Unix.(openfile path [O_RDWR; O_CREAT; O_TRUNC] 0o644) in
|
let fd = Unix.(openfile path [O_RDWR; O_CREAT; O_TRUNC] 0o644) in
|
||||||
@ -16,6 +16,6 @@ let mmap_buffer ~size path =
|
|||||||
|
|
||||||
let with_tracing ?(size=0x100000) path fn =
|
let with_tracing ?(size=0x100000) path fn =
|
||||||
let buffer = mmap_buffer ~size path in
|
let buffer = mmap_buffer ~size path in
|
||||||
let trace_config = Ctf.Control.make ~timestamper buffer in
|
let trace_config = Trace.Control.make ~timestamper buffer in
|
||||||
Ctf.Control.start trace_config;
|
Trace.Control.start trace_config;
|
||||||
Fun.protect fn ~finally:(fun () -> Ctf.Control.stop trace_config)
|
Fun.protect fn ~finally:(fun () -> Trace.Control.stop trace_config)
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
val timestamper : Eio.Private.Ctf.log_buffer -> int -> unit
|
val timestamper : Eio.Private.Trace.log_buffer -> int -> unit
|
||||||
(** Uses [Mtime_clock] to write timestamps. *)
|
(** Uses [Mtime_clock] to write timestamps. *)
|
||||||
|
|
||||||
val mmap_buffer : size:int -> string -> Eio.Private.Ctf.log_buffer
|
val mmap_buffer : size:int -> string -> Eio.Private.Trace.log_buffer
|
||||||
(** [mmap_buffer ~size path] initialises file [path] as an empty buffer for tracing. *)
|
(** [mmap_buffer ~size path] initialises file [path] as an empty buffer for tracing. *)
|
||||||
|
|
||||||
val with_tracing : ?size:int -> string -> (unit -> 'a) -> 'a
|
val with_tracing : ?size:int -> string -> (unit -> 'a) -> 'a
|
||||||
(** [with_tracing path fn] is a convenience function that uses {!mmap_buffer} to create a log buffer,
|
(** [with_tracing path fn] is a convenience function that uses {!mmap_buffer} to create a log buffer,
|
||||||
calls {!Ctf.Control.start} to start recording, runs [fn], and then stops recording. *)
|
calls {!Trace.Control.start} to start recording, runs [fn], and then stops recording. *)
|
||||||
|
@ -26,7 +26,7 @@ let run_in_systhread = Private.run_in_systhread
|
|||||||
|
|
||||||
module Ipaddr = Net.Ipaddr
|
module Ipaddr = Net.Ipaddr
|
||||||
|
|
||||||
module Ctf = Ctf_unix
|
module Trace = Ctf_unix
|
||||||
|
|
||||||
module Process = Process
|
module Process = Process
|
||||||
module Net = Net
|
module Net = Net
|
||||||
|
@ -97,6 +97,6 @@ module Private : sig
|
|||||||
module Fork_action = Fork_action
|
module Fork_action = Fork_action
|
||||||
end
|
end
|
||||||
|
|
||||||
module Ctf = Ctf_unix
|
module Trace = Ctf_unix
|
||||||
|
|
||||||
module Pi = Pi
|
module Pi = Pi
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
(** A suspended fiber with its context. *)
|
(** A suspended fiber with its context. *)
|
||||||
|
|
||||||
open Effect.Deep
|
open Effect.Deep
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
|
|
||||||
type 'a t = {
|
type 'a t = {
|
||||||
fiber : Eio.Private.Fiber_context.t;
|
fiber : Eio.Private.Fiber_context.t;
|
||||||
@ -11,9 +11,9 @@ type 'a t = {
|
|||||||
let tid t = Eio.Private.Fiber_context.tid t.fiber
|
let tid t = Eio.Private.Fiber_context.tid t.fiber
|
||||||
|
|
||||||
let continue t v =
|
let continue t v =
|
||||||
Ctf.note_switch (tid t);
|
Trace.fiber (tid t);
|
||||||
continue t.k v
|
continue t.k v
|
||||||
|
|
||||||
let discontinue t ex =
|
let discontinue t ex =
|
||||||
Ctf.note_switch (tid t);
|
Trace.fiber (tid t);
|
||||||
discontinue t.k ex
|
discontinue t.k ex
|
||||||
|
@ -47,7 +47,7 @@ let await_internal ~mutex (t:'a t) id ctx enqueue =
|
|||||||
let resolved_waiter = ref Hook.null in
|
let resolved_waiter = ref Hook.null in
|
||||||
let finished = Atomic.make false in
|
let finished = Atomic.make false in
|
||||||
let enqueue x =
|
let enqueue x =
|
||||||
Ctf.note_read ~reader:id (Fiber_context.tid ctx);
|
Trace.read ~reader:id (Fiber_context.tid ctx);
|
||||||
enqueue x
|
enqueue x
|
||||||
in
|
in
|
||||||
let cancel ex =
|
let cancel ex =
|
||||||
|
@ -21,7 +21,7 @@ val is_empty : 'a t -> bool
|
|||||||
|
|
||||||
val await :
|
val await :
|
||||||
mutex:Mutex.t option ->
|
mutex:Mutex.t option ->
|
||||||
'a t -> Ctf.id -> 'a
|
'a t -> Trace.id -> 'a
|
||||||
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
|
(** [await ~mutex t id] suspends the current fiber and adds its continuation to [t].
|
||||||
When the waiter is woken, the fiber is resumed and returns the result.
|
When the waiter is woken, the fiber is resumed and returns the result.
|
||||||
If [t] can be used from multiple domains:
|
If [t] can be used from multiple domains:
|
||||||
@ -32,7 +32,7 @@ val await :
|
|||||||
|
|
||||||
val await_internal :
|
val await_internal :
|
||||||
mutex:Mutex.t option ->
|
mutex:Mutex.t option ->
|
||||||
'a t -> Ctf.id -> Fiber_context.t ->
|
'a t -> Trace.id -> Fiber_context.t ->
|
||||||
(('a, exn) result -> unit) -> unit
|
(('a, exn) result -> unit) -> unit
|
||||||
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
|
(** [await_internal ~mutex t id ctx enqueue] is like [await], but the caller has to suspend the fiber.
|
||||||
This also allows wrapping the [enqueue] function.
|
This also allows wrapping the [enqueue] function.
|
||||||
|
@ -20,7 +20,7 @@
|
|||||||
open Eio.Std
|
open Eio.Std
|
||||||
|
|
||||||
module Fiber_context = Eio.Private.Fiber_context
|
module Fiber_context = Eio.Private.Fiber_context
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
module Fd = Eio_unix.Fd
|
module Fd = Eio_unix.Fd
|
||||||
|
|
||||||
module Suspended = Eio_utils.Suspended
|
module Suspended = Eio_utils.Suspended
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
open Eio.Std
|
open Eio.Std
|
||||||
|
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
module Fd = Eio_unix.Fd
|
module Fd = Eio_unix.Fd
|
||||||
|
|
||||||
type dir_fd =
|
type dir_fd =
|
||||||
@ -20,12 +20,12 @@ let file_offset t = function
|
|||||||
|
|
||||||
let enqueue_read st action (file_offset,fd,buf,len) =
|
let enqueue_read st action (file_offset,fd,buf,len) =
|
||||||
let req = { Sched.op=`R; file_offset; len; fd; cur_off = 0; buf; action } in
|
let req = { Sched.op=`R; file_offset; len; fd; cur_off = 0; buf; action } in
|
||||||
Ctf.label "read";
|
Trace.label "read";
|
||||||
Sched.submit_rw_req st req
|
Sched.submit_rw_req st req
|
||||||
|
|
||||||
let rec enqueue_writev args st action =
|
let rec enqueue_writev args st action =
|
||||||
let (file_offset,fd,bufs) = args in
|
let (file_offset,fd,bufs) = args in
|
||||||
Ctf.label "writev";
|
Trace.label "writev";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.writev st.uring ~file_offset fd bufs (Job action)
|
Uring.writev st.uring ~file_offset fd bufs (Job action)
|
||||||
)
|
)
|
||||||
@ -35,11 +35,11 @@ let rec enqueue_writev args st action =
|
|||||||
|
|
||||||
let enqueue_write st action (file_offset,fd,buf,len) =
|
let enqueue_write st action (file_offset,fd,buf,len) =
|
||||||
let req = { Sched.op=`W; file_offset; len; fd; cur_off = 0; buf; action } in
|
let req = { Sched.op=`W; file_offset; len; fd; cur_off = 0; buf; action } in
|
||||||
Ctf.label "write";
|
Trace.label "write";
|
||||||
Sched.submit_rw_req st req
|
Sched.submit_rw_req st req
|
||||||
|
|
||||||
let rec enqueue_splice ~src ~dst ~len st action =
|
let rec enqueue_splice ~src ~dst ~len st action =
|
||||||
Ctf.label "splice";
|
Trace.label "splice";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.splice st.uring (Job action) ~src ~dst ~len
|
Uring.splice st.uring (Job action) ~src ~dst ~len
|
||||||
)
|
)
|
||||||
@ -48,7 +48,7 @@ let rec enqueue_splice ~src ~dst ~len st action =
|
|||||||
Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
|
Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action =
|
let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st action =
|
||||||
Ctf.label "openat2";
|
Trace.label "openat2";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action)
|
Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action)
|
||||||
)
|
)
|
||||||
@ -57,7 +57,7 @@ let rec enqueue_openat2 ((access, flags, perm, resolve, fd, path) as args) st ac
|
|||||||
Queue.push (fun st -> enqueue_openat2 args st action) st.io_q
|
Queue.push (fun st -> enqueue_openat2 args st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action =
|
let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action =
|
||||||
Ctf.label "statx";
|
Trace.label "statx";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.statx st.uring ?fd ~mask path buf flags (Job action)
|
Uring.statx st.uring ?fd ~mask path buf flags (Job action)
|
||||||
)
|
)
|
||||||
@ -66,7 +66,7 @@ let rec enqueue_statx ((fd, path, buf, flags, mask) as args) st action =
|
|||||||
Queue.push (fun st -> enqueue_statx args st action) st.io_q
|
Queue.push (fun st -> enqueue_statx args st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_unlink ((dir, fd, path) as args) st action =
|
let rec enqueue_unlink ((dir, fd, path) as args) st action =
|
||||||
Ctf.label "unlinkat";
|
Trace.label "unlinkat";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.unlink st.uring ~dir ~fd path (Job action)
|
Uring.unlink st.uring ~dir ~fd path (Job action)
|
||||||
)
|
)
|
||||||
@ -75,7 +75,7 @@ let rec enqueue_unlink ((dir, fd, path) as args) st action =
|
|||||||
Queue.push (fun st -> enqueue_unlink args st action) st.io_q
|
Queue.push (fun st -> enqueue_unlink args st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_connect fd addr st action =
|
let rec enqueue_connect fd addr st action =
|
||||||
Ctf.label "connect";
|
Trace.label "connect";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.connect st.uring fd addr (Job action)
|
Uring.connect st.uring fd addr (Job action)
|
||||||
)
|
)
|
||||||
@ -84,7 +84,7 @@ let rec enqueue_connect fd addr st action =
|
|||||||
Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q
|
Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_send_msg fd ~fds ~dst buf st action =
|
let rec enqueue_send_msg fd ~fds ~dst buf st action =
|
||||||
Ctf.label "send_msg";
|
Trace.label "send_msg";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.send_msg st.uring fd ~fds ?dst buf (Job action)
|
Uring.send_msg st.uring fd ~fds ?dst buf (Job action)
|
||||||
)
|
)
|
||||||
@ -93,7 +93,7 @@ let rec enqueue_send_msg fd ~fds ~dst buf st action =
|
|||||||
Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q
|
Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_recv_msg fd msghdr st action =
|
let rec enqueue_recv_msg fd msghdr st action =
|
||||||
Ctf.label "recv_msg";
|
Trace.label "recv_msg";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.recv_msg st.uring fd msghdr (Job action);
|
Uring.recv_msg st.uring fd msghdr (Job action);
|
||||||
)
|
)
|
||||||
@ -102,7 +102,7 @@ let rec enqueue_recv_msg fd msghdr st action =
|
|||||||
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
|
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_accept fd client_addr st action =
|
let rec enqueue_accept fd client_addr st action =
|
||||||
Ctf.label "accept";
|
Trace.label "accept";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.accept st.uring fd client_addr (Job action)
|
Uring.accept st.uring fd client_addr (Job action)
|
||||||
) in
|
) in
|
||||||
@ -112,7 +112,7 @@ let rec enqueue_accept fd client_addr st action =
|
|||||||
)
|
)
|
||||||
|
|
||||||
let rec enqueue_noop t action =
|
let rec enqueue_noop t action =
|
||||||
Ctf.label "noop";
|
Trace.label "noop";
|
||||||
let job = Sched.enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in
|
let job = Sched.enqueue_job t (fun () -> Uring.noop t.uring (Job_no_cancel action)) in
|
||||||
if job = None then (
|
if job = None then (
|
||||||
(* wait until an sqe is available *)
|
(* wait until an sqe is available *)
|
||||||
@ -147,7 +147,7 @@ let read_upto ?file_offset fd buf len =
|
|||||||
|
|
||||||
let rec enqueue_readv args st action =
|
let rec enqueue_readv args st action =
|
||||||
let (file_offset,fd,bufs) = args in
|
let (file_offset,fd,bufs) = args in
|
||||||
Ctf.label "readv";
|
Trace.label "readv";
|
||||||
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
let retry = Sched.with_cancel_hook ~action st (fun () ->
|
||||||
Uring.readv st.uring ~file_offset fd bufs (Job action))
|
Uring.readv st.uring ~file_offset fd bufs (Job action))
|
||||||
in
|
in
|
||||||
@ -465,7 +465,7 @@ let shutdown socket command =
|
|||||||
| Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
| Unix.Unix_error (code, name, arg) -> raise @@ Err.wrap code name arg
|
||||||
|
|
||||||
let accept ~sw fd =
|
let accept ~sw fd =
|
||||||
Ctf.label "accept";
|
Trace.label "accept";
|
||||||
Fd.use_exn "accept" fd @@ fun fd ->
|
Fd.use_exn "accept" fd @@ fun fd ->
|
||||||
let client_addr = Uring.Sockaddr.create () in
|
let client_addr = Uring.Sockaddr.create () in
|
||||||
let res = Sched.enter (enqueue_accept fd client_addr) in
|
let res = Sched.enter (enqueue_accept fd client_addr) in
|
||||||
|
@ -3,13 +3,13 @@
|
|||||||
open Eio.Std
|
open Eio.Std
|
||||||
|
|
||||||
module Fiber_context = Eio.Private.Fiber_context
|
module Fiber_context = Eio.Private.Fiber_context
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
|
|
||||||
module Suspended = Eio_utils.Suspended
|
module Suspended = Eio_utils.Suspended
|
||||||
module Zzz = Eio_utils.Zzz
|
module Zzz = Eio_utils.Zzz
|
||||||
module Lf_queue = Eio_utils.Lf_queue
|
module Lf_queue = Eio_utils.Lf_queue
|
||||||
|
|
||||||
let system_thread = Ctf.mint_id ()
|
let system_thread = Trace.mint_id ()
|
||||||
|
|
||||||
let statx_works = ref false (* Before Linux 5.18, statx is unreliable *)
|
let statx_works = ref false (* Before Linux 5.18, statx is unreliable *)
|
||||||
|
|
||||||
@ -119,7 +119,7 @@ let rec enqueue_job t fn =
|
|||||||
|
|
||||||
(* Cancellations always come from the same domain, so no need to send wake events here. *)
|
(* Cancellations always come from the same domain, so no need to send wake events here. *)
|
||||||
let rec enqueue_cancel job t =
|
let rec enqueue_cancel job t =
|
||||||
Ctf.label "cancel";
|
Trace.label "cancel";
|
||||||
match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with
|
match enqueue_job t (fun () -> Uring.cancel t.uring job Cancel_job) with
|
||||||
| None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q
|
| None -> Queue.push (fun t -> enqueue_cancel job t) t.io_q
|
||||||
| Some _ -> ()
|
| Some _ -> ()
|
||||||
@ -162,7 +162,7 @@ let submit_pending_io st =
|
|||||||
match Queue.take_opt st.io_q with
|
match Queue.take_opt st.io_q with
|
||||||
| None -> ()
|
| None -> ()
|
||||||
| Some fn ->
|
| Some fn ->
|
||||||
Ctf.label "submit_pending_io";
|
Trace.label "submit_pending_io";
|
||||||
fn st
|
fn st
|
||||||
|
|
||||||
let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) =
|
let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) =
|
||||||
@ -183,7 +183,7 @@ let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as re
|
|||||||
)
|
)
|
||||||
in
|
in
|
||||||
if retry then (
|
if retry then (
|
||||||
Ctf.label "await-sqe";
|
Trace.label "await-sqe";
|
||||||
(* wait until an sqe is available *)
|
(* wait until an sqe is available *)
|
||||||
Queue.push (fun st -> submit_rw_req st req) io_q
|
Queue.push (fun st -> submit_rw_req st req) io_q
|
||||||
)
|
)
|
||||||
@ -244,9 +244,9 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
|
|||||||
(* At this point we're not going to check [run_q] again before sleeping.
|
(* At this point we're not going to check [run_q] again before sleeping.
|
||||||
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
||||||
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
||||||
Ctf.(note_hiatus Wait_for_work);
|
Trace.hiatus ();
|
||||||
let result = Uring.wait ?timeout uring in
|
let result = Uring.wait ?timeout uring in
|
||||||
Ctf.note_resume system_thread;
|
Trace.resume system_thread;
|
||||||
Atomic.set st.need_wakeup false;
|
Atomic.set st.need_wakeup false;
|
||||||
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
|
Lf_queue.push run_q IO; (* Re-inject IO job in the run queue *)
|
||||||
match result with
|
match result with
|
||||||
@ -338,7 +338,7 @@ let free_buf st buf =
|
|||||||
| Some k -> enqueue_thread st k buf
|
| Some k -> enqueue_thread st k buf
|
||||||
|
|
||||||
let rec enqueue_poll_add fd poll_mask st action =
|
let rec enqueue_poll_add fd poll_mask st action =
|
||||||
Ctf.label "poll_add";
|
Trace.label "poll_add";
|
||||||
let retry = with_cancel_hook ~action st (fun () ->
|
let retry = with_cancel_hook ~action st (fun () ->
|
||||||
Uring.poll_add st.uring fd poll_mask (Job action)
|
Uring.poll_add st.uring fd poll_mask (Job action)
|
||||||
)
|
)
|
||||||
@ -347,7 +347,7 @@ let rec enqueue_poll_add fd poll_mask st action =
|
|||||||
Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
|
Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
|
||||||
|
|
||||||
let rec enqueue_poll_add_unix fd poll_mask st action cb =
|
let rec enqueue_poll_add_unix fd poll_mask st action cb =
|
||||||
Ctf.label "poll_add";
|
Trace.label "poll_add";
|
||||||
let retry = with_cancel_hook ~action st (fun () ->
|
let retry = with_cancel_hook ~action st (fun () ->
|
||||||
Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb))
|
Uring.poll_add st.uring fd poll_mask (Job_fn (action, cb))
|
||||||
)
|
)
|
||||||
@ -357,7 +357,7 @@ let rec enqueue_poll_add_unix fd poll_mask st action cb =
|
|||||||
|
|
||||||
let rec enqueue_readv args st action =
|
let rec enqueue_readv args st action =
|
||||||
let (file_offset,fd,bufs) = args in
|
let (file_offset,fd,bufs) = args in
|
||||||
Ctf.label "readv";
|
Trace.label "readv";
|
||||||
let retry = with_cancel_hook ~action st (fun () ->
|
let retry = with_cancel_hook ~action st (fun () ->
|
||||||
Uring.readv st.uring ~file_offset fd bufs (Job action))
|
Uring.readv st.uring ~file_offset fd bufs (Job action))
|
||||||
in
|
in
|
||||||
@ -388,7 +388,7 @@ let monitor_event_fd t =
|
|||||||
let run ~extra_effects st main arg =
|
let run ~extra_effects st main arg =
|
||||||
let rec fork ~new_fiber:fiber fn =
|
let rec fork ~new_fiber:fiber fn =
|
||||||
let open Effect.Deep in
|
let open Effect.Deep in
|
||||||
Ctf.note_switch (Fiber_context.tid fiber);
|
Trace.fiber (Fiber_context.tid fiber);
|
||||||
match_with fn ()
|
match_with fn ()
|
||||||
{ retc = (fun () -> Fiber_context.destroy fiber; schedule st);
|
{ retc = (fun () -> Fiber_context.destroy fiber; schedule st);
|
||||||
exnc = (fun ex ->
|
exnc = (fun ex ->
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
open Eio.Std
|
open Eio.Std
|
||||||
|
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
Logs.(set_level ~all:true (Some Debug));
|
Logs.(set_level ~all:true (Some Debug));
|
||||||
@ -78,8 +78,8 @@ let test_direct_copy () =
|
|||||||
let buffer = Buffer.create 20 in
|
let buffer = Buffer.create 20 in
|
||||||
let to_output = Eio.Flow.buffer_sink buffer in
|
let to_output = Eio.Flow.buffer_sink buffer in
|
||||||
Switch.run (fun sw ->
|
Switch.run (fun sw ->
|
||||||
Fiber.fork ~sw (fun () -> Ctf.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2);
|
Fiber.fork ~sw (fun () -> Trace.label "copy1"; Eio.Flow.copy from_pipe1 to_pipe2; Eio.Flow.close to_pipe2);
|
||||||
Fiber.fork ~sw (fun () -> Ctf.label "copy2"; Eio.Flow.copy from_pipe2 to_output);
|
Fiber.fork ~sw (fun () -> Trace.label "copy2"; Eio.Flow.copy from_pipe2 to_output);
|
||||||
Eio.Flow.copy (Eio.Flow.string_source msg) to_pipe1;
|
Eio.Flow.copy (Eio.Flow.string_source msg) to_pipe1;
|
||||||
Eio.Flow.close to_pipe1;
|
Eio.Flow.close to_pipe1;
|
||||||
);
|
);
|
||||||
|
@ -18,13 +18,13 @@ module Suspended = Eio_utils.Suspended
|
|||||||
module Zzz = Eio_utils.Zzz
|
module Zzz = Eio_utils.Zzz
|
||||||
module Lf_queue = Eio_utils.Lf_queue
|
module Lf_queue = Eio_utils.Lf_queue
|
||||||
module Fiber_context = Eio.Private.Fiber_context
|
module Fiber_context = Eio.Private.Fiber_context
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
module Rcfd = Eio_unix.Private.Rcfd
|
module Rcfd = Eio_unix.Private.Rcfd
|
||||||
module Poll = Iomux.Poll
|
module Poll = Iomux.Poll
|
||||||
|
|
||||||
type exit = [`Exit_scheduler]
|
type exit = [`Exit_scheduler]
|
||||||
|
|
||||||
let system_thread = Ctf.mint_id ()
|
let system_thread = Trace.mint_id ()
|
||||||
|
|
||||||
(* The type of items in the run queue. *)
|
(* The type of items in the run queue. *)
|
||||||
type runnable =
|
type runnable =
|
||||||
@ -209,12 +209,12 @@ let rec next t : [`Exit_scheduler] =
|
|||||||
(* At this point we're not going to check [run_q] again before sleeping.
|
(* At this point we're not going to check [run_q] again before sleeping.
|
||||||
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
||||||
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
||||||
Ctf.(note_hiatus Wait_for_work);
|
Trace.hiatus ();
|
||||||
let nready =
|
let nready =
|
||||||
try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
|
try Poll.ppoll_or_poll t.poll (t.poll_maxi + 1) timeout
|
||||||
with Unix.Unix_error (Unix.EINTR, _, "") -> 0
|
with Unix.Unix_error (Unix.EINTR, _, "") -> 0
|
||||||
in
|
in
|
||||||
Ctf.note_resume system_thread;
|
Trace.fiber system_thread;
|
||||||
Atomic.set t.need_wakeup false;
|
Atomic.set t.need_wakeup false;
|
||||||
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
|
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
|
||||||
Poll.iter_ready t.poll nready (ready t);
|
Poll.iter_ready t.poll nready (ready t);
|
||||||
@ -325,7 +325,7 @@ let enter fn = Effect.perform (Enter fn)
|
|||||||
let run ~extra_effects t main x =
|
let run ~extra_effects t main x =
|
||||||
let rec fork ~new_fiber:fiber fn =
|
let rec fork ~new_fiber:fiber fn =
|
||||||
let open Effect.Deep in
|
let open Effect.Deep in
|
||||||
Ctf.note_switch (Fiber_context.tid fiber);
|
Trace.fiber (Fiber_context.tid fiber);
|
||||||
match_with fn ()
|
match_with fn ()
|
||||||
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
|
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
|
||||||
exnc = (fun ex ->
|
exnc = (fun ex ->
|
||||||
|
@ -18,12 +18,12 @@ module Suspended = Eio_utils.Suspended
|
|||||||
module Zzz = Eio_utils.Zzz
|
module Zzz = Eio_utils.Zzz
|
||||||
module Lf_queue = Eio_utils.Lf_queue
|
module Lf_queue = Eio_utils.Lf_queue
|
||||||
module Fiber_context = Eio.Private.Fiber_context
|
module Fiber_context = Eio.Private.Fiber_context
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
module Rcfd = Eio_unix.Private.Rcfd
|
module Rcfd = Eio_unix.Private.Rcfd
|
||||||
|
|
||||||
type exit = [`Exit_scheduler]
|
type exit = [`Exit_scheduler]
|
||||||
|
|
||||||
let system_thread = Ctf.mint_id ()
|
let system_thread = Trace.mint_id ()
|
||||||
|
|
||||||
(* The type of items in the run queue. *)
|
(* The type of items in the run queue. *)
|
||||||
type runnable =
|
type runnable =
|
||||||
@ -204,14 +204,14 @@ let rec next t : [`Exit_scheduler] =
|
|||||||
(* At this point we're not going to check [run_q] again before sleeping.
|
(* At this point we're not going to check [run_q] again before sleeping.
|
||||||
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
If [need_wakeup] is still [true], this is fine because we don't promise to do that.
|
||||||
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
If [need_wakeup = false], a wake-up event will arrive and wake us up soon. *)
|
||||||
Ctf.(note_hiatus Wait_for_work);
|
Trace.hiatus ();
|
||||||
let cons fd acc = fd :: acc in
|
let cons fd acc = fd :: acc in
|
||||||
let read = FdSet.fold cons t.poll.to_read [] in
|
let read = FdSet.fold cons t.poll.to_read [] in
|
||||||
let write = FdSet.fold cons t.poll.to_write [] in
|
let write = FdSet.fold cons t.poll.to_write [] in
|
||||||
match Unix.select read write [] timeout with
|
match Unix.select read write [] timeout with
|
||||||
| exception Unix.(Unix_error (EINTR, _, _)) -> next t
|
| exception Unix.(Unix_error (EINTR, _, _)) -> next t
|
||||||
| readable, writeable, _ ->
|
| readable, writeable, _ ->
|
||||||
Ctf.note_resume system_thread;
|
Trace.resume system_thread;
|
||||||
Atomic.set t.need_wakeup false;
|
Atomic.set t.need_wakeup false;
|
||||||
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
|
Lf_queue.push t.run_q IO; (* Re-inject IO job in the run queue *)
|
||||||
List.iter (ready t [ `W ]) writeable;
|
List.iter (ready t [ `W ]) writeable;
|
||||||
@ -317,7 +317,7 @@ let enter fn = Effect.perform (Enter fn)
|
|||||||
let run ~extra_effects t main x =
|
let run ~extra_effects t main x =
|
||||||
let rec fork ~new_fiber:fiber fn =
|
let rec fork ~new_fiber:fiber fn =
|
||||||
let open Effect.Deep in
|
let open Effect.Deep in
|
||||||
Ctf.note_switch (Fiber_context.tid fiber);
|
Trace.fiber (Fiber_context.tid fiber);
|
||||||
match_with fn ()
|
match_with fn ()
|
||||||
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
|
{ retc = (fun () -> Fiber_context.destroy fiber; next t);
|
||||||
exnc = (fun ex ->
|
exnc = (fun ex ->
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
(* This module also checks that Eio doesn't pull in a dependency on Unix.
|
(* This module also checks that Eio doesn't pull in a dependency on Unix.
|
||||||
See the [dune] file. *)
|
See the [dune] file. *)
|
||||||
|
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
let bs = Cstruct.create 8 in
|
let bs = Cstruct.create 8 in
|
||||||
Ctf.BS.set_int64_le bs.buffer 0 1234L;
|
Trace.BS.set_int64_le bs.buffer 0 1234L;
|
||||||
assert (Cstruct.LE.get_uint64 bs 0 = 1234L)
|
assert (Cstruct.LE.get_uint64 bs 0 = 1234L)
|
||||||
|
@ -7,7 +7,7 @@
|
|||||||
```ocaml
|
```ocaml
|
||||||
open Eio.Std
|
open Eio.Std
|
||||||
|
|
||||||
module Ctf = Eio.Private.Ctf
|
module Trace = Eio.Private.Trace
|
||||||
|
|
||||||
let pp_promise pp f x =
|
let pp_promise pp f x =
|
||||||
match Promise.peek x with
|
match Promise.peek x with
|
||||||
@ -96,10 +96,10 @@ Basic semaphore tests:
|
|||||||
let running = ref 0 in
|
let running = ref 0 in
|
||||||
let sem = Semaphore.make 2 in
|
let sem = Semaphore.make 2 in
|
||||||
let fork = Fiber.fork_promise ~sw in
|
let fork = Fiber.fork_promise ~sw in
|
||||||
let a = fork (fun () -> Ctf.label "a"; Semaphore.acquire sem; incr running) in
|
let a = fork (fun () -> Trace.label "a"; Semaphore.acquire sem; incr running) in
|
||||||
let b = fork (fun () -> Ctf.label "b"; Semaphore.acquire sem; incr running) in
|
let b = fork (fun () -> Trace.label "b"; Semaphore.acquire sem; incr running) in
|
||||||
let c = fork (fun () -> Ctf.label "c"; Semaphore.acquire sem; incr running) in
|
let c = fork (fun () -> Trace.label "c"; Semaphore.acquire sem; incr running) in
|
||||||
let d = fork (fun () -> Ctf.label "d"; Semaphore.acquire sem; incr running) in
|
let d = fork (fun () -> Trace.label "d"; Semaphore.acquire sem; incr running) in
|
||||||
traceln "Semaphore means that only %d threads are running" !running;
|
traceln "Semaphore means that only %d threads are running" !running;
|
||||||
Promise.await_exn a;
|
Promise.await_exn a;
|
||||||
Promise.await_exn b;
|
Promise.await_exn b;
|
||||||
@ -132,8 +132,8 @@ Releasing a semaphore when no-one is waiting for it:
|
|||||||
let sem = Semaphore.make 0 in
|
let sem = Semaphore.make 0 in
|
||||||
Semaphore.release sem; (* Release with free-counter *)
|
Semaphore.release sem; (* Release with free-counter *)
|
||||||
traceln "Initial config: %d" (Semaphore.get_value sem);
|
traceln "Initial config: %d" (Semaphore.get_value sem);
|
||||||
Fiber.fork ~sw (fun () -> Ctf.label "a"; Semaphore.acquire sem);
|
Fiber.fork ~sw (fun () -> Trace.label "a"; Semaphore.acquire sem);
|
||||||
Fiber.fork ~sw (fun () -> Ctf.label "b"; Semaphore.acquire sem);
|
Fiber.fork ~sw (fun () -> Trace.label "b"; Semaphore.acquire sem);
|
||||||
traceln "A running: %d" (Semaphore.get_value sem);
|
traceln "A running: %d" (Semaphore.get_value sem);
|
||||||
Semaphore.release sem; (* Release with a non-empty wait-queue *)
|
Semaphore.release sem; (* Release with a non-empty wait-queue *)
|
||||||
traceln "Now b running: %d" (Semaphore.get_value sem);
|
traceln "Now b running: %d" (Semaphore.get_value sem);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user