Add Eio_unix.socketpair

Also, rename `take`/`peek` to `take_opt`/`peek_opt` to make way for
non-optional versions.
This commit is contained in:
Thomas Leonard 2022-07-22 15:49:10 +01:00
parent 74db28cf96
commit 089b176f23
7 changed files with 155 additions and 26 deletions

View File

@ -1,3 +1,13 @@
type unix_fd = <
unix_fd : [`Peek | `Take] -> Unix.file_descr;
>
type socket = <
Eio.Flow.two_way;
Eio.Flow.close;
unix_fd;
>
module Private = struct
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty
@ -5,7 +15,8 @@ module Private = struct
| Await_readable : Unix.file_descr -> unit Effect.t
| Await_writable : Unix.file_descr -> unit Effect.t
| Get_system_clock : Eio.Time.clock Effect.t
| Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> < Eio.Flow.two_way; Eio.Flow.close > Effect.t
| Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr -> socket Effect.t
| Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int -> (socket * socket) Effect.t
end
let await_readable fd = Effect.perform (Private.Await_readable fd)
@ -25,12 +36,18 @@ let run_in_systhread fn =
Effect.perform (Eio.Private.Effects.Suspend f)
module FD = struct
let peek x = Eio.Generic.probe x (Private.Unix_file_descr `Peek)
let take x = Eio.Generic.probe x (Private.Unix_file_descr `Take)
let peek x = x#unix_fd `Peek
let take x = x#unix_fd `Take
let peek_opt x = Eio.Generic.probe x (Private.Unix_file_descr `Peek)
let take_opt x = Eio.Generic.probe x (Private.Unix_file_descr `Take)
let as_socket ~sw ~close_unix fd = Effect.perform (Private.Socket_of_fd (sw, close_unix, fd))
end
let socketpair ~sw ?(domain=Unix.PF_UNIX) ?(ty=Unix.SOCK_STREAM) ?(protocol=0) () =
Effect.perform (Private.Socketpair (sw, domain, ty, protocol))
module Ipaddr = struct
let to_unix : _ Eio.Net.Ipaddr.t -> Unix.inet_addr = Obj.magic
let of_unix : Unix.inet_addr -> _ Eio.Net.Ipaddr.t = Obj.magic

View File

