From 36e39cc2e7d47d32bd70b36d0052286ac5162dd3 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Mon, 27 Jun 2022 16:39:48 +0100 Subject: [PATCH] Replace accept_sub with accept_fork Now that cancellation is separate to switches, there's no need for a `sw` argument here. This also removes the need for the (complicated!) `Fiber.fork_on_accept`. When accepting, we now attach the new socket to the parent switch, and just make sure to close it when the handler returns manually. --- README.md | 8 +- lib_eio/core/eio__core.mli | 17 ---- lib_eio/core/fiber.ml | 54 ------------ lib_eio/mock/eio_mock.mli | 5 +- lib_eio/mock/flow.ml | 13 ++- lib_eio/mock/net.ml | 8 +- lib_eio/net.ml | 18 +++- lib_eio/net.mli | 21 +++-- tests/mocks.md | 2 +- tests/test_fibre.md | 168 ------------------------------------- tests/test_network.md | 62 +++++++++++++- 11 files changed, 112 insertions(+), 264 deletions(-) diff --git a/README.md b/README.md index 5e11b6c..fdee5ff 100644 --- a/README.md +++ b/README.md @@ -463,7 +463,7 @@ Here is a server that listens on `socket` and handles a single connection by rea ```ocaml let run_server socket = Switch.run @@ fun sw -> - Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr -> + Eio.Net.accept_fork socket ~sw (fun flow _addr -> traceln "Server accepted connection from client"; let b = Buffer.create 100 in Eio.Flow.copy flow (Eio.Flow.buffer_sink b); @@ -474,9 +474,9 @@ let run_server socket = Notes: -- `accept_sub` handles the connection in a new fiber, with its own subswitch. -- Normally, a server would call `accept_sub` in a loop to handle multiple connections. -- When the child switch created by `accept_sub` finishes, `flow` is closed automatically. +- `accept_fork` handles the connection in a new fiber. +- Normally, a server would call `accept_fork` in a loop to handle multiple connections. +- When the handler passed to `accept_fork` finishes, `flow` is closed automatically. This can also be tested on its own using a mock network: diff --git a/lib_eio/core/eio__core.mli b/lib_eio/core/eio__core.mli index 7d57cf8..7b67753 100644 --- a/lib_eio/core/eio__core.mli +++ b/lib_eio/core/eio__core.mli @@ -240,23 +240,6 @@ module Fiber : sig If it raises in turn, the parent switch is failed. It is not called if the parent [sw] itself is cancelled. *) - val fork_on_accept : - on_handler_error:(exn -> unit) -> - sw:Switch.t -> - (Switch.t -> 'a) -> - (Switch.t -> 'a -> unit) -> - unit - (** [fork_on_accept ~sw accept handle ~on_handler_error] creates a new sub-switch [t]. - It runs [accept t] in the current fiber and, on success, runs [handle t result] in a new fiber. - It is useful for e.g. accepting network connections, - where we need to provide a switch for the new client socket before we have forked, - but then move it to a child fiber later. - - If [accept] raises an exception then the effect is the same as [Switch.run accept]. - If [handle] raises an exception, it is passed to [on_handler_error]. - If that raises in turn, the parent switch is failed. - [on_handler_error] is not called if the parent [sw] is itself cancelled. *) - val fork_promise : sw:Switch.t -> (unit -> 'a) -> 'a Promise.or_exn (** [fork_promise ~sw fn] schedules [fn ()] to run in a new fiber and returns a promise for its result. diff --git a/lib_eio/core/fiber.ml b/lib_eio/core/fiber.ml index d356940..fab9f0d 100644 --- a/lib_eio/core/fiber.ml +++ b/lib_eio/core/fiber.ml @@ -33,60 +33,6 @@ let fork_promise ~sw f = ); p -let fork_on_accept ~on_handler_error ~sw:adopting_sw accept handle = - (* Create a new sub-switch of [adopting_sw]. - Run [accept] with the new switch as an argument, - but itself still running in the original context. - This situation is unusual because we have a switch but we're not in [Switch.run], - so we have to make sure we finish it safely in all cases. *) - Switch.check_our_domain adopting_sw; - Switch.check adopting_sw; - let cc = Cancel.create ~protected:false in - let deactivate = Cancel.activate cc ~parent:adopting_sw.cancel in - let child_switch = Switch.create cc in - (* We must prevent [adopting_sw] from finishing while it has a child switch. *) - Switch.inc_fibers adopting_sw; - let run_child fn = - match Switch.run_internal child_switch (fun (_ : Switch.t) -> fn ()) with - | () -> deactivate (); Switch.dec_fibers adopting_sw; Switch.check adopting_sw - | exception ex -> deactivate (); Switch.dec_fibers adopting_sw; raise ex - in - (* From this point we have a [child_switch] that may have fibers and other resources attached to it. - We must call [run_child] on it in all cases. *) - match accept child_switch with - | exception ex -> - (* Accept failed. Don't fork. Shut down [child_switch] in the parent context. *) - run_child (fun () -> raise ex) - | _ when not (Cancel.is_on adopting_sw.cancel) -> - (* We can't fork into [adopting_sw] if it's cancelled, so just clean up and report Cancelled. - The main error is owned by [adopting_sw]. *) - begin - try run_child ignore - with - | Cancel.Cancelled _ as ex -> raise ex - | ex -> raise (Cancel.Cancelled ex) - end; - assert false (* [run_child] must have failed if its parent is cancelled *) - | x -> - (* Accept succeeded. Fork a new fiber into [adopting_sw] and - run it with [child_switch] as its context. *) - let new_fiber = Cancel.Fiber_context.make ~cc:child_switch.cancel in - fork_raw new_fiber @@ fun () -> - match run_child (fun () -> Switch.check child_switch; handle child_switch x) with - | () -> - Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None - | exception ex -> - (* No point reporting an error if we're being cancelled. Also, nowhere to run it. *) - if Cancel.is_on adopting_sw.cancel then ( - Switch.run_in adopting_sw @@ fun () -> - try on_handler_error ex - with ex2 -> - (* The [run_in] ensures [adopting_sw] isn't finished here *) - Switch.fail adopting_sw ex; - Switch.fail adopting_sw ex2 - ); - Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex) - let all xs = Switch.run @@ fun sw -> List.iter (fork ~sw) xs diff --git a/lib_eio/mock/eio_mock.mli b/lib_eio/mock/eio_mock.mli index 1d59b2d..ff3288b 100644 --- a/lib_eio/mock/eio_mock.mli +++ b/lib_eio/mock/eio_mock.mli @@ -95,6 +95,7 @@ module Flow : sig on_read : string Handler.t; on_copy_bytes : int Handler.t; set_copy_method : copy_method -> unit; + attach_to_switch : Eio.Switch.t -> unit; > val make : ?pp:string Fmt.t -> string -> t @@ -124,7 +125,7 @@ module Net : sig type listening_socket = < Eio.Net.listening_socket; - on_accept : (Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) Handler.t; + on_accept : (Flow.t * Eio.Net.Sockaddr.stream) Handler.t; > val make : string -> t @@ -144,7 +145,7 @@ module Net : sig val on_accept : listening_socket -> - (#Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) Handler.actions -> + (Flow.t * Eio.Net.Sockaddr.stream) Handler.actions -> unit (** [on_accept socket actions] configures how to respond when the server calls "accept". *) end diff --git a/lib_eio/mock/flow.ml b/lib_eio/mock/flow.ml index 71ed05c..4c187d1 100644 --- a/lib_eio/mock/flow.ml +++ b/lib_eio/mock/flow.ml @@ -11,6 +11,7 @@ type t = < on_read : string Handler.t; on_copy_bytes : int Handler.t; set_copy_method : copy_method -> unit; + attach_to_switch : Switch.t -> unit; > let pp_default f s = @@ -60,9 +61,11 @@ let make ?(pp=pp_default) label = done with End_of_file -> () in - object + object (self) inherit Eio.Flow.two_way + val on_close = Queue.create () + method on_read = on_read method on_copy_bytes = on_copy_bytes @@ -96,10 +99,18 @@ let make ?(pp=pp_default) label = | `Send -> "send" | `All -> "all" + method attach_to_switch sw = + let hook = Switch.on_release_cancellable sw (fun () -> Eio.Flow.close self) in + Queue.add (fun () -> Eio.Switch.remove_hook hook) on_close + method close = + while not (Queue.is_empty on_close) do + Queue.take on_close () + done; traceln "%s: closed" label end let on_read (t:t) = Handler.seq t#on_read let on_copy_bytes (t:t) = Handler.seq t#on_copy_bytes let set_copy_method (t:t) = t#set_copy_method +let attach_to_switch (t:t) = t#attach_to_switch diff --git a/lib_eio/mock/net.ml b/lib_eio/mock/net.ml index 01bdcd3..69f8e41 100644 --- a/lib_eio/mock/net.ml +++ b/lib_eio/mock/net.ml @@ -51,7 +51,7 @@ let on_datagram_socket (t:t) actions = type listening_socket = < Eio.Net.listening_socket; - on_accept : (Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) Handler.t; + on_accept : (Flow.t * Eio.Net.Sockaddr.stream) Handler.t; > let listening_socket label = @@ -63,14 +63,14 @@ let listening_socket label = method accept ~sw = let socket, addr = Handler.run on_accept in - Switch.on_release sw (fun () -> Eio.Flow.close socket); + Flow.attach_to_switch socket sw; traceln "%s: accepted connection from %a" label Eio.Net.Sockaddr.pp addr; - socket, addr + (socket :> Eio.Net.stream_socket), addr method close = traceln "%s: closed" label end let on_accept (l:listening_socket) actions = - let as_accept_pair x = (x :> Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) in + let as_accept_pair x = (x :> Flow.t * Eio.Net.Sockaddr.stream) in Handler.seq l#on_accept (List.map (Action.map as_accept_pair) actions) diff --git a/lib_eio/net.ml b/lib_eio/net.ml index 0156b16..d3bf9c6 100644 --- a/lib_eio/net.ml +++ b/lib_eio/net.ml @@ -148,10 +148,22 @@ end let accept ~sw (t : #listening_socket) = t#accept ~sw +let accept_fork ~sw (t : #listening_socket) ~on_error handle = + let child_started = ref false in + let flow, addr = accept ~sw t in + Fun.protect ~finally:(fun () -> if !child_started = false then Flow.close flow) + (fun () -> + Fiber.fork ~sw (fun () -> + match child_started := true; handle flow addr with + | x -> Flow.close flow; x + | exception ex -> + Flow.close flow; + on_error ex + ) + ) + let accept_sub ~sw (t : #listening_socket) ~on_error handle = - let accept sw = t#accept ~sw in - let handle sw (flow, addr) = handle ~sw flow addr in - Fiber.fork_on_accept ~sw accept handle ~on_handler_error:on_error + accept_fork ~sw t ~on_error (fun flow addr -> Switch.run (fun sw -> handle ~sw flow addr)) class virtual datagram_socket = object inherit socket diff --git a/lib_eio/net.mli b/lib_eio/net.mli index d1d8bf5..f0e9c29 100644 --- a/lib_eio/net.mli +++ b/lib_eio/net.mli @@ -134,7 +134,19 @@ val accept : (** [accept ~sw socket] waits until a new connection is ready on [socket] and returns it. The new socket will be closed automatically when [sw] finishes, if not closed earlier. - If you want to handle multiple connections, consider using {!accept_sub} instead. *) + If you want to handle multiple connections, consider using {!accept_fork} instead. *) + +val accept_fork : + sw:Switch.t -> + #listening_socket -> + on_error:(exn -> unit) -> + (stream_socket -> Sockaddr.stream -> unit) -> + unit +(** [accept_fork socket fn] accepts a connection and handles it in a new fiber. + + After accepting a connection to [socket], it runs [fn flow client_addr] in a new fiber. + + [flow] will be closed automatically when [fn] returns, if not already closed by then. *) val accept_sub : sw:Switch.t -> @@ -142,12 +154,7 @@ val accept_sub : on_error:(exn -> unit) -> (sw:Switch.t -> stream_socket -> Sockaddr.stream -> unit) -> unit -(** [accept socket fn] accepts a connection and handles it in a new fiber. - - After accepting a connection to [socket], it runs [fn ~sw flow client_addr] in a new fiber, - using {!Fiber.fork_on_accept}. - - [flow] will be closed automatically when the sub-switch is finished, if not already closed by then. *) +[@@deprecated "Use accept_fork instead"] (** {2 Datagram Sockets} *) diff --git a/tests/mocks.md b/tests/mocks.md index b0ff026..4b0a7ff 100644 --- a/tests/mocks.md +++ b/tests/mocks.md @@ -39,7 +39,7 @@ A simple test server: let echo_server ~net addr = Switch.run @@ fun sw -> let socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in - Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr -> Eio.Flow.copy flow flow) + Eio.Net.accept_fork socket ~sw (fun flow _addr -> Eio.Flow.copy flow flow) ~on_error:(traceln "Error handling connection: %a" Fmt.exn);; ``` diff --git a/tests/test_fibre.md b/tests/test_fibre.md index 526a01a..9fdcacf 100644 --- a/tests/test_fibre.md +++ b/tests/test_fibre.md @@ -302,174 +302,6 @@ Same with `first`: - : unit = () ``` -# fork_on_accept - -We can attach resources to the switch in the accept function, -and they get released when the child fiber finishes. - -```ocaml -let test_fork_on_accept ?(reraise=false) ?(cancel=false) ?(in_accept=ignore) ?(in_handler=ignore) sw = - let on_handler_error = - if reraise then raise - else (fun ex -> traceln "on_handler_error: %a" Fmt.exn ex; Fiber.check ()) - in - Fiber.both (fun () -> - Fiber.fork_on_accept ~sw ~on_handler_error - (fun sw -> - traceln "Got connection"; - Switch.on_release sw (fun () -> traceln "Releasing connection"; Fiber.check ()); - in_accept sw; - 1 - ) - (fun sw x -> - traceln "Run handler with %d" x; - Fiber.yield (); - in_handler sw; - traceln "Handler done" - ); - ) - (fun () -> if cancel then failwith "Simulated failure"); - traceln "Main fiber resumes"; - "Main fiber result" -``` - -The success case: - -```ocaml -# run @@ fun () -> - Switch.run test_fork_on_accept;; -+Got connection -+Run handler with 1 -+Main fiber resumes -+Handler done -+Releasing connection -+Main fiber result -- : unit = () -``` - -The accept function fails: - -```ocaml -# run @@ fun () -> - Switch.run @@ test_fork_on_accept ~in_accept:(fun _ -> failwith "Accept failure");; -+Got connection -+Releasing connection -Exception: Failure "Accept failure". -``` - -The handler function fails: - -```ocaml -# run @@ fun () -> - Switch.run @@ test_fork_on_accept ~in_handler:(fun _ -> failwith "Handler fails");; -+Got connection -+Run handler with 1 -+Main fiber resumes -+Releasing connection -+on_handler_error: Failure("Handler fails") -+Main fiber result -- : unit = () -``` - -Turning off the child switch in the accept function. We treat this as the handler being cancelled: - -```ocaml -# run @@ fun () -> - Switch.run @@ test_fork_on_accept ~in_accept:(fun sw -> Switch.fail sw (Failure "Accept turn-off"));; -+Got connection -+Releasing connection -+on_handler_error: Failure("Accept turn-off") -+Main fiber resumes -+Main fiber result -- : unit = () -``` - -Propagating handling errors to the parent: - -```ocaml -# run @@ fun () -> - Switch.run @@ test_fork_on_accept ~in_handler:(fun _ -> failwith "Handler fails") ~reraise:true;; -+Got connection -+Run handler with 1 -+Main fiber resumes -+Releasing connection -Exception: Failure "Handler fails". -``` - -Cancelling while in accept: - -```ocaml -# run @@ fun () -> - Switch.run @@ test_fork_on_accept ~in_accept:(fun _ -> Fiber.await_cancel ()) ~cancel:true;; -+Got connection -+Releasing connection -Exception: Failure "Simulated failure". -``` - -Cancelling while in handler. `on_hander_error` is not called for cancellations: - -```ocaml -# run @@ fun () -> - Switch.run @@ test_fork_on_accept ~in_handler:(fun _ -> Fiber.await_cancel ()) ~cancel:true;; -+Got connection -+Run handler with 1 -+Releasing connection -Exception: Failure "Simulated failure". -``` - -Parent switch turned off. The main error is reported by the owner of the background thread, -with `fork_on_accept` just getting a `Cancelled` exception. The background switch can't exit -until the connection is released. - -```ocaml -# run @@ fun () -> - Switch.run @@ fun sw -> - let bg_switch = ref None in - Fiber.fork_sub ~sw ~on_error:(traceln "Background thread failed: %a" Fmt.exn) (fun sw -> - bg_switch := Some sw; Fiber.await_cancel () - ); - let bg_switch = Option.get !bg_switch in - test_fork_on_accept bg_switch - ~in_accept:(fun _ -> - Switch.fail bg_switch (Failure "Background switch turned off"); - Fiber.yield () - );; -+Got connection -+Releasing connection -+Background thread failed: Failure("Background switch turned off") -Exception: Cancelled: Failure("Background switch turned off") -``` - -The child outlives the forking context. The error handler runs in `bg_switch`, so it still works: - -```ocaml -# run @@ fun () -> - Switch.run @@ fun sw -> - let bg_switch = ref None in - Fiber.fork_sub ~sw ~on_error:(traceln "Background thread failed: %a" Fmt.exn) (fun sw -> - bg_switch := Some sw; Fiber.yield () - ); - let bg_switch = Option.get !bg_switch in - let x = Switch.run (fun _ -> - test_fork_on_accept bg_switch - ~in_handler:(fun _ -> - Fiber.yield (); - failwith "Simulated error" - ) - ) in - traceln "Main switch done"; - x - ;; -+Got connection -+Run handler with 1 -+Main fiber resumes -+Main switch done -+Releasing connection -+on_handler_error: Failure("Simulated error") -+Main fiber result -- : unit = () -``` - # Forking while cancelled ```ocaml diff --git a/tests/test_network.md b/tests/test_network.md index 1f7e9c2..d75bd25 100644 --- a/tests/test_network.md +++ b/tests/test_network.md @@ -2,6 +2,7 @@ ```ocaml # #require "eio_main";; +# #require "eio.mock";; ``` ```ocaml @@ -41,7 +42,7 @@ A simple server: ```ocaml let run_server ~sw socket = while true do - Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr -> + Eio.Net.accept_fork socket ~sw (fun flow _addr -> traceln "Server accepted connection from client"; Fun.protect (fun () -> let msg = read_all flow in @@ -110,7 +111,7 @@ Cancelling the read: let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in Fiber.both (fun () -> - Eio.Net.accept_sub server ~sw (fun ~sw flow _addr -> + Eio.Net.accept_fork server ~sw (fun flow _addr -> try Fiber.both (fun () -> raise (Promise.await shutdown)) @@ -143,7 +144,7 @@ Calling accept when the switch is already off: # run @@ fun ~net sw -> let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in Switch.fail sw (Failure "Simulated error"); - Eio.Net.accept_sub server ~sw (fun ~sw:_ _flow _addr -> assert false) + Eio.Net.accept_fork server ~sw (fun _flow _addr -> assert false) ~on_error:raise;; Exception: Failure "Simulated error". ``` @@ -272,3 +273,58 @@ Wrapping a Unix FD as an Eio socket: +Got: "Hello" - : unit = () ``` + +# Accept_fork error handling + +On success, we close the connection immediately: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let socket = Eio_mock.Net.listening_socket "tcp/80" in + let flow = Eio_mock.Flow.make "connection" in + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 1234) in + Eio_mock.Net.on_accept socket [`Return (flow, addr)]; + Switch.run @@ fun sw -> + Eio.Net.accept_fork ~sw ~on_error:raise socket + (fun _flow _addr -> ()); + traceln "Mock connection should have been closed by now";; ++tcp/80: accepted connection from tcp:127.0.0.1:1234 ++connection: closed ++Mock connection should have been closed by now +- : unit = () +``` +If the forked fiber fails, we close immediately: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let socket = Eio_mock.Net.listening_socket "tcp/80" in + let flow = Eio_mock.Flow.make "connection" in + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 1234) in + Eio_mock.Net.on_accept socket [`Return (flow, addr)]; + Switch.run @@ fun sw -> + Eio.Net.accept_fork ~sw ~on_error:raise socket + (fun _flow _addr -> failwith "Simulated error"); + traceln "Mock connection should have been closed by now";; ++tcp/80: accepted connection from tcp:127.0.0.1:1234 ++connection: closed ++Mock connection should have been closed by now +Exception: Failure "Simulated error". +``` +If the fork itself fails, we still close the connection: + +```ocaml +# Eio_mock.Backend.run @@ fun () -> + let socket = Eio_mock.Net.listening_socket "tcp/80" in + let flow = Eio_mock.Flow.make "connection" in + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 1234) in + Eio_mock.Net.on_accept socket [`Return (flow, addr)]; + Switch.run @@ fun sw -> + Switch.fail sw (Failure "Simulated error"); + Eio.Net.accept_fork ~sw ~on_error:raise socket + (fun _flow _addr -> assert false); + traceln "Mock connection should have been closed by now";; ++tcp/80: accepted connection from tcp:127.0.0.1:1234 ++connection: closed ++Mock connection should have been closed by now +Exception: Failure "Simulated error". +```