Report use of closed FDs better

Previously, attempting to use a closed FD would raise an exception in
the scheduler context, causing the whole event loop to exit. Now it's
just reported back to the caller as an error. This means you get a stack
trace, a chance to clean up or recover, and you get to see some output
from MDX.
This commit is contained in:
Thomas Leonard 2022-07-18 13:26:53 +01:00
parent b46ffba2c5
commit 547622fdb7
3 changed files with 148 additions and 83 deletions

View File

@ -50,17 +50,21 @@ module FD = struct
mutable fd : [`Open of Unix.file_descr | `Closed]
}
let get op = function
let get_exn op = function
| { fd = `Open fd; _ } -> fd
| { fd = `Closed ; _ } -> invalid_arg (op ^ ": file descriptor used after calling close!")
let get op = function
| { fd = `Open fd; _ } -> Ok fd
| { fd = `Closed ; _ } -> Error (Invalid_argument (op ^ ": file descriptor used after calling close!"))
let is_open = function
| { fd = `Open _; _ } -> true
| { fd = `Closed; _ } -> false
let close t =
Ctf.label "close";
let fd = get "close" t in
let fd = get_exn "close" t in
t.fd <- `Closed;
Eio.Switch.remove_hook t.release_hook;
if t.close_unix then (
@ -79,7 +83,7 @@ module FD = struct
| exception Unix.Unix_error(Unix.ESPIPE, "lseek", "") -> false
let to_unix op t =
let fd = get "to_unix" t in
let fd = get_exn "to_unix" t in
match op with
| `Peek -> fd
| `Take ->
@ -167,7 +171,7 @@ let wakeup t =
match
Log.debug (fun f -> f "Sending wakeup on eventfd %a" FD.pp t.eventfd);
Atomic.set t.need_wakeup false; (* [t] will check [run_q] after getting the event below *)
let sent = Unix.single_write (FD.get "wakeup" t.eventfd) wake_buffer 0 8 in
let sent = Unix.single_write (FD.get_exn "wakeup" t.eventfd) wake_buffer 0 8 in
assert (sent = 8)
with
| () -> Mutex.unlock t.eventfd_mutex
@ -253,22 +257,24 @@ let clear_cancel (action : _ Suspended.t) =
ignore (Fiber_context.clear_cancel_fn action.fiber : bool)
let rec submit_rw_req st ({op; file_offset; fd; buf; len; cur_off; action} as req) =
let fd = FD.get "submit_rw_req" fd in
let {uring;io_q;_} = st in
let off = Uring.Region.to_offset buf + cur_off in
let len = match len with Exactly l | Upto l -> l in
let len = len - cur_off in
let retry = with_cancel_hook ~action st (fun () ->
match op with
|`R -> Uring.read_fixed uring ~file_offset fd ~off ~len (Read req)
|`W -> Uring.write_fixed uring ~file_offset fd ~off ~len (Write req)
match FD.get "submit_rw_req" fd with
| Error ex -> enqueue_failed_thread st action ex
| Ok fd ->
let {uring;io_q;_} = st in
let off = Uring.Region.to_offset buf + cur_off in
let len = match len with Exactly l | Upto l -> l in
let len = len - cur_off in
let retry = with_cancel_hook ~action st (fun () ->
match op with
|`R -> Uring.read_fixed uring ~file_offset fd ~off ~len (Read req)
|`W -> Uring.write_fixed uring ~file_offset fd ~off ~len (Write req)
)
in
if retry then (
Ctf.label "await-sqe";
(* wait until an sqe is available *)
Queue.push (fun st -> submit_rw_req st req) io_q
)
in
if retry then (
Ctf.label "await-sqe";
(* wait until an sqe is available *)
Queue.push (fun st -> submit_rw_req st req) io_q
)
(* TODO bind from unixsupport *)
let errno_is_retry = function -62 | -11 | -4 -> true |_ -> false
@ -292,11 +298,14 @@ let rec enqueue_readv args st action =
| None -> FD.uring_file_offset fd
in
Ctf.label "readv";
let retry = with_cancel_hook ~action st (fun () ->
Uring.readv st.uring ~file_offset (FD.get "readv" fd) bufs (Job action))
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_readv args st action) st.io_q
match FD.get "readv" fd with
| Error ex -> enqueue_failed_thread st action ex
| Ok fd ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.readv st.uring ~file_offset fd bufs (Job action))
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_readv args st action) st.io_q
let rec enqueue_writev args st action =
let (file_offset,fd,bufs) = args in
@ -306,22 +315,28 @@ let rec enqueue_writev args st action =
| None -> FD.uring_file_offset fd
in
Ctf.label "writev";
let retry = with_cancel_hook ~action st (fun () ->
Uring.writev st.uring ~file_offset (FD.get "writev" fd) bufs (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_writev args st action) st.io_q
match FD.get "writev" fd with
| Error ex -> enqueue_failed_thread st action ex
| Ok fd ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.writev st.uring ~file_offset fd bufs (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_writev args st action) st.io_q
let rec enqueue_poll_add fd poll_mask st action =
Log.debug (fun l -> l "poll_add: submitting call");
Ctf.label "poll_add";
let retry = with_cancel_hook ~action st (fun () ->
Uring.poll_add st.uring (FD.get "poll_add" fd) poll_mask (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
match FD.get "poll_add" fd with
| Error ex -> enqueue_failed_thread st action ex
| Ok unix_fd ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.poll_add st.uring unix_fd poll_mask (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_poll_add fd poll_mask st action) st.io_q
let rec enqueue_poll_add_unix fd poll_mask st action cb =
Log.debug (fun l -> l "poll_add: submitting call");
@ -354,65 +369,98 @@ let enqueue_write st action (file_offset,fd,buf,len) =
let rec enqueue_splice ~src ~dst ~len st action =
Log.debug (fun l -> l "splice: submitting call");
Ctf.label "splice";
let retry = with_cancel_hook ~action st (fun () ->
Uring.splice st.uring (Job action) ~src:(FD.get "splice" src) ~dst:(FD.get "splice" dst) ~len
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
match FD.get "splice-src" src, FD.get "splice-dst" dst with
| Error ex, _
| _, Error ex -> enqueue_failed_thread st action ex
| Ok unix_src, Ok unix_dst ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.splice st.uring (Job action) ~src:unix_src ~dst:unix_dst ~len
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_splice ~src ~dst ~len st action) st.io_q
let rec enqueue_openat2 ((access, flags, perm, resolve, dir, path) as args) st action =
Log.debug (fun l -> l "openat2: submitting call");
Ctf.label "openat2";
let fd = Option.map (FD.get "openat2") dir in
let retry = with_cancel_hook ~action st (fun () ->
Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action)
)
let use fd =
let retry = with_cancel_hook ~action st (fun () ->
Uring.openat2 st.uring ~access ~flags ~perm ~resolve ?fd path (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_openat2 args st action) st.io_q
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_openat2 args st action) st.io_q
match dir with
| None -> use None
| Some dir ->
match FD.get "openat2" dir with
| Error ex -> enqueue_failed_thread st action ex
| Ok fd -> use (Some fd)
let rec enqueue_connect fd addr st action =
Log.debug (fun l -> l "connect: submitting call");
Ctf.label "connect";
let retry = with_cancel_hook ~action st (fun () ->
Uring.connect st.uring (FD.get "connect" fd) addr (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q
match FD.get "connect" fd with
| Error ex -> enqueue_failed_thread st action ex
| Ok unix_fd ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.connect st.uring unix_fd addr (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_connect fd addr st action) st.io_q
let rec extract_fds = function
| [] -> Ok []
| x :: xs ->
match FD.get "send_msg" x with
| Error _ as e -> e
| Ok fd ->
match extract_fds xs with
| Error _ as e -> e
| Ok fds -> Ok (fd :: fds)
let rec enqueue_send_msg fd ~fds ~dst buf st action =
Log.debug (fun l -> l "send_msg: submitting call");
Ctf.label "send_msg";
let retry = with_cancel_hook ~action st (fun () ->
let fds = List.map (FD.get "send_msg") fds in
Uring.send_msg st.uring (FD.get "send_msg" fd) ~fds ?dst buf (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q
match FD.get "send_msg" fd, extract_fds fds with
| Error ex, _
| _, Error ex -> enqueue_failed_thread st action ex
| Ok unix_fd, Ok unix_fds ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.send_msg st.uring unix_fd ~fds:unix_fds ?dst buf (Job action)
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_send_msg fd ~fds ~dst buf st action) st.io_q
let rec enqueue_recv_msg fd msghdr st action =
Log.debug (fun l -> l "recv_msg: submitting call");
Ctf.label "recv_msg";
let retry = with_cancel_hook ~action st (fun () ->
Uring.recv_msg st.uring (FD.get "recv_msg" fd) msghdr (Job action);
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
match FD.get "recv_msg" fd with
| Error ex -> enqueue_failed_thread st action ex
| Ok unix_fd ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.recv_msg st.uring unix_fd msghdr (Job action);
)
in
if retry then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_recv_msg fd msghdr st action) st.io_q
let rec enqueue_accept fd client_addr st action =
Log.debug (fun l -> l "accept: submitting call");
Ctf.label "accept";
let retry = with_cancel_hook ~action st (fun () ->
Uring.accept st.uring (FD.get "accept" fd) client_addr (Job action)
) in
if retry then (
(* wait until an sqe is available *)
Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q
)
match FD.get "accept" fd with
| Error ex -> enqueue_failed_thread st action ex
| Ok unix_fd ->
let retry = with_cancel_hook ~action st (fun () ->
Uring.accept st.uring unix_fd client_addr (Job action)
) in
if retry then (
(* wait until an sqe is available *)
Queue.push (fun st -> enqueue_accept fd client_addr st action) st.io_q
)
let rec enqueue_noop st action =
Log.debug (fun l -> l "noop: submitting call");
@ -738,7 +786,8 @@ module Low_level = struct
FD.of_unix ~sw ~seekable ~close_unix:true fd
let fstat fd =
Unix.fstat (FD.get "fstat" fd)
(* todo: use uring *)
Unix.fstat (FD.get_exn "fstat" fd)
external eio_mkdirat : Unix.file_descr -> string -> Unix.file_perm -> unit = "caml_eio_mkdirat"
@ -749,12 +798,11 @@ module Low_level = struct
let getrandom { Cstruct.buffer; off; len } =
eio_getrandom buffer off len
(* We ignore [sw] because this isn't a uring operation yet. *)
let mkdirat ~perm dir path =
wrap_errors path @@ fun () ->
match dir with
| None -> Unix.mkdir path perm
| Some dir -> eio_mkdirat (FD.get "mkdirat" dir) path perm
| Some dir -> eio_mkdirat (FD.get_exn "mkdirat" dir) path perm
let mkdir_beneath ~perm ?dir path =
let dir_path = Filename.dirname path in
@ -773,7 +821,7 @@ module Low_level = struct
)
let shutdown socket command =
Unix.shutdown (FD.get "shutdown" socket) command
Unix.shutdown (FD.get_exn "shutdown" socket) command
let accept ~sw fd =
Ctf.label "accept";
@ -798,7 +846,7 @@ module Low_level = struct
let read_dir fd =
let rec read_all acc fd =
match eio_getdents (FD.get "getdents" fd) with
match eio_getdents (FD.get_exn "getdents" fd) with
| [] -> acc
| files ->
let files = List.filter (function ".." | "." -> false | _ -> true) files in
@ -907,7 +955,7 @@ let udp_socket sock = object
end
let flow fd =
let is_tty = lazy (Unix.isatty (FD.get "isatty" fd)) in
let is_tty = lazy (Unix.isatty (FD.get_exn "isatty" fd)) in
object (_ : <source; sink; ..>)
method fd = fd
method close = FD.close fd
@ -939,7 +987,7 @@ let flow fd =
aux (Eio.Flow.read_methods src)
method shutdown cmd =
Unix.shutdown (FD.get "shutdown" fd) @@ match cmd with
Unix.shutdown (FD.get_exn "shutdown" fd) @@ match cmd with
| `Receive -> Unix.SHUTDOWN_RECEIVE
| `Send -> Unix.SHUTDOWN_SEND
| `All -> Unix.SHUTDOWN_ALL