@ -4,6 +4,18 @@
For example, it is possible to leak file descriptors this way, or to use them after they've been closed,
allowing one module to corrupt a file belonging to an unrelated module. *)
open Eio.Std
type unix_fd = <
unix_fd : [`Peek | `Take] -> Unix.file_descr;
>
type socket = <
Eio.Flow.two_way;
Eio.Flow.close;
unix_fd;
>
val await_readable : Unix.file_descr -> unit
(** [await_readable fd] blocks until [fd] is readable (or has an error). *)
@ -12,15 +24,23 @@ val await_writable : Unix.file_descr -> unit
(** Convert between [Unix.file_descr] and Eio objects. *)
module FD : sig
val peek : #Eio.Generic.t -> Unix.file_descr option
(** [peek x] is the Unix file descriptor underlying [x], if any.
val peek : < unix_fd; .. > -> Unix.file_descr
(** [peek x] is the Unix file descriptor underlying [x].
The caller must ensure that they do not continue to use the result after [x] is closed. *)
val take : #Eio.Generic.t -> Unix.file_descr option
val peek_opt : #Eio.Generic.t -> Unix.file_descr option
(** [peek_opt x] is the Unix file descriptor underlying [x], if any.
The caller must ensure that they do not continue to use the result after [x] is closed. *)
val take : < unix_fd; .. > -> Unix.file_descr
(** [take x] is like [peek], but also marks [x] as closed on success (without actually closing the FD).
[x] can no longer be used after this, and the caller is responsible for closing the FD. *)
val as_socket : sw:Eio.Switch.t -> close_unix:bool -> Unix.file_descr -> < Eio.Flow.two_way; Eio.Flow.close >
val take_opt : #Eio.Generic.t -> Unix.file_descr option
(** [take_opt x] is like [peek_opt], but also marks [x] as closed on success (without actually closing the FD).
[x] can no longer be used after this, and the caller is responsible for closing the FD. *)
val as_socket : sw:Switch.t -> close_unix:bool -> Unix.file_descr -> socket
(** [as_socket ~sw ~close_unix:true fd] is an Eio flow that uses [fd].
It can be cast to e.g. {!Eio.source} for a one-way flow.
The socket object will be closed when [sw] finishes.
@ -46,6 +66,17 @@ val run_in_systhread : (unit -> 'a) -> 'a
(** [run_in_systhread fn] runs the function [fn] in a newly created system thread (a {! Thread.t}).
This allows blocking calls to be made non-blocking. *)
val socketpair :
sw:Switch.t ->
?domain:Unix.socket_domain ->
?ty:Unix.socket_type ->
?protocol:int ->
unit ->
socket * socket
(** [socketpair ~sw ()] returns a connected pair of flows, such that writes to one can be read by the other.
This creates OS-level resources using [socketpair(2)].
Note that, like all FDs created by Eio, they are both marked as close-on-exec by default. *)
(** API for Eio backends only. *)
module Private : sig
type _ Eio.Generic.ty += Unix_file_descr : [`Peek | `Take] -> Unix.file_descr Eio.Generic.ty
@ -55,8 +86,10 @@ module Private : sig
| Await_readable : Unix.file_descr -> unit Effect.t (** See {!await_readable} *)
| Await_writable : Unix.file_descr -> unit Effect.t (** See {!await_writable} *)
| Get_system_clock : Eio.Time.clock Effect.t (** See {!sleep} *)
| Socket_of_fd : Eio.Switch.t * bool * Unix.file_descr ->
< Eio.Flow.two_way; Eio.Flow.close > Effect.t (** See {!FD.as_socket} *)
| Socket_of_fd : Switch.t * bool * Unix.file_descr ->
socket Effect.t (** See {!FD.as_socket} *)
| Socketpair : Eio.Switch.t * Unix.socket_domain * Unix.socket_type * int ->
(socket * socket) Effect.t (** See {!socketpair} *)
end
module Ctf = Ctf_unix

View File

@ -992,6 +992,8 @@ let flow fd =
| `Receive -> Unix.SHUTDOWN_RECEIVE
| `Send -> Unix.SHUTDOWN_SEND
| `All -> Unix.SHUTDOWN_ALL
method unix_fd op = FD.to_unix op fd
end
let source fd = (flow fd :> source)
@ -1347,7 +1349,13 @@ let rec run ?(queue_depth=64) ?n_blocks ?(block_size=4096) ?polling_timeout ?fal
| Eio_unix.Private.Get_system_clock -> Some (fun k -> continue k clock)
| Eio_unix.Private.Socket_of_fd (sw, close_unix, fd) -> Some (fun k ->
let fd = FD.of_unix ~sw ~seekable:false ~close_unix fd in
continue k (flow fd :> < Eio.Flow.two_way; Eio.Flow.close >)
continue k (flow fd :> Eio_unix.socket)
)
| Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k ->
let a, b = Unix.socketpair ~cloexec:true domain ty protocol in
let a = FD.of_unix ~sw ~seekable:false ~close_unix:true a |> flow in
let b = FD.of_unix ~sw ~seekable:false ~close_unix:true b |> flow in
continue k ((a :> Eio_unix.socket), (b :> Eio_unix.socket))
)
| Low_level.Alloc -> Some (fun k ->
match st.mem with

View File

@ -447,6 +447,8 @@ let socket sock = object
| Eio_unix.Private.Unix_file_descr op -> Stream.to_unix_opt op sock
| x -> super#probe x
method unix_fd op = Stream.to_unix_opt op sock |> Option.get
method read_into buf =
let buf = Cstruct.to_bigarray buf in
Stream.read_into sock buf
@ -618,11 +620,11 @@ let net = object
let sock = Luv.TCP.init ~loop:(get_loop ()) () |> or_raise |> Handle.of_luv ~sw in
let addr = luv_addr_of_eio host port in
await_exn (fun _loop _fiber -> Luv.TCP.connect (Handle.get "connect" sock) addr);
socket sock
(socket sock :> < Eio.Flow.two_way; Eio.Flow.close> )
| `Unix path ->
let sock = Luv.Pipe.init ~loop:(get_loop ()) () |> or_raise |> Handle.of_luv ~sw in
await_exn (fun _loop _fiber -> Luv.Pipe.connect (Handle.get "connect" sock) path);
socket sock
(socket sock :> < Eio.Flow.two_way; Eio.Flow.close> )
method datagram_socket ~sw = function
| `Udp (host, port) ->
@ -874,7 +876,28 @@ let rec run main =
let sock = Luv.TCP.init ~loop () |> or_raise in
let handle = Handle.of_luv ~sw ~close_unix sock in
Luv.TCP.open_ sock fd |> or_raise;
continue k (socket handle :> < Eio.Flow.two_way; Eio.Flow.close >)
continue k (socket handle :> Eio_unix.socket)
with Luv_error _ as ex ->
discontinue k ex
)
| Eio_unix.Private.Socketpair (sw, domain, ty, protocol) -> Some (fun k ->
try
if domain <> Unix.PF_UNIX then failwith "Only PF_UNIX sockets are supported by libuv";
let ty =
match ty with
| Unix.SOCK_DGRAM -> `DGRAM
| Unix.SOCK_STREAM -> `STREAM
| Unix.SOCK_RAW -> `RAW
| Unix.SOCK_SEQPACKET -> failwith "Type SEQPACKET not support by libuv"
in
let a, b = Luv.TCP.socketpair ty protocol |> or_raise in
let wrap x =
let sock = Luv.TCP.init ~loop () |> or_raise in
Luv.TCP.open_ sock x |> or_raise;
let h = Handle.of_luv ~sw ~close_unix:true sock in
(socket h :> Eio_unix.socket)
in
continue k (wrap a, wrap b)
with Luv_error _ as ex ->
discontinue k ex
)

View File

@ -1,4 +1,4 @@
# Setting up the environment
## Setting up the environment
```ocaml
# #require "eio_main";;
@ -31,7 +31,7 @@ let mock_source items =
end
```
# read_exact
## read_exact
```ocaml
# run @@ fun () ->
@ -53,7 +53,7 @@ let mock_source items =
Exception: End_of_file.
```
# copy
## copy
```ocaml
# run @@ fun () ->

View File

@ -1,4 +1,4 @@
# Setting up the environment
## Setting up the environment
```ocaml
# #require "eio_main";;
@ -23,7 +23,7 @@ let read_all flow =
exception Graceful_shutdown
```
# Test cases
## Test cases
A simple client:
@ -175,23 +175,23 @@ Working with UDP and endpoints:
- : unit = ()
```
# Unix interop
## Unix interop
Extracting file descriptors from Eio objects:
```ocaml
# run @@ fun ~net sw ->
let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
traceln "Listening socket has Unix FD: %b" (Eio_unix.FD.peek server <> None);
traceln "Listening socket has Unix FD: %b" (Eio_unix.FD.peek_opt server <> None);
let have_client, have_server =
Fiber.pair
(fun () ->
let flow = Eio.Net.connect ~sw net addr in
(Eio_unix.FD.peek flow <> None)
(Eio_unix.FD.peek_opt flow <> None)
)
(fun () ->
let flow, _addr = Eio.Net.accept ~sw server in
(Eio_unix.FD.peek flow <> None)
(Eio_unix.FD.peek_opt flow <> None)
)
in
traceln "Client-side socket has Unix FD: %b" have_client;
@ -274,7 +274,7 @@ Wrapping a Unix FD as an Eio socket:
- : unit = ()
```
# Accept_fork error handling
## Accept_fork error handling
On success, we close the connection immediately:
@ -329,7 +329,7 @@ If the fork itself fails, we still close the connection:
Exception: Failure "Simulated error".
```
# Cancelling multiple jobs
## Cancelling multiple jobs
We start two jobs and cancel both. Cancellation happens in series. By the time the second job's cancel function is called, it has already finished.
@ -392,3 +392,51 @@ let mock_cancellable ~sw ~server ~set_client_ready =
+Client connected
Exception: Failure "Simulated error".
```
## Socketpair
```ocaml
# Eio_main.run @@ fun _ ->
Switch.run @@ fun sw ->
let a, b = Eio_unix.socketpair ~sw () in
ignore (Eio_unix.FD.peek a : Unix.file_descr);
ignore (Eio_unix.FD.peek b : Unix.file_descr);
Eio.Flow.copy_string "foo" a;
Eio.Flow.close a;
let msg = Eio.Buf_read.of_flow b ~max_size:10 |> Eio.Buf_read.take_all in
traceln "Got: %S" msg;;
+Got: "foo"
- : unit = ()
```
## Errors
ECONNRESET:
```ocaml
# Eio_main.run @@ fun _ ->
Switch.run @@ fun sw ->
let a, b = Eio_unix.socketpair ~sw () in
Eio.Flow.copy_string "foo" a;
Eio.Flow.close b; (* Close without reading *)
try
ignore (Eio.Flow.read a (Cstruct.create 1) : int);
assert false
with Eio.Net.Connection_reset _ -> traceln "Connection failed (good)";;
+Connection failed (good)
- : unit = ()
```
EPIPE:
```ocaml
# Eio_main.run @@ fun _ ->
Switch.run @@ fun sw ->
let a, b = Eio_unix.socketpair ~sw () in
Eio.Flow.close b;
try
Eio.Flow.copy_string "foo" a;
assert false
with Eio.Net.Connection_reset _ -> traceln "Connection failed (good)";;
+Connection failed (good)
- : unit = ()
```

View File

@ -286,7 +286,7 @@ We can get the Unix FD from the flow and use it directly:
# run @@ fun env ->
let fs = Eio.Stdenv.fs env in
Eio.Dir.with_open_in fs Filename.null (fun flow ->
match Eio_unix.FD.peek flow with
match Eio_unix.FD.peek_opt flow with
| None -> failwith "No Unix file descriptor!"
| Some fd ->
let got = Unix.read fd (Bytes.create 10) 0 10 in
@ -302,7 +302,7 @@ In that case, `with_open_in` will no longer close it on exit:
```ocaml
# run @@ fun env ->
let fs = Eio.Stdenv.fs env in
let fd = Eio.Dir.with_open_in fs Filename.null (fun flow -> Option.get (Eio_unix.FD.take flow)) in
let fd = Eio.Dir.with_open_in fs Filename.null (fun flow -> Option.get (Eio_unix.FD.take_opt flow)) in
let got = Unix.read fd (Bytes.create 10) 0 10 in
traceln "Read %d bytes from null device" got;
Unix.close fd;;