Merge pull request #240 from talex5/accept-and-fork

Replace accept_sub with accept_fork
This commit is contained in:
Thomas Leonard 2022-06-28 09:42:38 +01:00 committed by GitHub
commit bdfb3faa64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 112 additions and 264 deletions

View File

@ -463,7 +463,7 @@ Here is a server that listens on `socket` and handles a single connection by rea
```ocaml
let run_server socket =
Switch.run @@ fun sw ->
Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr ->
Eio.Net.accept_fork socket ~sw (fun flow _addr ->
traceln "Server accepted connection from client";
let b = Buffer.create 100 in
Eio.Flow.copy flow (Eio.Flow.buffer_sink b);
@ -474,9 +474,9 @@ let run_server socket =
Notes:
- `accept_sub` handles the connection in a new fiber, with its own subswitch.
- Normally, a server would call `accept_sub` in a loop to handle multiple connections.
- When the child switch created by `accept_sub` finishes, `flow` is closed automatically.
- `accept_fork` handles the connection in a new fiber.
- Normally, a server would call `accept_fork` in a loop to handle multiple connections.
- When the handler passed to `accept_fork` finishes, `flow` is closed automatically.
This can also be tested on its own using a mock network:

View File

@ -240,23 +240,6 @@ module Fiber : sig
If it raises in turn, the parent switch is failed.
It is not called if the parent [sw] itself is cancelled. *)
val fork_on_accept :
on_handler_error:(exn -> unit) ->
sw:Switch.t ->
(Switch.t -> 'a) ->
(Switch.t -> 'a -> unit) ->
unit
(** [fork_on_accept ~sw accept handle ~on_handler_error] creates a new sub-switch [t].
It runs [accept t] in the current fiber and, on success, runs [handle t result] in a new fiber.
It is useful for e.g. accepting network connections,
where we need to provide a switch for the new client socket before we have forked,
but then move it to a child fiber later.
If [accept] raises an exception then the effect is the same as [Switch.run accept].
If [handle] raises an exception, it is passed to [on_handler_error].
If that raises in turn, the parent switch is failed.
[on_handler_error] is not called if the parent [sw] is itself cancelled. *)
val fork_promise : sw:Switch.t -> (unit -> 'a) -> 'a Promise.or_exn
(** [fork_promise ~sw fn] schedules [fn ()] to run in a new fiber and returns a promise for its result.

View File

@ -33,60 +33,6 @@ let fork_promise ~sw f =
);
p
let fork_on_accept ~on_handler_error ~sw:adopting_sw accept handle =
(* Create a new sub-switch of [adopting_sw].
Run [accept] with the new switch as an argument,
but itself still running in the original context.
This situation is unusual because we have a switch but we're not in [Switch.run],
so we have to make sure we finish it safely in all cases. *)
Switch.check_our_domain adopting_sw;
Switch.check adopting_sw;
let cc = Cancel.create ~protected:false in
let deactivate = Cancel.activate cc ~parent:adopting_sw.cancel in
let child_switch = Switch.create cc in
(* We must prevent [adopting_sw] from finishing while it has a child switch. *)
Switch.inc_fibers adopting_sw;
let run_child fn =
match Switch.run_internal child_switch (fun (_ : Switch.t) -> fn ()) with
| () -> deactivate (); Switch.dec_fibers adopting_sw; Switch.check adopting_sw
| exception ex -> deactivate (); Switch.dec_fibers adopting_sw; raise ex
in
(* From this point we have a [child_switch] that may have fibers and other resources attached to it.
We must call [run_child] on it in all cases. *)
match accept child_switch with
| exception ex ->
(* Accept failed. Don't fork. Shut down [child_switch] in the parent context. *)
run_child (fun () -> raise ex)
| _ when not (Cancel.is_on adopting_sw.cancel) ->
(* We can't fork into [adopting_sw] if it's cancelled, so just clean up and report Cancelled.
The main error is owned by [adopting_sw]. *)
begin
try run_child ignore
with
| Cancel.Cancelled _ as ex -> raise ex
| ex -> raise (Cancel.Cancelled ex)
end;
assert false (* [run_child] must have failed if its parent is cancelled *)
| x ->
(* Accept succeeded. Fork a new fiber into [adopting_sw] and
run it with [child_switch] as its context. *)
let new_fiber = Cancel.Fiber_context.make ~cc:child_switch.cancel in
fork_raw new_fiber @@ fun () ->
match run_child (fun () -> Switch.check child_switch; handle child_switch x) with
| () ->
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:None
| exception ex ->
(* No point reporting an error if we're being cancelled. Also, nowhere to run it. *)
if Cancel.is_on adopting_sw.cancel then (
Switch.run_in adopting_sw @@ fun () ->
try on_handler_error ex
with ex2 ->
(* The [run_in] ensures [adopting_sw] isn't finished here *)
Switch.fail adopting_sw ex;
Switch.fail adopting_sw ex2
);
Ctf.note_resolved (Cancel.Fiber_context.tid new_fiber) ~ex:(Some ex)
let all xs =
Switch.run @@ fun sw ->
List.iter (fork ~sw) xs

View File

@ -95,6 +95,7 @@ module Flow : sig
on_read : string Handler.t;
on_copy_bytes : int Handler.t;
set_copy_method : copy_method -> unit;
attach_to_switch : Eio.Switch.t -> unit;
>
val make : ?pp:string Fmt.t -> string -> t
@ -124,7 +125,7 @@ module Net : sig
type listening_socket = <
Eio.Net.listening_socket;
on_accept : (Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) Handler.t;
on_accept : (Flow.t * Eio.Net.Sockaddr.stream) Handler.t;
>
val make : string -> t
@ -144,7 +145,7 @@ module Net : sig
val on_accept :
listening_socket ->
(#Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) Handler.actions ->
(Flow.t * Eio.Net.Sockaddr.stream) Handler.actions ->
unit
(** [on_accept socket actions] configures how to respond when the server calls "accept". *)
end

View File

@ -11,6 +11,7 @@ type t = <
on_read : string Handler.t;
on_copy_bytes : int Handler.t;
set_copy_method : copy_method -> unit;
attach_to_switch : Switch.t -> unit;
>
let pp_default f s =
@ -60,9 +61,11 @@ let make ?(pp=pp_default) label =
done
with End_of_file -> ()
in
object
object (self)
inherit Eio.Flow.two_way
val on_close = Queue.create ()
method on_read = on_read
method on_copy_bytes = on_copy_bytes
@ -96,10 +99,18 @@ let make ?(pp=pp_default) label =
| `Send -> "send"
| `All -> "all"
method attach_to_switch sw =
let hook = Switch.on_release_cancellable sw (fun () -> Eio.Flow.close self) in
Queue.add (fun () -> Eio.Switch.remove_hook hook) on_close
method close =
while not (Queue.is_empty on_close) do
Queue.take on_close ()
done;
traceln "%s: closed" label
end
let on_read (t:t) = Handler.seq t#on_read
let on_copy_bytes (t:t) = Handler.seq t#on_copy_bytes
let set_copy_method (t:t) = t#set_copy_method
let attach_to_switch (t:t) = t#attach_to_switch

View File

@ -51,7 +51,7 @@ let on_datagram_socket (t:t) actions =
type listening_socket = <
Eio.Net.listening_socket;
on_accept : (Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) Handler.t;
on_accept : (Flow.t * Eio.Net.Sockaddr.stream) Handler.t;
>
let listening_socket label =
@ -63,14 +63,14 @@ let listening_socket label =
method accept ~sw =
let socket, addr = Handler.run on_accept in
Switch.on_release sw (fun () -> Eio.Flow.close socket);
Flow.attach_to_switch socket sw;
traceln "%s: accepted connection from %a" label Eio.Net.Sockaddr.pp addr;
socket, addr
(socket :> Eio.Net.stream_socket), addr
method close =
traceln "%s: closed" label
end
let on_accept (l:listening_socket) actions =
let as_accept_pair x = (x :> Eio.Net.stream_socket * Eio.Net.Sockaddr.stream) in
let as_accept_pair x = (x :> Flow.t * Eio.Net.Sockaddr.stream) in
Handler.seq l#on_accept (List.map (Action.map as_accept_pair) actions)

View File

@ -148,10 +148,22 @@ end
let accept ~sw (t : #listening_socket) = t#accept ~sw
let accept_fork ~sw (t : #listening_socket) ~on_error handle =
let child_started = ref false in
let flow, addr = accept ~sw t in
Fun.protect ~finally:(fun () -> if !child_started = false then Flow.close flow)
(fun () ->
Fiber.fork ~sw (fun () ->
match child_started := true; handle flow addr with
| x -> Flow.close flow; x
| exception ex ->
Flow.close flow;
on_error ex
)
)
let accept_sub ~sw (t : #listening_socket) ~on_error handle =
let accept sw = t#accept ~sw in
let handle sw (flow, addr) = handle ~sw flow addr in
Fiber.fork_on_accept ~sw accept handle ~on_handler_error:on_error
accept_fork ~sw t ~on_error (fun flow addr -> Switch.run (fun sw -> handle ~sw flow addr))
class virtual datagram_socket = object
inherit socket

View File

@ -134,7 +134,19 @@ val accept :
(** [accept ~sw socket] waits until a new connection is ready on [socket] and returns it.
The new socket will be closed automatically when [sw] finishes, if not closed earlier.
If you want to handle multiple connections, consider using {!accept_sub} instead. *)
If you want to handle multiple connections, consider using {!accept_fork} instead. *)
val accept_fork :
sw:Switch.t ->
#listening_socket ->
on_error:(exn -> unit) ->
(stream_socket -> Sockaddr.stream -> unit) ->
unit
(** [accept_fork socket fn] accepts a connection and handles it in a new fiber.
After accepting a connection to [socket], it runs [fn flow client_addr] in a new fiber.
[flow] will be closed automatically when [fn] returns, if not already closed by then. *)
val accept_sub :
sw:Switch.t ->
@ -142,12 +154,7 @@ val accept_sub :
on_error:(exn -> unit) ->
(sw:Switch.t -> stream_socket -> Sockaddr.stream -> unit) ->
unit
(** [accept socket fn] accepts a connection and handles it in a new fiber.
After accepting a connection to [socket], it runs [fn ~sw flow client_addr] in a new fiber,
using {!Fiber.fork_on_accept}.
[flow] will be closed automatically when the sub-switch is finished, if not already closed by then. *)
[@@deprecated "Use accept_fork instead"]
(** {2 Datagram Sockets} *)

View File

@ -39,7 +39,7 @@ A simple test server:
let echo_server ~net addr =
Switch.run @@ fun sw ->
let socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr -> Eio.Flow.copy flow flow)
Eio.Net.accept_fork socket ~sw (fun flow _addr -> Eio.Flow.copy flow flow)
~on_error:(traceln "Error handling connection: %a" Fmt.exn);;
```

