Compare commits

..

5 Commits

Author SHA1 Message Date
Thomas Leonard
24571692f1
Merge pull request #597 from talex5/windows-fixes
Fix some MDX problems on Windows
2023-08-11 11:38:20 +01:00
Vesa Karvonen
f81880d293 Fix some MDX problems on Windows
- The `c_library_flags` are required to allow the stubs to be loaded dynamically by MDX.
- Fixed `dune` file to use `-cclib` as the `-l` options are for the linker rather than the compiler.
2023-08-11 09:57:24 +01:00
Thomas Leonard
b755d9e41f
Merge pull request #522 from talex5/fd-passing
Eio_unix: add FD passing
2023-08-11 09:53:35 +01:00
Thomas Leonard
90af8f755a Remove Unix_fd from unix network socket types
The normal ways of getting a socket don't include it anyway, and we can
infer it from something being a Unix socket.
2023-08-11 09:42:25 +01:00
Thomas Leonard
57d08881dc Add FD passing 2023-08-10 18:16:39 +01:00
25 changed files with 366 additions and 168 deletions

2
.gitattributes vendored Normal file
View File

@ -0,0 +1,2 @@
# To work around MDX issues
README.md text eol=lf

View File

@ -842,7 +842,8 @@ let try_mkdir path =
The checks also apply to following symlinks: The checks also apply to following symlinks:
```ocaml ```ocaml
# Unix.symlink "dir1" "link-to-dir1"; Unix.symlink "/tmp" "link-to-tmp";; # Unix.symlink "dir1" "link-to-dir1";
Unix.symlink (Filename.get_temp_dir_name ()) "link-to-tmp";;
- : unit = () - : unit = ()
# Eio_main.run @@ fun env -> # Eio_main.run @@ fun env ->

View File

@ -30,6 +30,7 @@ module Ctf = Ctf_unix
module Process = Process module Process = Process
module Net = Net module Net = Net
module Pi = Pi
module Stdenv = struct module Stdenv = struct
type base = < type base = <

View File

@ -28,37 +28,6 @@ module Resource : sig
(** [fd_opt t] returns the FD being wrapped by a generic resource, if any. (** [fd_opt t] returns the FD being wrapped by a generic resource, if any.
This just probes [t] using {!extension-FD}. *) This just probes [t] using {!extension-FD}. *)
module type FLOW = sig
include Eio.Net.Pi.STREAM_SOCKET
include Eio.File.Pi.WRITE with type t := t
val fd : t -> Fd.t
end
val flow_handler :
(module FLOW with type t = 't and type tag = 'tag) ->
('t, [`Unix_fd | 'tag Eio.Net.stream_socket_ty | Eio.File.rw_ty]) Eio.Resource.handler
module type DATAGRAM_SOCKET = sig
include Eio.Net.Pi.DATAGRAM_SOCKET
val fd : t -> Fd.t
end
val datagram_handler :
(module DATAGRAM_SOCKET with type t = 't and type tag = 'tag) ->
('t, [`Unix_fd | 'tag Eio.Net.datagram_socket_ty]) Eio.Resource.handler
module type LISTENING_SOCKET = sig
include Eio.Net.Pi.LISTENING_SOCKET
val fd : t -> Fd.t
end
val listening_socket_handler :
(module LISTENING_SOCKET with type t = 't and type tag = 'tag) ->
('t, [`Unix_fd | 'tag Eio.Net.listening_socket_ty]) Eio.Resource.handler
end end
module Net = Net module Net = Net
@ -129,3 +98,5 @@ module Private : sig
end end
module Ctf = Ctf_unix module Ctf = Ctf_unix
module Pi = Pi

View File

@ -46,6 +46,11 @@ let of_unix ~sw ?blocking ?seekable ~close_unix fd =
t.release_hook <- Switch.on_release_cancellable sw (fun () -> close t); t.release_hook <- Switch.on_release_cancellable sw (fun () -> close t);
t t
let of_unix_list ~sw fds =
match Switch.get_error sw with
| Some e -> List.iter Unix.close fds; raise e
| None -> List.map (of_unix ~sw ~close_unix:true) fds
external eio_is_blocking : Unix.file_descr -> bool = "eio_unix_is_blocking" external eio_is_blocking : Unix.file_descr -> bool = "eio_unix_is_blocking"
let is_blocking t = let is_blocking t =

View File