View File

@ -694,9 +694,12 @@ end
class dir dir_path = object (self)
inherit Eio.Dir.t
val mutable closed = false
(* Resolve a relative path to an absolute one, with no symlinks.
@raise Eio.Dir.Permission_denied if it's outside of [dir_path]. *)
method private resolve path =
if closed then Fmt.invalid_arg "Attempt to use closed directory %S" dir_path;
if Filename.is_relative path then (
let dir_path = File.realpath dir_path |> or_raise_path dir_path in
let full = File.realpath (Filename.concat dir_path path) |> or_raise_path path in
@ -743,7 +746,9 @@ class dir dir_path = object (self)
method open_dir ~sw path =
Switch.check sw;
new dir (self#resolve path)
let d = new dir (self#resolve path) in
Switch.on_release sw (fun () -> d#close);
d
(* libuv doesn't seem to provide a race-free way to do this. *)
method mkdir ~perm path =
@ -754,7 +759,7 @@ class dir dir_path = object (self)
let path = self#resolve path in
File.readdir path |> or_raise_path path
method close = ()
method close = closed <- true
end
(* Full access to the filesystem. *)

View File

@ -309,3 +309,15 @@ In that case, `with_open_in` will no longer close it on exit:
+Read 0 bytes from null device
- : unit = ()
```
# Use after close
```ocaml
# run @@ fun env ->
let closed = Switch.run (fun sw -> Eio.Dir.open_dir ~sw env#cwd ".") in
try
failwith (Eio.Dir.read_dir closed "." |> String.concat ",")
with Invalid_argument _ -> traceln "Got Invalid_argument for closed FD";;
+Got Invalid_argument for closed FD
- : unit = ()
```