From 97606e81321394b4b72fd538ed2a37fb855709a9 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Tue, 25 May 2021 13:52:02 +0100 Subject: [PATCH] Add network API to Eio --- README.md | 70 +++++++++++++++++++++++++++++++++++++++++---- dune-project | 1 + eio.opam | 1 + lib_eio/dune | 2 +- lib_eio/eio.ml | 37 ++++++++++++++++++++++++ lib_eunix/eunix.ml | 62 +++++++++++++++++++++++++++++++++++---- lib_eunix/eunix.mli | 4 +++ ocaml-uring | 2 +- 8 files changed, 166 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 048ab9b..f7ba760 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ unreleased repository. * [Tracing](#tracing) * [Switches, errors and cancellation](#switches-errors-and-cancellation) * [Performance](#performance) +* [Networking](#networking) * [Further reading](#further-reading) @@ -139,8 +140,8 @@ so let's make a little wrapper to simplify future examples: ```ocaml let run fn = - Eio_main.run @@ fun _ -> - try fn () + Eio_main.run @@ fun env -> + try fn env with Failure msg -> traceln "Error: %s" msg ``` @@ -149,7 +150,7 @@ let run fn = Here's an example running two threads of execution (fibres) concurrently: ```ocaml -let main () = +let main _env = Switch.top @@ fun sw -> Fibre.both ~sw (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done) @@ -213,7 +214,7 @@ This is a form of [structured concurrency][]. Here's what happens if one of the two threads above fails: ```ocaml -# run @@ fun () -> +# run @@ fun _env -> Switch.top @@ fun sw -> Fibre.both ~sw (fun () -> for x = 1 to 3 do traceln "x = %d" x; Fibre.yield ~sw () done) @@ -239,7 +240,7 @@ Any operation that can be cancelled should take a `~sw` argument. Switches can also be used to wait for threads even when there isn't an error. e.g. ```ocaml -# run @@ fun () -> +# run @@ fun _env -> Switch.top (fun sw -> Fibre.fork_ignore ~sw (fun () -> for i = 1 to 3 do traceln "i = %d" i; Fibre.yield ~sw () done); traceln "First thread forked"; @@ -317,6 +318,65 @@ On my machine, this code path uses the Linux-specific `splice` system call for m Note that not all cases are well optimised yet, but the idea is for each backend to choose the most efficient way to implement the operation. +## Networking + +Eio provides a simple high-level API for networking. +Here is a client that connects to address `addr` using `network` and sends a message: + +```ocaml +let run_client ~network ~addr = + traceln "Connecting to server..."; + let flow = Eio.Network.connect network addr in + Eio.Flow.write_string flow "Hello from client"; + Eio.Flow.close flow +``` + +Here is a server that listens on `socket` and handles a single connection by reading a message: + +```ocaml +let run_server ~sw socket = + Eio.Network.Listening_socket.accept_sub socket ~sw (fun ~sw flow _addr -> + traceln "Server accepted connection from client"; + let b = Buffer.create 100 in + let buf = Eio.Flow.buffer_sink b in + Eio.Flow.write buf ~src:flow; + traceln "Server received: %S" (Buffer.contents b); + Eio.Flow.close flow + ) ~on_error:(fun ex -> traceln "Error handling connection: %s" (Printexc.to_string ex)); + traceln "(normally we'd loop and accept more connections here)" +``` + +Notes: + +- `accept_sub` handles the connection in a new fibre, with its own sub-switch. +- Normally, a server would call `accept_sub` in a loop to handle multiple connections. + +We can test them in a single process using `Fibre.both`: + +```ocaml +let main ~network ~addr = + Switch.top @@ fun sw -> + let server = Eio.Network.bind network ~reuse_addr:true addr in + Eio.Network.Listening_socket.listen server 5; + traceln "Server ready..."; + Fibre.both ~sw + (fun () -> run_server ~sw server) + (fun () -> run_client ~network ~addr) +``` + +```ocaml +# run @@ fun env -> + main + ~network:(Eio.Stdenv.network env) + ~addr:Unix.(ADDR_INET (inet_addr_loopback, 8080)) +Server ready... +Connecting to server... +Server accepted connection from client +(normally we'd loop and accept more connections here) +Server received: "Hello from client" +- : unit = () +``` + ## Further reading Some background about the effects system can be found in: diff --git a/dune-project b/dune-project index 2ca92a2..0c441ea 100644 --- a/dune-project +++ b/dune-project @@ -12,6 +12,7 @@ "An effect-based IO API for multicore OCaml with fibres.") (depends (ctf (= :version)) + (fibreslib (= :version)) (alcotest (and (>= 1.4.0) :with-test)))) (package (name eunix) diff --git a/eio.opam b/eio.opam index 50e98af..120ec80 100644 --- a/eio.opam +++ b/eio.opam @@ -10,6 +10,7 @@ bug-reports: "https://github.com/ocaml-multicore/eio/issues" depends: [ "dune" {>= "2.8"} "ctf" {= version} + "fibreslib" {= version} "alcotest" {>= "1.4.0" & with-test} "odoc" {with-doc} ] diff --git a/lib_eio/dune b/lib_eio/dune index 6647ce5..6a88a1e 100644 --- a/lib_eio/dune +++ b/lib_eio/dune @@ -1,4 +1,4 @@ (library (name eio) (public_name eio) - (libraries cstruct)) + (libraries cstruct fibreslib)) diff --git a/lib_eio/eio.ml b/lib_eio/eio.ml index 585b4cd..34a99c0 100644 --- a/lib_eio/eio.ml +++ b/lib_eio/eio.ml @@ -1,3 +1,5 @@ +open Fibreslib + (** A base class for objects that can be queried at runtime for extra features. *) module Generic = struct type 'a ty = .. @@ -61,6 +63,8 @@ module Flow = struct (** [write src] writes data from [src] until end-of-file. *) let write (t : #write) ~src = t#write src + let write_string t s = write t ~src:(string_source s) + (** Consumer base class. *) class virtual sink = object (_ : #Generic.t) method probe _ = None @@ -89,15 +93,48 @@ module Flow = struct end end +module Network = struct + module Listening_socket = struct + class virtual t = object + method virtual listen : int -> unit + method virtual accept_sub : + sw:Switch.t -> + on_error:(exn -> unit) -> + (sw:Switch.t -> -> Unix.sockaddr -> unit) -> + unit + end + + let listen (t : #t) = t#listen + + (** [accept t fn] waits for a new connection to [t] and then runs [fn ~sw flow client_addr] in a new fibre, + created with [Fibre.fork_sub_ignore]. *) + let accept_sub (t : #t) = t#accept_sub + end + + class virtual t = object + method virtual bind : reuse_addr:bool -> Unix.sockaddr -> Listening_socket.t + method virtual connect : Unix.sockaddr -> + end + + (** [bind ~sw t addr] is a new listening socket bound to local address [addr]. *) + let bind ?(reuse_addr=false) (t:#t) = t#bind ~reuse_addr + + (** [connect t addr] is a new socket connected to remote address [addr]. *) + let connect (t:#t) = t#connect +end + (** The standard environment of a process. *) module Stdenv = struct type t = < stdin : Flow.source; stdout : Flow.sink; stderr : Flow.sink; + network : Network.t; > let stdin (t : ) = t#stdin let stdout (t : ) = t#stdout let stderr (t : ) = t#stderr + + let network (t : ) = t#network end diff --git a/lib_eunix/eunix.ml b/lib_eunix/eunix.ml index 52202a2..a9e8a1d 100644 --- a/lib_eunix/eunix.ml +++ b/lib_eunix/eunix.ml @@ -80,10 +80,10 @@ type rw_req = { (* Type of user-data attached to jobs. *) type io_job = - | Noop | Read : rw_req -> io_job | Poll_add : int Suspended.t -> io_job | Splice : int Suspended.t -> io_job + | Connect : int Suspended.t -> io_job | Close : int Suspended.t -> io_job | Write : rw_req -> io_job @@ -164,6 +164,13 @@ let rec enqueue_splice st action ~src ~dst ~len = if not subm then (* wait until an sqe is available *) Queue.push (fun st -> enqueue_splice st action ~src ~dst ~len) st.io_q +let rec enqueue_connect st action fd addr = + Log.debug (fun l -> l "connect: submitting call"); + Ctf.label "connect"; + let subm = Uring.connect st.uring (FD.get "connect" fd) addr (Connect action) in + if not subm then (* wait until an sqe is available *) + Queue.push (fun st -> enqueue_connect st action fd addr) st.io_q + let submit_pending_io st = match Queue.take_opt st.io_q with | None -> () @@ -220,10 +227,12 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = | Splice k -> Log.debug (fun l -> l "splice returned"); Suspended.continue k result + | Connect k -> + Log.debug (fun l -> l "connect returned"); + Suspended.continue k result | Close k -> Log.debug (fun l -> l "close returned"); Suspended.continue k result - | Noop -> assert false end ) and complete_rw_req st ({len; cur_off; action; _} as req) res = @@ -317,6 +326,11 @@ let splice src ~dst ~len = else if res = 0 then raise End_of_file else raise (Unix.Unix_error (Uring.error_of_errno res, "splice", "")) +effect Connect : FD.t * Unix.sockaddr -> int +let connect fd addr = + let res = perform (Connect (fd, addr)) in + if res < 0 then raise (Unix.Unix_error (Uring.error_of_errno res, "connect", "")) + let with_chunk fn = let chunk = alloc () in Fun.protect ~finally:(fun () -> free chunk) @@ fun () -> @@ -335,7 +349,7 @@ let accept socket = await_readable socket; Ctf.label "accept"; let conn, addr = Unix.accept ~cloexec:true (FD.get "accept" socket) in - FD.of_unix conn, addr + FD.of_unix ~seekable:false conn, addr module Objects = struct type _ Eio.Generic.ty += FD : FD.t Eio.Generic.ty @@ -401,10 +415,41 @@ module Objects = struct let source fd = (flow fd :> source) let sink fd = (flow fd :> sink) + let listening_socket fd = object + inherit Eio.Network.Listening_socket.t + + method listen n = Unix.listen (FD.get "listen" fd) n + + method accept_sub ~sw ~on_error fn = + let client, client_addr = accept fd in + Fibre.fork_sub_ignore ~sw ~on_error (fun sw -> + fn ~sw (flow client :> ) client_addr + ) + end + + let network = object + inherit Eio.Network.t + + method bind ~reuse_addr addr = + let sock_unix = Unix.(socket PF_INET SOCK_STREAM 0) in + if reuse_addr then + Unix.setsockopt sock_unix Unix.SO_REUSEADDR true; + let sock = FD.of_unix ~seekable:false sock_unix in + Unix.bind sock_unix addr; + listening_socket sock + + method connect addr = + let sock_unix = Unix.(socket PF_INET SOCK_STREAM 0) in + let sock = FD.of_unix ~seekable:false sock_unix in + connect sock addr; + (flow sock :> ) + end + type stdenv = < stdin : source; stdout : sink; stderr : sink; + network : Eio.Network.t; > let stdenv () = @@ -415,13 +460,14 @@ module Objects = struct method stdin = Lazy.force stdin method stdout = Lazy.force stdout method stderr = Lazy.force stderr + method network = network end end let pipe () = let r, w = Unix.pipe () in - let r = Objects.source (FD.of_unix r) in - let w = Objects.sink (FD.of_unix w) in + let r = Objects.source (FD.of_unix ~seekable:false r) in + let w = Objects.sink (FD.of_unix ~seekable:false w) in r, w let run ?(queue_depth=64) ?(block_size=4096) main = @@ -429,7 +475,7 @@ let run ?(queue_depth=64) ?(block_size=4096) main = let stdenv = Objects.stdenv () in (* TODO unify this allocation API around baregion/uring *) let fixed_buf_len = block_size * queue_depth in - let uring = Uring.create ~fixed_buf_len ~queue_depth ~default:Noop () in + let uring = Uring.create ~fixed_buf_len ~queue_depth () in let buf = Uring.buf uring in let mem = Uring.Region.init ~block_size buf queue_depth in let run_q = Queue.create () in @@ -462,6 +508,10 @@ let run ?(queue_depth=64) ?(block_size=4096) main = let k = { Suspended.k; tid } in enqueue_splice st k ~src ~dst ~len; schedule st + | effect (Connect (fd, addr)) k -> + let k = { Suspended.k; tid } in + enqueue_connect st k fd addr; + schedule st | effect Fibre_impl.Effects.Yield k -> let k = { Suspended.k; tid } in enqueue_thread st k (); diff --git a/lib_eunix/eunix.mli b/lib_eunix/eunix.mli index 4a805e4..7fc355a 100644 --- a/lib_eunix/eunix.mli +++ b/lib_eunix/eunix.mli @@ -82,6 +82,9 @@ val splice : FD.t -> dst:FD.t -> len:int -> int @raise End_of_file [src] is at the end of the file. @raise Unix.Unix_error(EINVAL, "splice", _) if splice is not supported for these FDs. *) +val connect : FD.t -> Unix.sockaddr -> unit +(** [connect fd addr] attempts to connect socket [fd] to [addr]. *) + val await_readable : FD.t -> unit (** [await_readable fd] blocks until [fd] is readable (or has an error). *) @@ -111,6 +114,7 @@ module Objects : sig stdin : source; stdout : sink; stderr : sink; + network : Eio.Network.t; > end diff --git a/ocaml-uring b/ocaml-uring index 8d5676a..462a6e3 160000 --- a/ocaml-uring +++ b/ocaml-uring @@ -1 +1 @@ -Subproject commit 8d5676ac15f36222220426860afe402b5e1d0add +Subproject commit 462a6e3a06e430f5101f9c6877b68c3aebfef213