Add network API to Eio

This commit is contained in:
Thomas Leonard 2021-05-25 13:52:02 +01:00
parent e321b9889d
commit 97606e8132
8 changed files with 166 additions and 13 deletions

View File

@ -18,6 +18,7 @@ unreleased repository.
* [Tracing](#tracing)
* [Switches, errors and cancellation](#switches-errors-and-cancellation)
* [Performance](#performance)
* [Networking](#networking)
* [Further reading](#further-reading)
<!-- vim-markdown-toc -->
@ -139,8 +140,8 @@ so let's make a little wrapper to simplify future examples:
```ocaml
let run fn =
Eio_main.run @@ fun _ ->
try fn ()
Eio_main.run @@ fun env ->
try fn env
with Failure msg -> traceln "Error: %s" msg
```
@ -149,7 +150,7 @@ let run fn =
Here's an example running two threads of execution (fibres) concurrently:
```ocaml
let main () =
let main _env =
Switch.top @@ fun sw ->
Fibre.both ~sw
(fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done)
@ -213,7 +214,7 @@ This is a form of [structured concurrency][].
Here's what happens if one of the two threads above fails:
```ocaml
# run @@ fun () ->
# run @@ fun _env ->
Switch.top @@ fun sw ->
Fibre.both ~sw
(fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done)
@ -239,7 +240,7 @@ Any operation that can be cancelled should take a `~sw` argument.
Switches can also be used to wait for threads even when there isn't an error. e.g.
```ocaml
# run @@ fun () ->
# run @@ fun _env ->
Switch.top (fun sw ->
Fibre.fork_ignore ~sw (fun () -> for i = 1 to 3 do traceln "i = %d" i; Fibre.yield ~sw () done);
traceln "First thread forked";
@ -317,6 +318,65 @@ On my machine, this code path uses the Linux-specific `splice` system call for m
Note that not all cases are well optimised yet, but the idea is for each backend to choose the most efficient way to implement the operation.
## Networking
Eio provides a simple high-level API for networking.
Here is a client that connects to address `addr` using `network` and sends a message:
```ocaml
let run_client ~network ~addr =
traceln "Connecting to server...";
let flow = Eio.Network.connect network addr in
Eio.Flow.write_string flow "Hello from client";
Eio.Flow.close flow
```
Here is a server that listens on `socket` and handles a single connection by reading a message:
```ocaml
let run_server ~sw socket =
Eio.Network.Listening_socket.accept_sub socket ~sw (fun ~sw flow _addr ->
traceln "Server accepted connection from client";
let b = Buffer.create 100 in
let buf = Eio.Flow.buffer_sink b in
Eio.Flow.write buf ~src:flow;
traceln "Server received: %S" (Buffer.contents b);
Eio.Flow.close flow
) ~on_error:(fun ex -> traceln "Error handling connection: %s" (Printexc.to_string ex));
traceln "(normally we'd loop and accept more connections here)"
```
Notes:
- `accept_sub` handles the connection in a new fibre, with its own sub-switch.
- Normally, a server would call `accept_sub` in a loop to handle multiple connections.
We can test them in a single process using `Fibre.both`:
```ocaml
let main ~network ~addr =
Switch.top @@ fun sw ->
let server = Eio.Network.bind network ~reuse_addr:true addr in
Eio.Network.Listening_socket.listen server 5;
traceln "Server ready...";
Fibre.both ~sw
(fun () -> run_server ~sw server)
(fun () -> run_client ~network ~addr)
```
```ocaml
# run @@ fun env ->
main
~network:(Eio.Stdenv.network env)
~addr:Unix.(ADDR_INET (inet_addr_loopback, 8080))
Server ready...
Connecting to server...
Server accepted connection from client
(normally we'd loop and accept more connections here)
Server received: "Hello from client"
- : unit = ()
```
## Further reading
Some background about the effects system can be found in:

View File

@ -12,6 +12,7 @@
"An effect-based IO API for multicore OCaml with fibres.")
(depends
(ctf (= :version))
(fibreslib (= :version))
(alcotest (and (>= 1.4.0) :with-test))))
(package
(name eunix)

View File

@ -10,6 +10,7 @@ bug-reports: "https://github.com/ocaml-multicore/eio/issues"
depends: [
"dune" {>= "2.8"}
"ctf" {= version}
"fibreslib" {= version}
"alcotest" {>= "1.4.0" & with-test}
"odoc" {with-doc}
]

View File

@ -1,4 +1,4 @@
(library
(name eio)
(public_name eio)
(libraries cstruct))
(libraries cstruct fibreslib))

View File

@ -1,3 +1,5 @@
open Fibreslib
(** A base class for objects that can be queried at runtime for extra features. *)
module Generic = struct
type 'a ty = ..
@ -61,6 +63,8 @@ module Flow = struct
(** [write src] writes data from [src] until end-of-file. *)
let write (t : #write) ~src = t#write src
let write_string t s = write t ~src:(string_source s)
(** Consumer base class. *)
class virtual sink = object (_ : #Generic.t)
method probe _ = None
@ -89,15 +93,48 @@ module Flow = struct
end
end
module Network = struct
module Listening_socket = struct
class virtual t = object
method virtual listen : int -> unit
method virtual accept_sub :
sw:Switch.t ->
on_error:(exn -> unit) ->
(sw:Switch.t -> <Flow.two_way; Flow.close> -> Unix.sockaddr -> unit) ->
unit
end
let listen (t : #t) = t#listen
(** [accept t fn] waits for a new connection to [t] and then runs [fn ~sw flow client_addr] in a new fibre,
created with [Fibre.fork_sub_ignore]. *)
let accept_sub (t : #t) = t#accept_sub
end
class virtual t = object
method virtual bind : reuse_addr:bool -> Unix.sockaddr -> Listening_socket.t
method virtual connect : Unix.sockaddr -> <Flow.two_way; Flow.close>
end
(** [bind ~sw t addr] is a new listening socket bound to local address [addr]. *)
let bind ?(reuse_addr=false) (t:#t) = t#bind ~reuse_addr
(** [connect t addr] is a new socket connected to remote address [addr]. *)
let connect (t:#t) = t#connect
end
(** The standard environment of a process. *)
module Stdenv = struct
type t = <
stdin : Flow.source;
stdout : Flow.sink;
stderr : Flow.sink;
network : Network.t;
>
let stdin (t : <stdin : #Flow.source; ..>) = t#stdin
let stdout (t : <stdout : #Flow.sink; ..>) = t#stdout
let stderr (t : <stderr : #Flow.sink; ..>) = t#stderr
let network (t : <network : #Network.t; ..>) = t#network
end

View File

@ -80,10 +80,10 @@ type rw_req = {
(* Type of user-data attached to jobs. *)
type io_job =
| Noop
| Read : rw_req -> io_job
| Poll_add : int Suspended.t -> io_job
| Splice : int Suspended.t -> io_job
| Connect : int Suspended.t -> io_job
| Close : int Suspended.t -> io_job
| Write : rw_req -> io_job
@ -164,6 +164,13 @@ let rec enqueue_splice st action ~src ~dst ~len =
if not subm then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_splice st action ~src ~dst ~len) st.io_q
let rec enqueue_connect st action fd addr =
Log.debug (fun l -> l "connect: submitting call");
Ctf.label "connect";
let subm = Uring.connect st.uring (FD.get "connect" fd) addr (Connect action) in
if not subm then (* wait until an sqe is available *)
Queue.push (fun st -> enqueue_connect st action fd addr) st.io_q
let submit_pending_io st =
match Queue.take_opt st.io_q with
| None -> ()
@ -220,10 +227,12 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
| Splice k ->
Log.debug (fun l -> l "splice returned");
Suspended.continue k result
| Connect k ->
Log.debug (fun l -> l "connect returned");
Suspended.continue k result
| Close k ->
Log.debug (fun l -> l "close returned");
Suspended.continue k result
| Noop -> assert false
end
)
and complete_rw_req st ({len; cur_off; action; _} as req) res =
@ -317,6 +326,11 @@ let splice src ~dst ~len =
else if res = 0 then raise End_of_file
else raise (Unix.Unix_error (Uring.error_of_errno res, "splice", ""))
effect Connect : FD.t * Unix.sockaddr -> int
let connect fd addr =
let res = perform (Connect (fd, addr)) in
if res < 0 then raise (Unix.Unix_error (Uring.error_of_errno res, "connect", ""))
let with_chunk fn =
let chunk = alloc () in
Fun.protect ~finally:(fun () -> free chunk) @@ fun () ->
@ -335,7 +349,7 @@ let accept socket =
await_readable socket;
Ctf.label "accept";
let conn, addr = Unix.accept ~cloexec:true (FD.get "accept" socket) in
FD.of_unix conn, addr
FD.of_unix ~seekable:false conn, addr
module Objects = struct
type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty
@ -401,10 +415,41 @@ module Objects = struct
let source fd = (flow fd :> source)
let sink fd = (flow fd :> sink)
let listening_socket fd = object
inherit Eio.Network.Listening_socket.t
method listen n = Unix.listen (FD.get "listen" fd) n
method accept_sub ~sw ~on_error fn =
let client, client_addr = accept fd in
Fibre.fork_sub_ignore ~sw ~on_error (fun sw ->
fn ~sw (flow client :> <Eio.Flow.two_way; Eio.Flow.close>) client_addr
)
end
let network = object
inherit Eio.Network.t
method bind ~reuse_addr addr =
let sock_unix = Unix.(socket PF_INET SOCK_STREAM 0) in
if reuse_addr then
Unix.setsockopt sock_unix Unix.SO_REUSEADDR true;
let sock = FD.of_unix ~seekable:false sock_unix in
Unix.bind sock_unix addr;
listening_socket sock
method connect addr =
let sock_unix = Unix.(socket PF_INET SOCK_STREAM 0) in
let sock = FD.of_unix ~seekable:false sock_unix in
connect sock addr;
(flow sock :> <Eio.Flow.two_way; Eio.Flow.close>)
end
type stdenv = <
stdin : source;
stdout : sink;
stderr : sink;
network : Eio.Network.t;
>
let stdenv () =
@ -415,13 +460,14 @@ module Objects = struct
method stdin = Lazy.force stdin
method stdout = Lazy.force stdout
method stderr = Lazy.force stderr
method network = network
end
end
let pipe () =
let r, w = Unix.pipe () in
let r = Objects.source (FD.of_unix r) in
let w = Objects.sink (FD.of_unix w) in
let r = Objects.source (FD.of_unix ~seekable:false r) in
let w = Objects.sink (FD.of_unix ~seekable:false w) in
r, w
let run ?(queue_depth=64) ?(block_size=4096) main =
@ -429,7 +475,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
let stdenv = Objects.stdenv () in
(* TODO unify this allocation API around baregion/uring *)
let fixed_buf_len = block_size * queue_depth in
let uring = Uring.create ~fixed_buf_len ~queue_depth ~default:Noop () in
let uring = Uring.create ~fixed_buf_len ~queue_depth () in
let buf = Uring.buf uring in
let mem = Uring.Region.init ~block_size buf queue_depth in
let run_q = Queue.create () in
@ -462,6 +508,10 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
let k = { Suspended.k; tid } in
enqueue_splice st k ~src ~dst ~len;
schedule st
| effect (Connect (fd, addr)) k ->
let k = { Suspended.k; tid } in
enqueue_connect st k fd addr;
schedule st
| effect Fibre_impl.Effects.Yield k ->
let k = { Suspended.k; tid } in
enqueue_thread st k ();

View File

@ -82,6 +82,9 @@ val splice : FD.t -> dst:FD.t -> len:int -> int
@raise End_of_file [src] is at the end of the file.
@raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *)
val connect : FD.t -> Unix.sockaddr -> unit
(** [connect fd addr] attempts to connect socket [fd] to [addr]. *)
val await_readable : FD.t -> unit
(** [await_readable fd] blocks until [fd] is readable (or has an error). *)
@ -111,6 +114,7 @@ module Objects : sig
stdin : source;
stdout : sink;
stderr : sink;
network : Eio.Network.t;
>
end

@ -1 +1 @@
Subproject commit 8d5676ac15f36222220426860afe402b5e1d0add
Subproject commit 462a6e3a06e430f5101f9c6877b68c3aebfef213