@ -14,6 +14,10 @@ val of_unix : sw:Switch.t -> ?blocking:bool -> ?seekable:bool -> close_unix:bool
@param seekable The value to be returned by {!is_seekable}. Defaults to probing if needed. @param seekable The value to be returned by {!is_seekable}. Defaults to probing if needed.
@param close_unix Whether {!close} also closes [fd] (this should normally be [true]). *) @param close_unix Whether {!close} also closes [fd] (this should normally be [true]). *)
val of_unix_list : sw:Switch.t -> Unix.file_descr list -> t list
(** [of_unix_list ~sw fds] is like [List.map (of_unix ~sw ~close_unix:true) fds],
except that if [sw] is off then it closes all the FDs. *)
(** {2 Using FDs} *) (** {2 Using FDs} *)
val use : t -> (Unix.file_descr -> 'a) -> if_closed:(unit -> 'a) -> 'a val use : t -> (Unix.file_descr -> 'a) -> if_closed:(unit -> 'a) -> 'a

View File

@ -1,8 +1,8 @@
open Eio.Std open Eio.Std
type stream_socket_ty = [`Unix_fd | [`Generic | `Unix] Eio.Net.stream_socket_ty] type stream_socket_ty = [`Generic | `Unix] Eio.Net.stream_socket_ty
type datagram_socket_ty = [`Unix_fd | [`Generic | `Unix] Eio.Net.datagram_socket_ty] type datagram_socket_ty = [`Generic | `Unix] Eio.Net.datagram_socket_ty
type listening_socket_ty = [`Unix_fd | [`Generic | `Unix] Eio.Net.listening_socket_ty] type listening_socket_ty = [`Generic | `Unix] Eio.Net.listening_socket_ty
type 'a stream_socket = ([> stream_socket_ty] as 'a) r type 'a stream_socket = ([> stream_socket_ty] as 'a) r
type 'a datagram_socket = ([> datagram_socket_ty] as 'a) r type 'a datagram_socket = ([> datagram_socket_ty] as 'a) r
type 'a listening_socket = ([> listening_socket_ty] as 'a) r type 'a listening_socket = ([> listening_socket_ty] as 'a) r
@ -30,6 +30,20 @@ let sockaddr_of_unix_datagram = function
let host = Ipaddr.of_unix host in let host = Ipaddr.of_unix host in
`Udp (host, port) `Udp (host, port)
let send_msg (Eio.Resource.T (t, ops)) ?(fds=[]) bufs =
let module X = (val (Eio.Resource.get ops Pi.Stream_socket)) in
let rec aux ~fds bufs =
let sent = X.send_msg t ~fds bufs in
match Cstruct.shiftv bufs sent with
| [] -> ()
| bufs -> aux bufs ~fds:[]
in
aux ~fds bufs
let recv_msg_with_fds (Eio.Resource.T (t, ops)) ~sw ~max_fds bufs =
let module X = (val (Eio.Resource.get ops Pi.Stream_socket)) in
X.recv_msg_with_fds t ~sw ~max_fds bufs
let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) = let getnameinfo (sockaddr : Eio.Net.Sockaddr.t) =
let options = let options =
match sockaddr with match sockaddr with
@ -46,15 +60,15 @@ type t = [`Generic | `Unix] Eio.Net.ty r
[@@@alert "-unstable"] [@@@alert "-unstable"]
type _ Effect.t += type _ Effect.t +=
| Import_socket_stream : Switch.t * bool * Unix.file_descr -> stream_socket_ty r Effect.t | Import_socket_stream : Switch.t * bool * Unix.file_descr -> [`Unix_fd | stream_socket_ty] r Effect.t
| Import_socket_datagram : Switch.t * bool * Unix.file_descr -> datagram_socket_ty r Effect.t | Import_socket_datagram : Switch.t * bool * Unix.file_descr -> [`Unix_fd | datagram_socket_ty] r Effect.t
| Socketpair_stream : Switch.t * Unix.socket_domain * int -> | Socketpair_stream : Switch.t * Unix.socket_domain * int ->
(stream_socket_ty r * stream_socket_ty r) Effect.t ([`Unix_fd | stream_socket_ty] r * [`Unix_fd | stream_socket_ty] r) Effect.t
| Socketpair_datagram : Switch.t * Unix.socket_domain * int -> | Socketpair_datagram : Switch.t * Unix.socket_domain * int ->
(datagram_socket_ty r * datagram_socket_ty r) Effect.t ([`Unix_fd | datagram_socket_ty] r * [`Unix_fd | datagram_socket_ty] r) Effect.t
let open_stream s = (s : _ stream_socket :> [< stream_socket_ty] r) let open_stream s = (s : _ stream_socket :> [< `Unix_fd | stream_socket_ty] r)
let open_datagram s = (s : _ datagram_socket :> [< datagram_socket_ty] r) let open_datagram s = (s : _ datagram_socket :> [< `Unix_fd | datagram_socket_ty] r)
let import_socket_stream ~sw ~close_unix fd = let import_socket_stream ~sw ~close_unix fd =
open_stream @@ Effect.perform (Import_socket_stream (sw, close_unix, fd)) open_stream @@ Effect.perform (Import_socket_stream (sw, close_unix, fd))
@ -68,3 +82,6 @@ let socketpair_stream ~sw ?(domain=Unix.PF_UNIX) ?(protocol=0) () =
let socketpair_datagram ~sw ?(domain=Unix.PF_UNIX) ?(protocol=0) () = let socketpair_datagram ~sw ?(domain=Unix.PF_UNIX) ?(protocol=0) () =
Effect.perform (Socketpair_datagram (sw, domain, protocol)) Effect.perform (Socketpair_datagram (sw, domain, protocol))
let fd socket =
Option.get (Resource.fd_opt socket)

View File

@ -4,15 +4,36 @@ open Eio.Std
These extend the types in {!Eio.Net} with support for file descriptors. *) These extend the types in {!Eio.Net} with support for file descriptors. *)
type stream_socket_ty = [`Unix_fd | [`Generic | `Unix] Eio.Net.stream_socket_ty] type stream_socket_ty = [`Generic | `Unix] Eio.Net.stream_socket_ty
type datagram_socket_ty = [`Unix_fd | [`Generic | `Unix] Eio.Net.datagram_socket_ty] type datagram_socket_ty = [`Generic | `Unix] Eio.Net.datagram_socket_ty
type listening_socket_ty = [`Unix_fd | [`Generic | `Unix] Eio.Net.listening_socket_ty] type listening_socket_ty = [`Generic | `Unix] Eio.Net.listening_socket_ty
type 'a stream_socket = ([> stream_socket_ty] as 'a) r type 'a stream_socket = ([> stream_socket_ty] as 'a) r
type 'a datagram_socket = ([> datagram_socket_ty] as 'a) r type 'a datagram_socket = ([> datagram_socket_ty] as 'a) r
type 'a listening_socket = ([> listening_socket_ty] as 'a) r type 'a listening_socket = ([> listening_socket_ty] as 'a) r
type t = [`Generic | `Unix] Eio.Net.ty r type t = [`Generic | `Unix] Eio.Net.ty r
(** {2 Passing file descriptors} *)
val send_msg :
[> `Platform of [>`Unix] | `Socket | `Stream] r ->
?fds:Fd.t list ->
Cstruct.t list -> unit
(** Like {!Eio.Flow.write}, but allows passing file descriptors (for Unix-domain sockets). *)
val recv_msg_with_fds :
[> `Platform of [>`Unix] | `Socket | `Stream] r ->
sw:Switch.t ->
max_fds:int ->
Cstruct.t list ->
int * Fd.t list
(** Like {!Eio.Flow.single_read}, but also allows receiving file descriptors (for Unix-domain sockets).
@param max_fds The maximum number of file descriptors to accept (additional ones will be closed). *)
val fd : [> `Platform of [> `Unix] | `Socket] r -> Fd.t
(** [fd socket] is the underlying FD of [socket]. *)
(** {2 Unix address conversions} (** {2 Unix address conversions}
Note: OCaml's {!Unix.sockaddr} type considers e.g. TCP port 80 and UDP port Note: OCaml's {!Unix.sockaddr} type considers e.g. TCP port 80 and UDP port
@ -34,7 +55,7 @@ end
(** {2 Creating or importing sockets} *) (** {2 Creating or importing sockets} *)
val import_socket_stream : sw:Switch.t -> close_unix:bool -> Unix.file_descr -> stream_socket_ty r val import_socket_stream : sw:Switch.t -> close_unix:bool -> Unix.file_descr -> [`Unix_fd | stream_socket_ty] r
(** [import_socket_stream ~sw ~close_unix:true fd] is an Eio flow that uses [fd]. (** [import_socket_stream ~sw ~close_unix:true fd] is an Eio flow that uses [fd].
It can be cast to e.g. {!source} for a one-way flow. It can be cast to e.g. {!source} for a one-way flow.
@ -42,7 +63,7 @@ val import_socket_stream : sw:Switch.t -> close_unix:bool -> Unix.file_descr ->
The [close_unix] and [sw] arguments are passed to {!Fd.of_unix}. *) The [close_unix] and [sw] arguments are passed to {!Fd.of_unix}. *)
val import_socket_datagram : sw:Switch.t -> close_unix:bool -> Unix.file_descr -> datagram_socket_ty r val import_socket_datagram : sw:Switch.t -> close_unix:bool -> Unix.file_descr -> [`Unix_fd | datagram_socket_ty] r
(** [import_socket_datagram ~sw ~close_unix:true fd] is an Eio datagram socket that uses [fd]. (** [import_socket_datagram ~sw ~close_unix:true fd] is an Eio datagram socket that uses [fd].
The socket object will be closed when [sw] finishes. The socket object will be closed when [sw] finishes.
@ -54,7 +75,7 @@ val socketpair_stream :
?domain:Unix.socket_domain -> ?domain:Unix.socket_domain ->
?protocol:int -> ?protocol:int ->
unit -> unit ->
stream_socket_ty r * stream_socket_ty r [`Unix_fd | stream_socket_ty] r * [`Unix_fd | stream_socket_ty] r
(** [socketpair_stream ~sw ()] returns a connected pair of flows, such that writes to one can be read by the other. (** [socketpair_stream ~sw ()] returns a connected pair of flows, such that writes to one can be read by the other.
This creates OS-level resources using [socketpair(2)]. This creates OS-level resources using [socketpair(2)].
@ -65,7 +86,7 @@ val socketpair_datagram :
?domain:Unix.socket_domain -> ?domain:Unix.socket_domain ->
?protocol:int -> ?protocol:int ->
unit -> unit ->
datagram_socket_ty r * datagram_socket_ty r [`Unix_fd | datagram_socket_ty] r * [`Unix_fd | datagram_socket_ty] r
(** [socketpair_datagram ~sw ()] returns a connected pair of flows, such that writes to one can be read by the other. (** [socketpair_datagram ~sw ()] returns a connected pair of flows, such that writes to one can be read by the other.
This creates OS-level resources using [socketpair(2)]. This creates OS-level resources using [socketpair(2)].
@ -78,11 +99,11 @@ val getnameinfo : Eio.Net.Sockaddr.t -> (string * string)
type _ Effect.t += type _ Effect.t +=
| Import_socket_stream : | Import_socket_stream :
Switch.t * bool * Unix.file_descr -> stream_socket_ty r Effect.t (** See {!import_socket_stream} *) Switch.t * bool * Unix.file_descr -> [`Unix_fd | stream_socket_ty] r Effect.t (** See {!import_socket_stream} *)
| Import_socket_datagram : | Import_socket_datagram :
Switch.t * bool * Unix.file_descr -> datagram_socket_ty r Effect.t (** See {!import_socket_datagram} *) Switch.t * bool * Unix.file_descr -> [`Unix_fd | datagram_socket_ty] r Effect.t (** See {!import_socket_datagram} *)
| Socketpair_stream : Eio.Switch.t * Unix.socket_domain * int -> | Socketpair_stream : Eio.Switch.t * Unix.socket_domain * int ->
(stream_socket_ty r * stream_socket_ty r) Effect.t (** See {!socketpair_stream} *) ([`Unix_fd | stream_socket_ty] r * [`Unix_fd | stream_socket_ty] r) Effect.t (** See {!socketpair_stream} *)
| Socketpair_datagram : Eio.Switch.t * Unix.socket_domain * int -> | Socketpair_datagram : Eio.Switch.t * Unix.socket_domain * int ->
(datagram_socket_ty r * datagram_socket_ty r) Effect.t (** See {!socketpair_datagram} *) ([`Unix_fd | datagram_socket_ty] r * [`Unix_fd | datagram_socket_ty] r) Effect.t (** See {!socketpair_datagram} *)
[@@alert "-unstable"] [@@alert "-unstable"]

51
lib_eio/unix/pi.ml Normal file
View File

@ -0,0 +1,51 @@
open Eio.Std
module type STREAM_SOCKET = sig
include Eio.Net.Pi.STREAM_SOCKET
val send_msg : t -> fds:Fd.t list -> Cstruct.t list -> int
val recv_msg_with_fds : t -> sw:Switch.t -> max_fds:int -> Cstruct.t list -> int * Fd.t list
val fd : t -> Fd.t
end
type (_, _, _) Eio.Resource.pi +=
| Stream_socket : ('t, (module STREAM_SOCKET with type t = 't), [> `Platform of [> `Unix] | `Socket | `Stream]) Eio.Resource.pi
module type FLOW = sig
include Eio.File.Pi.WRITE
include STREAM_SOCKET with type t := t
end
let flow_handler (type t tag) (module X : FLOW with type t = t and type tag = tag) : (t, _) Eio.Resource.handler =
Eio.Resource.handler @@
Eio.Resource.bindings (Eio.Net.Pi.stream_socket (module X)) @
Eio.Resource.bindings (Eio.File.Pi.rw (module X)) @ [
H (Resource.T, X.fd);
H (Stream_socket, (module X));
]
module type DATAGRAM_SOCKET = sig
include Eio.Net.Pi.DATAGRAM_SOCKET
val fd : t -> Fd.t
end
let datagram_handler (type t tag) (module X : DATAGRAM_SOCKET with type t = t and type tag = tag) : (t, _) Eio.Resource.handler =
Eio.Resource.handler @@
Eio.Resource.bindings (Eio.Net.Pi.datagram_socket (module X)) @ [
H (Resource.T, X.fd);
]
module type LISTENING_SOCKET = sig
include Eio.Net.Pi.LISTENING_SOCKET
val fd : t -> Fd.t
end
let listening_socket_handler (type t tag) (module X : LISTENING_SOCKET with type t = t and type tag = tag)
: (t, _) Eio.Resource.handler =
Eio.Resource.handler @@
Eio.Resource.bindings (Eio.Net.Pi.listening_socket (module X)) @ [
H (Resource.T, X.fd);
]

42
lib_eio/unix/pi.mli Normal file
View File

@ -0,0 +1,42 @@
open Eio.Std
module type STREAM_SOCKET = sig
include Eio.Net.Pi.STREAM_SOCKET
val send_msg : t -> fds:Fd.t list -> Cstruct.t list -> int
val recv_msg_with_fds : t -> sw:Switch.t -> max_fds:int -> Cstruct.t list -> int * Fd.t list
val fd : t -> Fd.t
end
type (_, _, _) Eio.Resource.pi +=
| Stream_socket : ('t, (module STREAM_SOCKET with type t = 't), [> `Platform of [> `Unix] | `Socket | `Stream]) Eio.Resource.pi
module type FLOW = sig
include Eio.File.Pi.WRITE
include STREAM_SOCKET with type t := t
end
val flow_handler :
(module FLOW with type t = 't and type tag = 'tag) ->
('t, [`Unix_fd | 'tag Eio.Net.stream_socket_ty | Eio.File.rw_ty]) Eio.Resource.handler
module type DATAGRAM_SOCKET = sig
include Eio.Net.Pi.DATAGRAM_SOCKET
val fd : t -> Fd.t
end
val datagram_handler :
(module DATAGRAM_SOCKET with type t = 't and type tag = 'tag) ->
('t, [`Unix_fd | 'tag Eio.Net.datagram_socket_ty]) Eio.Resource.handler
module type LISTENING_SOCKET = sig
include Eio.Net.Pi.LISTENING_SOCKET
val fd : t -> Fd.t
end
val listening_socket_handler :
(module LISTENING_SOCKET with type t = 't and type tag = 'tag) ->
('t, [`Unix_fd | 'tag Eio.Net.listening_socket_ty]) Eio.Resource.handler

View File

@ -7,42 +7,3 @@ let fd_opt (Eio.Resource.T (t, ops)) =
match Eio.Resource.get_opt ops T with match Eio.Resource.get_opt ops T with
| Some f -> Some (f t) | Some f -> Some (f t)
| None -> None | None -> None
module type FLOW = sig
include Eio.Net.Pi.STREAM_SOCKET
include Eio.File.Pi.WRITE with type t := t
val fd : t -> Fd.t
end
let flow_handler (type t tag) (module X : FLOW with type t = t and type tag = tag) : (t, _) Eio.Resource.handler =
Eio.Resource.handler @@
Eio.Resource.bindings (Eio.Net.Pi.stream_socket (module X)) @
Eio.Resource.bindings (Eio.File.Pi.rw (module X)) @ [
H (T, X.fd);
]
module type DATAGRAM_SOCKET = sig
include Eio.Net.Pi.DATAGRAM_SOCKET
val fd : t -> Fd.t
end
let datagram_handler (type t tag) (module X : DATAGRAM_SOCKET with type t = t and type tag = tag) : (t, _) Eio.Resource.handler =
Eio.Resource.handler @@
Eio.Resource.bindings (Eio.Net.Pi.datagram_socket (module X)) @ [
H (T, X.fd);
]
module type LISTENING_SOCKET = sig
include Eio.Net.Pi.LISTENING_SOCKET
val fd : t -> Fd.t
end
let listening_socket_handler (type t tag) (module X : LISTENING_SOCKET with type t = t and type tag = tag)
: (t, _) Eio.Resource.handler =
Eio.Resource.handler @@
Eio.Resource.bindings (Eio.Net.Pi.listening_socket (module X)) @ [
H (T, X.fd);
]

View File

@ -131,7 +131,7 @@ module Datagram_socket = struct
| `All -> Unix.SHUTDOWN_ALL | `All -> Unix.SHUTDOWN_ALL
end end
let datagram_handler = Eio_unix.Resource.datagram_handler (module Datagram_socket) let datagram_handler = Eio_unix.Pi.datagram_handler (module Datagram_socket)
let datagram_socket fd = let datagram_socket fd =
Eio.Resource.T (fd, datagram_handler) Eio.Resource.T (fd, datagram_handler)
@ -185,14 +185,21 @@ module Flow = struct
| `Receive -> Unix.SHUTDOWN_RECEIVE | `Receive -> Unix.SHUTDOWN_RECEIVE
| `Send -> Unix.SHUTDOWN_SEND | `Send -> Unix.SHUTDOWN_SEND
| `All -> Unix.SHUTDOWN_ALL | `All -> Unix.SHUTDOWN_ALL
let send_msg t ~fds data =
Low_level.send_msg t ~fds data
let recv_msg_with_fds t ~sw ~max_fds data =
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds data in
n, fds
end end
let flow_handler = Eio_unix.Resource.flow_handler (module Flow) let flow_handler = Eio_unix.Pi.flow_handler (module Flow)
let flow fd = let flow fd =
let r = Eio.Resource.T (fd, flow_handler) in let r = Eio.Resource.T (fd, flow_handler) in
(r : [Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r :> (r : [`Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r :>
[< Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r) [< `Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r)
let source fd = (flow fd :> _ Eio_unix.source) let source fd = (flow fd :> _ Eio_unix.source)
let sink fd = (flow fd :> _ Eio_unix.sink) let sink fd = (flow fd :> _ Eio_unix.sink)
@ -217,7 +224,7 @@ module Listening_socket = struct
flow, client_addr flow, client_addr
end end
let listening_handler = Eio_unix.Resource.listening_socket_handler (module Listening_socket) let listening_handler = Eio_unix.Pi.listening_socket_handler (module Listening_socket)
let listening_socket fd = let listening_socket fd =
Eio.Resource.T (fd, listening_handler) Eio.Resource.T (fd, listening_handler)

View File

@ -263,10 +263,7 @@ let recv_msg_with_fds ~sw ~max_fds fd buf =
if res < 0 then ( if res < 0 then (
raise @@ Err.wrap (Uring.error_of_errno res) "recv_msg" "" raise @@ Err.wrap (Uring.error_of_errno res) "recv_msg" ""
); );
let fds = let fds = Uring.Msghdr.get_fds msghdr |> Fd.of_unix_list ~sw in
Uring.Msghdr.get_fds msghdr
|> List.map (fun fd -> Fd.of_unix ~sw ~close_unix:true fd)
in
addr, res, fds addr, res, fds
let with_chunk ~fallback fn = let with_chunk ~fallback fn =

View File

@ -1,42 +0,0 @@
# Setting up the environment
```ocaml
# #require "eio_linux";;
```
```ocaml
open Eio.Std
let ( / ) = Eio.Path.( / )
```
Sending a file descriptor over a Unix domain socket:
```ocaml
# Eio_linux.run @@ fun env ->
Switch.run @@ fun sw ->
let fd = Eio.Path.open_out ~sw (env#cwd / "tmp.txt") ~create:(`Exclusive 0o600) in
Eio.Flow.copy_string "Test data" fd;
let r, w = Unix.(socketpair PF_UNIX SOCK_STREAM 0) in
let r = Eio_unix.Fd.of_unix ~sw ~seekable:false ~close_unix:true r in
let w = Eio_unix.Fd.of_unix ~sw ~seekable:false ~close_unix:true w in
Fiber.both
(fun () ->
let sent = Eio_linux.Low_level.send_msg w [Cstruct.of_string "x"] ~fds:[Eio_unix.Resource.fd_opt fd |> Option.get] in
assert (sent = 1)
)
(fun () ->
let buf = Cstruct.of_string "?" in
let addr, got, fds = Eio_linux.Low_level.recv_msg_with_fds ~sw r ~max_fds:10 [buf] in
traceln "Got: %S plus %d FD" (Cstruct.to_string buf) (List.length fds);
match fds with
| [fd] ->
Eio_unix.Fd.use_exn "read" fd @@ fun fd ->
ignore (Unix.lseek fd 0 Unix.SEEK_SET : int);
traceln "Read: %S" (really_input_string (Unix.in_channel_of_descr fd) 9);
| _ -> assert false
);;
+Got: "x" plus 1 FD
+Read: "Test data"
- : unit = ()
```

View File

@ -212,23 +212,51 @@ CAMLprim value caml_eio_posix_spawn(value v_errors, value v_actions) {
CAMLreturn(Val_long(child_pid)); CAMLreturn(Val_long(child_pid));
} }
CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_dst_opt, value v_bufs) { /* Copy [n_fds] from [v_fds] to [msg]. */
CAMLparam2(v_dst_opt, v_bufs); static void fill_fds(struct msghdr *msg, int n_fds, value v_fds) {
if (n_fds > 0) {
int i;
struct cmsghdr *cm;
cm = CMSG_FIRSTHDR(msg);
cm->cmsg_level = SOL_SOCKET;
cm->cmsg_type = SCM_RIGHTS;
cm->cmsg_len = CMSG_LEN(n_fds * sizeof(int));
for (i = 0; i < n_fds; i++) {
int fd = -1;
if (Is_block(v_fds)) {
fd = Int_val(Field(v_fds, 0));
v_fds = Field(v_fds, 1);
}
((int *)CMSG_DATA(cm))[i] = fd;
}
}
}
CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_n_fds, value v_fds, value v_dst_opt, value v_bufs) {
CAMLparam3(v_fds, v_dst_opt, v_bufs);
int n_bufs = Wosize_val(v_bufs); int n_bufs = Wosize_val(v_bufs);
int n_fds = Int_val(v_n_fds);
struct iovec iov[n_bufs]; struct iovec iov[n_bufs];
union sock_addr_union dst_addr; union sock_addr_union dst_addr;
int controllen = n_fds > 0 ? CMSG_SPACE(sizeof(int) * n_fds) : 0;
char cmsg[controllen];
struct msghdr msg = { struct msghdr msg = {
.msg_iov = iov, .msg_iov = iov,
.msg_iovlen = n_bufs, .msg_iovlen = n_bufs,
.msg_control = n_fds > 0 ? cmsg : NULL,
.msg_controllen = controllen,
}; };
ssize_t r; ssize_t r;
memset(cmsg, 0, controllen);
if (Is_some(v_dst_opt)) { if (Is_some(v_dst_opt)) {
caml_unix_get_sockaddr(Some_val(v_dst_opt), &dst_addr, &msg.msg_namelen); caml_unix_get_sockaddr(Some_val(v_dst_opt), &dst_addr, &msg.msg_namelen);
msg.msg_name = &dst_addr; msg.msg_name = &dst_addr;
} }
fill_iov(iov, v_bufs); fill_iov(iov, v_bufs);
fill_fds(&msg, n_fds, v_fds);
caml_enter_blocking_section(); caml_enter_blocking_section();
r = sendmsg(Int_val(v_fd), &msg, 0); r = sendmsg(Int_val(v_fd), &msg, 0);
@ -238,20 +266,49 @@ CAMLprim value caml_eio_posix_send_msg(value v_fd, value v_dst_opt, value v_bufs
CAMLreturn(Val_long(r)); CAMLreturn(Val_long(r));
} }
CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_bufs) { static value get_msghdr_fds(struct msghdr *msg) {
CAMLparam0();
CAMLlocal2(v_list, v_cons);
struct cmsghdr *cm;
v_list = Val_int(0);
for (cm = CMSG_FIRSTHDR(msg); cm; cm = CMSG_NXTHDR(msg, cm)) {
if (cm->cmsg_level == SOL_SOCKET && cm->cmsg_type == SCM_RIGHTS) {
int *fds = (int *) CMSG_DATA(cm);
int n_fds = (cm->cmsg_len - CMSG_LEN(0)) / sizeof(int);
int i;
for (i = n_fds - 1; i >= 0; i--) {
value fd = Val_int(fds[i]);
v_cons = caml_alloc_tuple(2);
Store_field(v_cons, 0, fd);
Store_field(v_cons, 1, v_list);
v_list = v_cons;
}
}
}
CAMLreturn(v_list);
}
CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_max_fds, value v_bufs) {
CAMLparam1(v_bufs); CAMLparam1(v_bufs);
CAMLlocal2(v_result, v_addr); CAMLlocal2(v_result, v_addr);
int max_fds = Int_val(v_max_fds);
int n_bufs = Wosize_val(v_bufs); int n_bufs = Wosize_val(v_bufs);
struct iovec iov[n_bufs]; struct iovec iov[n_bufs];
union sock_addr_union source_addr; union sock_addr_union source_addr;
int controllen = max_fds > 0 ? CMSG_SPACE(sizeof(int) * max_fds) : 0;
char cmsg[controllen];
struct msghdr msg = { struct msghdr msg = {
.msg_name = &source_addr, .msg_name = &source_addr,
.msg_namelen = sizeof(source_addr), .msg_namelen = sizeof(source_addr),
.msg_iov = iov, .msg_iov = iov,
.msg_iovlen = n_bufs, .msg_iovlen = n_bufs,
.msg_control = max_fds > 0 ? cmsg : NULL,
.msg_controllen = controllen,
}; };
ssize_t r; ssize_t r;
memset(cmsg, 0, controllen);
fill_iov(iov, v_bufs); fill_iov(iov, v_bufs);
caml_enter_blocking_section(); caml_enter_blocking_section();
@ -261,9 +318,10 @@ CAMLprim value caml_eio_posix_recv_msg(value v_fd, value v_bufs) {
v_addr = caml_unix_alloc_sockaddr(&source_addr, msg.msg_namelen, -1); v_addr = caml_unix_alloc_sockaddr(&source_addr, msg.msg_namelen, -1);
v_result = caml_alloc_tuple(2); v_result = caml_alloc_tuple(3);
Store_field(v_result, 0, v_addr); Store_field(v_result, 0, v_addr);
Store_field(v_result, 1, Val_long(r)); Store_field(v_result, 1, Val_long(r));
Store_field(v_result, 2, get_msghdr_fds(&msg));
CAMLreturn(v_result); CAMLreturn(v_result);
} }

View File

@ -81,12 +81,19 @@ module Impl = struct
let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (Array.of_list bufs) let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (Array.of_list bufs)
let send_msg t ~fds data =
Low_level.send_msg ~fds t (Array.of_list data)
let recv_msg_with_fds t ~sw ~max_fds data =
let _addr, n, fds = Low_level.recv_msg_with_fds t ~sw ~max_fds (Array.of_list data) in
n, fds
let fd t = t let fd t = t
let close = Eio_unix.Fd.close let close = Eio_unix.Fd.close
end end
let handler = Eio_unix.Resource.flow_handler (module Impl) let handler = Eio_unix.Pi.flow_handler (module Impl)
let of_fd fd = let of_fd fd =
let r = Eio.Resource.T (fd, handler) in let r = Eio.Resource.T (fd, handler) in

View File

@ -79,16 +79,27 @@ let accept ~sw sock =
let shutdown sock cmd = let shutdown sock cmd =
Fd.use_exn "shutdown" sock (fun fd -> Unix.shutdown fd cmd) Fd.use_exn "shutdown" sock (fun fd -> Unix.shutdown fd cmd)
external eio_send_msg : Unix.file_descr -> Unix.sockaddr option -> Cstruct.t array -> int = "caml_eio_posix_send_msg" external eio_send_msg : Unix.file_descr -> int -> Unix.file_descr list -> Unix.sockaddr option -> Cstruct.t array -> int = "caml_eio_posix_send_msg"
external eio_recv_msg : Unix.file_descr -> Cstruct.t array -> Unix.sockaddr * int = "caml_eio_posix_recv_msg" external eio_recv_msg : Unix.file_descr -> int -> Cstruct.t array -> Unix.sockaddr * int * Unix.file_descr list = "caml_eio_posix_recv_msg"
let send_msg fd ?dst buf = let send_msg fd ?(fds = []) ?dst buf =
Fd.use_exn "send_msg" fd @@ fun fd -> Fd.use_exn "send_msg" fd @@ fun fd ->
do_nonblocking Write (fun fd -> eio_send_msg fd dst buf) fd Fd.use_exn_list "send_msg" fds @@ fun fds ->
do_nonblocking Write (fun fd -> eio_send_msg fd (List.length fds) fds dst buf) fd
let recv_msg fd buf = let recv_msg fd buf =
Fd.use_exn "recv_msg" fd @@ fun fd -> let addr, got, _ =
do_nonblocking Read (fun fd -> eio_recv_msg fd buf) fd Fd.use_exn "recv_msg" fd @@ fun fd ->
do_nonblocking Read (fun fd -> eio_recv_msg fd 0 buf) fd
in
(addr, got)
let recv_msg_with_fds ~sw ~max_fds fd buf =
let addr, got, fds =
Fd.use_exn "recv_msg" fd @@ fun fd ->
do_nonblocking Read (fun fd -> eio_recv_msg fd max_fds buf) fd
in
(addr, got, Eio_unix.Fd.of_unix_list ~sw fds)
external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_posix_getrandom" external eio_getrandom : Cstruct.buffer -> int -> int -> int = "caml_eio_posix_getrandom"

View File

@ -29,7 +29,9 @@ val accept : sw:Switch.t -> fd -> fd * Unix.sockaddr
val shutdown : fd -> Unix.shutdown_command -> unit val shutdown : fd -> Unix.shutdown_command -> unit
val recv_msg : fd -> Cstruct.t array -> Unix.sockaddr * int val recv_msg : fd -> Cstruct.t array -> Unix.sockaddr * int
val send_msg : fd -> ?dst:Unix.sockaddr -> Cstruct.t array -> int val recv_msg_with_fds : sw:Switch.t -> max_fds:int -> fd -> Cstruct.t array -> Unix.sockaddr * int * fd list
val send_msg : fd -> ?fds:fd list -> ?dst:Unix.sockaddr -> Cstruct.t array -> int
val getrandom : Cstruct.t -> unit val getrandom : Cstruct.t -> unit

View File

@ -38,7 +38,7 @@ module Listening_socket = struct
flow, client_addr flow, client_addr
end end
let listening_handler = Eio_unix.Resource.listening_socket_handler (module Listening_socket) let listening_handler = Eio_unix.Pi.listening_socket_handler (module Listening_socket)
let listening_socket ~hook fd = let listening_socket ~hook fd =
Eio.Resource.T (Listening_socket.make ~hook fd, listening_handler) Eio.Resource.T (Listening_socket.make ~hook fd, listening_handler)
@ -72,7 +72,7 @@ module Datagram_socket = struct
| Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) | Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
end end
let datagram_handler = Eio_unix.Resource.datagram_handler (module Datagram_socket) let datagram_handler = Eio_unix.Pi.datagram_handler (module Datagram_socket)
let datagram_socket fd = let datagram_socket fd =
Eio.Resource.T (fd, datagram_handler) Eio.Resource.T (fd, datagram_handler)

View File

@ -1,12 +1,13 @@
(library (library
(name eio_windows) (name eio_windows)
(public_name eio_windows) (public_name eio_windows)
(library_flags :standard -ccopt -lbcrypt -ccopt -lntdll) (library_flags :standard -cclib -lbcrypt -cclib -lntdll)
(enabled_if (= %{os_type} "Win32")) (enabled_if (= %{os_type} "Win32"))
(foreign_stubs (foreign_stubs
(language c) (language c)
(include_dirs ../lib_eio/unix/include) (include_dirs ../lib_eio/unix/include)
(names eio_windows_stubs eio_windows_cstruct_stubs)) (names eio_windows_stubs eio_windows_cstruct_stubs))
(c_library_flags :standard -lbcrypt -lntdll)
(libraries eio eio.unix eio.utils fmt)) (libraries eio eio.unix eio.utils fmt))
(rule (rule

View File

@ -74,17 +74,21 @@ module Impl = struct
let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (Array.of_list bufs) let pwrite t ~file_offset bufs = Low_level.pwritev ~file_offset t (Array.of_list bufs)
let send_msg _t ~fds:_ _data = failwith "Not implemented on Windows"
let recv_msg_with_fds _t ~sw:_ ~max_fds:_ _data = failwith "Not implemented on Windows"
let fd t = t let fd t = t
let close = Eio_unix.Fd.close let close = Eio_unix.Fd.close
end end
let handler = Eio_unix.Resource.flow_handler (module Impl) let handler = Eio_unix.Pi.flow_handler (module Impl)
let of_fd fd = let of_fd fd =
let r = Eio.Resource.T (fd, handler) in let r = Eio.Resource.T (fd, handler) in
(r : [Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r :> (r : [`Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r :>
[< Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r) [< `Unix_fd | Eio_unix.Net.stream_socket_ty | Eio.File.rw_ty] r)
module Secure_random = struct module Secure_random = struct
type t = unit type t = unit

View File

@ -38,7 +38,7 @@ module Listening_socket = struct
flow, client_addr flow, client_addr
end end
let listening_handler = Eio_unix.Resource.listening_socket_handler (module Listening_socket) let listening_handler = Eio_unix.Pi.listening_socket_handler (module Listening_socket)
let listening_socket ~hook fd = let listening_socket ~hook fd =
Eio.Resource.T (Listening_socket.make ~hook fd, listening_handler) Eio.Resource.T (Listening_socket.make ~hook fd, listening_handler)
@ -74,7 +74,7 @@ module Datagram_socket = struct
| Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg) | Unix.Unix_error (code, name, arg) -> raise (Err.wrap code name arg)
end end
let datagram_handler = Eio_unix.Resource.datagram_handler (module Datagram_socket) let datagram_handler = Eio_unix.Pi.datagram_handler (module Datagram_socket)
let datagram_socket fd = let datagram_socket fd =
Eio.Resource.T (fd, datagram_handler) Eio.Resource.T (fd, datagram_handler)

View File

@ -98,8 +98,8 @@ let test_wrap_socket pipe_or_socketpair () =
let test_eio_socketpair () = let test_eio_socketpair () =
Switch.run @@ fun sw -> Switch.run @@ fun sw ->
let a, b = Eio_unix.Net.socketpair_stream ~sw () in let a, b = Eio_unix.Net.socketpair_stream ~sw () in
ignore (Eio_unix.Resource.fd a : Eio_unix.Fd.t); ignore (Eio_unix.Net.fd a : Eio_unix.Fd.t);
ignore (Eio_unix.Resource.fd b : Eio_unix.Fd.t); ignore (Eio_unix.Net.fd b : Eio_unix.Fd.t);
Eio.Flow.copy_string "foo" a; Eio.Flow.copy_string "foo" a;
Eio.Flow.close a; Eio.Flow.close a;
let msg = Eio.Buf_read.of_flow b ~max_size:10 |> Eio.Buf_read.take_all in let msg = Eio.Buf_read.of_flow b ~max_size:10 |> Eio.Buf_read.take_all in

77
tests/fd_passing.md Normal file
View File

@ -0,0 +1,77 @@
# Setting up the environment
```ocaml
# #require "eio_main";;
```
```ocaml
open Eio.Std
let ( / ) = Eio.Path.( / )
```
```ocaml
(* Send [to_send] to [w] and get it from [r], then read it. *)
let test ~to_send r w =
Switch.run @@ fun sw ->
Fiber.both
(fun () -> Eio_unix.Net.send_msg w [Cstruct.of_string "x"] ~fds:to_send)
(fun () ->
let buf = Cstruct.of_string "?" in
let got, fds = Eio_unix.Net.recv_msg_with_fds ~sw r ~max_fds:2 [buf] in
let msg = Cstruct.to_string buf ~len:got in
traceln "Got: %S plus %d FDs" msg (List.length fds);
fds |> List.iter (fun fd ->
Eio_unix.Fd.use_exn "read" fd @@ fun fd ->
let len = Unix.lseek fd 0 Unix.SEEK_CUR in
ignore (Unix.lseek fd 0 Unix.SEEK_SET : int);
traceln "Read: %S" (really_input_string (Unix.in_channel_of_descr fd) len);
)
)
let with_tmp_file dir id fn =
let path = (dir / (Printf.sprintf "tmp-%s.txt" id)) in
Eio.Path.with_open_out path ~create:(`Exclusive 0o600) @@ fun file ->
Fun.protect
(fun () ->
Eio.Flow.copy_string id file;
fn (Option.get (Eio_unix.Resource.fd_opt file))
)
~finally:(fun () -> Eio.Path.unlink path)
```
## Tests
Using a socket-pair:
```ocaml
# Eio_main.run @@ fun env ->
with_tmp_file env#cwd "foo" @@ fun fd1 ->
with_tmp_file env#cwd "bar" @@ fun fd2 ->
Switch.run @@ fun sw ->
let r, w = Eio_unix.Net.socketpair_stream ~sw ~domain:PF_UNIX ~protocol:0 () in
test ~to_send:[fd1; fd2] r w;;
+Got: "x" plus 2 FDs
+Read: "foo"
+Read: "bar"
- : unit = ()
```
Using named sockets:
```ocaml
# Eio_main.run @@ fun env ->
let net = env#net in
with_tmp_file env#cwd "foo" @@ fun fd ->
Switch.run @@ fun sw ->
let addr = `Unix "test.socket" in
let server = Eio.Net.listen ~sw net ~reuse_addr:true ~backlog:1 addr in
let r, w = Fiber.pair
(fun () -> Eio.Net.connect ~sw net addr)
(fun () -> fst (Eio.Net.accept ~sw server))
in
test ~to_send:[fd] r w;;
+Got: "x" plus 1 FDs
+Read: "foo"
- : unit = ()
```

View File

@ -470,8 +470,8 @@ Exception: Failure "Simulated error".
# Eio_main.run @@ fun _ -> # Eio_main.run @@ fun _ ->
Switch.run @@ fun sw -> Switch.run @@ fun sw ->
let a, b = Eio_unix.Net.socketpair_stream ~sw () in let a, b = Eio_unix.Net.socketpair_stream ~sw () in
ignore (Eio_unix.Resource.fd a : Eio_unix.Fd.t); ignore (Eio_unix.Net.fd a : Eio_unix.Fd.t);
ignore (Eio_unix.Resource.fd b : Eio_unix.Fd.t); ignore (Eio_unix.Net.fd b : Eio_unix.Fd.t);
Eio.Flow.copy_string "foo" a; Eio.Flow.copy_string "foo" a;
Eio.Flow.close a; Eio.Flow.close a;
let msg = Eio.Buf_read.of_flow b ~max_size:10 |> Eio.Buf_read.take_all in let msg = Eio.Buf_read.of_flow b ~max_size:10 |> Eio.Buf_read.take_all in
@ -746,8 +746,8 @@ Eio.Io Net Connection_failure Timeout,
# Eio_main.run @@ fun _ -> # Eio_main.run @@ fun _ ->
Switch.run @@ fun sw -> Switch.run @@ fun sw ->
let a, b = Eio_unix.Net.socketpair_datagram ~sw ~domain:Unix.PF_UNIX () in let a, b = Eio_unix.Net.socketpair_datagram ~sw ~domain:Unix.PF_UNIX () in
ignore (Eio_unix.Resource.fd a : Eio_unix.Fd.t); ignore (Eio_unix.Net.fd a : Eio_unix.Fd.t);
ignore (Eio_unix.Resource.fd b : Eio_unix.Fd.t); ignore (Eio_unix.Net.fd b : Eio_unix.Fd.t);
let l = [ "foo"; "bar"; "foobar"; "cellar door"; "" ] in let l = [ "foo"; "bar"; "foobar"; "cellar door"; "" ] in
let buf = Cstruct.create 32 in let buf = Cstruct.create 32 in
let write bufs = Eio.Net.send a (List.map Cstruct.of_string bufs) in let write bufs = Eio.Net.send a (List.map Cstruct.of_string bufs) in