View File

@ -302,174 +302,6 @@ Same with `first`:
- : unit = ()
```
# fork_on_accept
We can attach resources to the switch in the accept function,
and they get released when the child fiber finishes.
```ocaml
let test_fork_on_accept ?(reraise=false) ?(cancel=false) ?(in_accept=ignore) ?(in_handler=ignore) sw =
let on_handler_error =
if reraise then raise
else (fun ex -> traceln "on_handler_error: %a" Fmt.exn ex; Fiber.check ())
in
Fiber.both (fun () ->
Fiber.fork_on_accept ~sw ~on_handler_error
(fun sw ->
traceln "Got connection";
Switch.on_release sw (fun () -> traceln "Releasing connection"; Fiber.check ());
in_accept sw;
1
)
(fun sw x ->
traceln "Run handler with %d" x;
Fiber.yield ();
in_handler sw;
traceln "Handler done"
);
)
(fun () -> if cancel then failwith "Simulated failure");
traceln "Main fiber resumes";
"Main fiber result"
```
The success case:
```ocaml
# run @@ fun () ->
Switch.run test_fork_on_accept;;
+Got connection
+Run handler with 1
+Main fiber resumes
+Handler done
+Releasing connection
+Main fiber result
- : unit = ()
```
The accept function fails:
```ocaml
# run @@ fun () ->
Switch.run @@ test_fork_on_accept ~in_accept:(fun _ -> failwith "Accept failure");;
+Got connection
+Releasing connection
Exception: Failure "Accept failure".
```
The handler function fails:
```ocaml
# run @@ fun () ->
Switch.run @@ test_fork_on_accept ~in_handler:(fun _ -> failwith "Handler fails");;
+Got connection
+Run handler with 1
+Main fiber resumes
+Releasing connection
+on_handler_error: Failure("Handler fails")
+Main fiber result
- : unit = ()
```
Turning off the child switch in the accept function. We treat this as the handler being cancelled:
```ocaml
# run @@ fun () ->
Switch.run @@ test_fork_on_accept ~in_accept:(fun sw -> Switch.fail sw (Failure "Accept turn-off"));;
+Got connection
+Releasing connection
+on_handler_error: Failure("Accept turn-off")
+Main fiber resumes
+Main fiber result
- : unit = ()
```
Propagating handling errors to the parent:
```ocaml
# run @@ fun () ->
Switch.run @@ test_fork_on_accept ~in_handler:(fun _ -> failwith "Handler fails") ~reraise:true;;
+Got connection
+Run handler with 1
+Main fiber resumes
+Releasing connection
Exception: Failure "Handler fails".
```
Cancelling while in accept:
```ocaml
# run @@ fun () ->
Switch.run @@ test_fork_on_accept ~in_accept:(fun _ -> Fiber.await_cancel ()) ~cancel:true;;
+Got connection
+Releasing connection
Exception: Failure "Simulated failure".
```
Cancelling while in handler. `on_hander_error` is not called for cancellations:
```ocaml
# run @@ fun () ->
Switch.run @@ test_fork_on_accept ~in_handler:(fun _ -> Fiber.await_cancel ()) ~cancel:true;;
+Got connection
+Run handler with 1
+Releasing connection
Exception: Failure "Simulated failure".
```
Parent switch turned off. The main error is reported by the owner of the background thread,
with `fork_on_accept` just getting a `Cancelled` exception. The background switch can't exit
until the connection is released.
```ocaml
# run @@ fun () ->
Switch.run @@ fun sw ->
let bg_switch = ref None in
Fiber.fork_sub ~sw ~on_error:(traceln "Background thread failed: %a" Fmt.exn) (fun sw ->
bg_switch := Some sw; Fiber.await_cancel ()
);
let bg_switch = Option.get !bg_switch in
test_fork_on_accept bg_switch
~in_accept:(fun _ ->
Switch.fail bg_switch (Failure "Background switch turned off");
Fiber.yield ()
);;
+Got connection
+Releasing connection
+Background thread failed: Failure("Background switch turned off")
Exception: Cancelled: Failure("Background switch turned off")
```
The child outlives the forking context. The error handler runs in `bg_switch`, so it still works:
```ocaml
# run @@ fun () ->
Switch.run @@ fun sw ->
let bg_switch = ref None in
Fiber.fork_sub ~sw ~on_error:(traceln "Background thread failed: %a" Fmt.exn) (fun sw ->
bg_switch := Some sw; Fiber.yield ()
);
let bg_switch = Option.get !bg_switch in
let x = Switch.run (fun _ ->
test_fork_on_accept bg_switch
~in_handler:(fun _ ->
Fiber.yield ();
failwith "Simulated error"
)
) in
traceln "Main switch done";
x
;;
+Got connection
+Run handler with 1
+Main fiber resumes
+Main switch done
+Releasing connection
+on_handler_error: Failure("Simulated error")
+Main fiber result
- : unit = ()
```
# Forking while cancelled
```ocaml

View File

@ -2,6 +2,7 @@
```ocaml
# #require "eio_main";;
# #require "eio.mock";;
```
```ocaml
@ -41,7 +42,7 @@ A simple server:
```ocaml
let run_server ~sw socket =
while true do
Eio.Net.accept_sub socket ~sw (fun ~sw flow _addr ->
Eio.Net.accept_fork socket ~sw (fun flow _addr ->
traceln "Server accepted connection from client";
Fun.protect (fun () ->
let msg = read_all flow in
@ -110,7 +111,7 @@ Cancelling the read:
let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
Fiber.both
(fun () ->
Eio.Net.accept_sub server ~sw (fun ~sw flow _addr ->
Eio.Net.accept_fork server ~sw (fun flow _addr ->
try
Fiber.both
(fun () -> raise (Promise.await shutdown))
@ -143,7 +144,7 @@ Calling accept when the switch is already off:
# run @@ fun ~net sw ->
let server = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:5 addr in
Switch.fail sw (Failure "Simulated error");
Eio.Net.accept_sub server ~sw (fun ~sw:_ _flow _addr -> assert false)
Eio.Net.accept_fork server ~sw (fun _flow _addr -> assert false)
~on_error:raise;;
Exception: Failure "Simulated error".
```
@ -272,3 +273,58 @@ Wrapping a Unix FD as an Eio socket:
+Got: "Hello"
- : unit = ()
```
# Accept_fork error handling
On success, we close the connection immediately:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let socket = Eio_mock.Net.listening_socket "tcp/80" in
let flow = Eio_mock.Flow.make "connection" in
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 1234) in
Eio_mock.Net.on_accept socket [`Return (flow, addr)];
Switch.run @@ fun sw ->
Eio.Net.accept_fork ~sw ~on_error:raise socket
(fun _flow _addr -> ());
traceln "Mock connection should have been closed by now";;
+tcp/80: accepted connection from tcp:127.0.0.1:1234
+connection: closed
+Mock connection should have been closed by now
- : unit = ()
```
If the forked fiber fails, we close immediately:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let socket = Eio_mock.Net.listening_socket "tcp/80" in
let flow = Eio_mock.Flow.make "connection" in
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 1234) in
Eio_mock.Net.on_accept socket [`Return (flow, addr)];
Switch.run @@ fun sw ->
Eio.Net.accept_fork ~sw ~on_error:raise socket
(fun _flow _addr -> failwith "Simulated error");
traceln "Mock connection should have been closed by now";;
+tcp/80: accepted connection from tcp:127.0.0.1:1234
+connection: closed
+Mock connection should have been closed by now
Exception: Failure "Simulated error".
```
If the fork itself fails, we still close the connection:
```ocaml
# Eio_mock.Backend.run @@ fun () ->
let socket = Eio_mock.Net.listening_socket "tcp/80" in
let flow = Eio_mock.Flow.make "connection" in
let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, 1234) in
Eio_mock.Net.on_accept socket [`Return (flow, addr)];
Switch.run @@ fun sw ->
Switch.fail sw (Failure "Simulated error");
Eio.Net.accept_fork ~sw ~on_error:raise socket
(fun _flow _addr -> assert false);
traceln "Mock connection should have been closed by now";;
+tcp/80: accepted connection from tcp:127.0.0.1:1234
+connection: closed
+Mock connection should have been closed by now
Exception: Failure "Simulated error".
```