mirror of
https://github.com/ocaml-multicore/eio.git
synced 2025-12-15 00:03:23 -05:00
Add Eio_linux.{readv,writev}
This commit is contained in:
parent
23fadfe9c1
commit
15556f67fd
@ -29,6 +29,19 @@ type amount = Exactly of int | Upto of int
|
||||
|
||||
let system_thread = Ctf.mint_id ()
|
||||
|
||||
let rec skip_empty = function
|
||||
| c :: cs when Cstruct.length c = 0 -> skip_empty cs
|
||||
| x -> x
|
||||
|
||||
(* todo: use Cstruct.shiftv when that's released *)
|
||||
let rec shiftv cs = function
|
||||
| 0 -> skip_empty cs
|
||||
| n ->
|
||||
match cs with
|
||||
| [] -> failwith "Can't advance past end of vector!"
|
||||
| c :: cs when n >= Cstruct.length c -> shiftv cs (n - Cstruct.length c)
|
||||
| c :: cs -> Cstruct.shift c n :: cs
|
||||
|
||||
effect Close : Unix.file_descr -> int
|
||||
|
||||
module FD = struct
|
||||
@ -94,6 +107,8 @@ type cancel_hook = Switch.hook ref
|
||||
(* Type of user-data attached to jobs. *)
|
||||
type io_job =
|
||||
| Read : rw_req * cancel_hook -> io_job
|
||||
| Readv : int Suspended.t * cancel_hook -> io_job
|
||||
| Writev : int Suspended.t * cancel_hook -> io_job
|
||||
| Poll_add : int Suspended.t * cancel_hook -> io_job
|
||||
| Splice : int Suspended.t * cancel_hook -> io_job
|
||||
| Openat2 : int Suspended.t * cancel_hook -> io_job
|
||||
@ -207,6 +222,36 @@ let enqueue_read st action (sw,file_offset,fd,buf,len) =
|
||||
Ctf.label "read";
|
||||
submit_rw_req st req
|
||||
|
||||
let rec enqueue_readv st action args =
|
||||
let (sw,file_offset,fd,bufs) = args in
|
||||
let file_offset =
|
||||
match file_offset with
|
||||
| Some x -> x
|
||||
| None -> FD.uring_file_offset fd
|
||||
in
|
||||
Ctf.label "readv";
|
||||
let retry = with_cancel_hook ?sw ~action st (fun cancel ->
|
||||
Uring.readv st.uring ~file_offset (FD.get "readv" fd) bufs (Readv (action, cancel))
|
||||
)
|
||||
in
|
||||
if retry then (* wait until an sqe is available *)
|
||||
Queue.push (fun st -> enqueue_readv st action args) st.io_q
|
||||
|
||||
let rec enqueue_writev st action args =
|
||||
let (sw,file_offset,fd,bufs) = args in
|
||||
let file_offset =
|
||||
match file_offset with
|
||||
| Some x -> x
|
||||
| None -> FD.uring_file_offset fd
|
||||
in
|
||||
Ctf.label "writev";
|
||||
let retry = with_cancel_hook ?sw ~action st (fun cancel ->
|
||||
Uring.writev st.uring ~file_offset (FD.get "writev" fd) bufs (Writev (action, cancel))
|
||||
)
|
||||
in
|
||||
if retry then (* wait until an sqe is available *)
|
||||
Queue.push (fun st -> enqueue_writev st action args) st.io_q
|
||||
|
||||
let rec enqueue_poll_add ?sw st action fd poll_mask =
|
||||
Log.debug (fun l -> l "poll_add: submitting call");
|
||||
Ctf.label "poll_add";
|
||||
@ -344,10 +389,18 @@ and handle_complete st ~runnable result =
|
||||
Log.debug (fun l -> l "read returned");
|
||||
Switch.remove_hook !cancel;
|
||||
complete_rw_req st req result
|
||||
| Readv (k, cancel) ->
|
||||
Log.debug (fun l -> l "readv returned");
|
||||
Switch.remove_hook !cancel;
|
||||
Suspended.continue k result
|
||||
| Write (req, cancel) ->
|
||||
Log.debug (fun l -> l "write returned");
|
||||
Switch.remove_hook !cancel;
|
||||
complete_rw_req st req result
|
||||
| Writev (k, cancel) ->
|
||||
Log.debug (fun l -> l "writev returned");
|
||||
Switch.remove_hook !cancel;
|
||||
Suspended.continue k result
|
||||
| Poll_add (k, cancel) ->
|
||||
Log.debug (fun l -> l "poll_add returned");
|
||||
Switch.remove_hook !cancel;
|
||||
@ -428,6 +481,42 @@ let read_upto ?sw ?file_offset fd buf len =
|
||||
res
|
||||
)
|
||||
|
||||
effect EReadv : (Switch.t option * Optint.Int63.t option * FD.t * Cstruct.t list) -> int
|
||||
|
||||
let readv ?sw ?file_offset fd bufs =
|
||||
let res = perform (EReadv (sw, file_offset, fd, bufs)) in
|
||||
Log.debug (fun l -> l "readv: woken up after read");
|
||||
if res < 0 then (
|
||||
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
|
||||
raise (Unix.Unix_error (Uring.error_of_errno res, "readv", ""))
|
||||
) else if res = 0 then (
|
||||
raise End_of_file
|
||||
) else (
|
||||
res
|
||||
)
|
||||
|
||||
effect EWritev : (Switch.t option * Optint.Int63.t option * FD.t * Cstruct.t list) -> int
|
||||
|
||||
let rec writev ?sw ?file_offset fd bufs =
|
||||
let res = perform (EWritev (sw, file_offset, fd, bufs)) in
|
||||
Log.debug (fun l -> l "writev: woken up after read");
|
||||
if res < 0 then (
|
||||
Option.iter Switch.check sw; (* If cancelled, report that instead. *)
|
||||
raise (Unix.Unix_error (Uring.error_of_errno res, "writev", ""))
|
||||
) else (
|
||||
match shiftv bufs res with
|
||||
| [] -> ()
|
||||
| bufs ->
|
||||
let file_offset =
|
||||
let module I63 = Optint.Int63 in
|
||||
match file_offset with
|
||||
| None -> None
|
||||
| Some ofs when ofs = I63.minus_one -> Some I63.minus_one
|
||||
| Some ofs -> Some (I63.add ofs (I63.of_int res))
|
||||
in
|
||||
writev ?sw ?file_offset fd bufs
|
||||
)
|
||||
|
||||
effect EPoll_add : Switch.t option * FD.t * Uring.Poll_mask.t -> int
|
||||
|
||||
let await_readable ?sw fd =
|
||||
@ -829,6 +918,14 @@ let run ?(queue_depth=64) ?(block_size=4096) main =
|
||||
let k = { Suspended.k; tid } in
|
||||
enqueue_read st k args;
|
||||
schedule st
|
||||
| effect (EReadv args) k ->
|
||||
let k = { Suspended.k; tid } in
|
||||
enqueue_readv st k args;
|
||||
schedule st
|
||||
| effect (EWritev args) k ->
|
||||
let k = { Suspended.k; tid } in
|
||||
enqueue_writev st k args;
|
||||
schedule st
|
||||
| effect (EPoll_add (sw, fd, poll_mask)) k ->
|
||||
let k = { Suspended.k; tid } in
|
||||
enqueue_poll_add ?sw st k fd poll_mask;
|
||||
|
||||
@ -86,11 +86,22 @@ val read_exactly : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.
|
||||
@param file_offset Read from the given position in [fd] (default: 0).
|
||||
@raise End_of_file Raised if the stream ends before [len] bytes have been read. *)
|
||||
|
||||
val readv : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> int
|
||||
(** [readv] is like {!read_upto} but can read into any cstruct(s),
|
||||
not just chunks of the pre-shared buffer.
|
||||
If multiple buffers are given, they are filled in order. *)
|
||||
|
||||
val write : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Uring.Region.chunk -> int -> unit
|
||||
(** [write fd buf len] writes exactly [len] bytes from [buf] to [fd].
|
||||
It blocks until the OS confirms the write is done,
|
||||
and resubmits automatically if the OS doesn't write all of it at once. *)
|
||||
|
||||
val writev : ?sw:Switch.t -> ?file_offset:Optint.Int63.t -> FD.t -> Cstruct.t list -> unit
|
||||
(** [writev] is like {!write} but can write from any cstruct(s),
|
||||
not just chunks of the pre-shared buffer.
|
||||
If multiple buffers are given, they are sent in order.
|
||||
It will make multiple OS calls if the OS doesn't write all of it at once. *)
|
||||
|
||||
val splice : ?sw:Switch.t -> FD.t -> dst:FD.t -> len:int -> int
|
||||
(** [splice src ~dst ~len] attempts to copy up to [len] bytes of data from [src] to [dst].
|
||||
@return The number of bytes copied.
|
||||
|
||||
@ -79,6 +79,42 @@ let test_direct_copy () =
|
||||
Eio.Flow.close from_pipe1;
|
||||
Eio.Flow.close from_pipe2
|
||||
|
||||
let rec skip_empty = function
|
||||
| c :: cs when Cstruct.length c = 0 -> skip_empty cs
|
||||
| x -> x
|
||||
|
||||
(* todo: use Cstruct.shiftv when that's released *)
|
||||
let rec shiftv cs = function
|
||||
| 0 -> skip_empty cs
|
||||
| n ->
|
||||
match cs with
|
||||
| [] -> failwith "Can't advance past end of vector!"
|
||||
| c :: cs when n >= Cstruct.length c -> shiftv cs (n - Cstruct.length c)
|
||||
| c :: cs -> Cstruct.shift c n :: cs
|
||||
|
||||
(* Read and write using IO vectors rather than the fixed buffers. *)
|
||||
let test_iovec () =
|
||||
Eio_linux.run ~queue_depth:4 @@ fun _stdenv ->
|
||||
Switch.top @@ fun sw ->
|
||||
let from_pipe, to_pipe = Eio_linux.pipe sw in
|
||||
let from_pipe = Eio_linux.Objects.get_fd from_pipe in
|
||||
let to_pipe = Eio_linux.Objects.get_fd to_pipe in
|
||||
let message = Cstruct.of_string "Got [ ] and [ ]" in
|
||||
let rec recv = function
|
||||
| [] -> ()
|
||||
| cs ->
|
||||
let got = Eio_linux.readv ~sw from_pipe cs in
|
||||
recv (shiftv cs got)
|
||||
in
|
||||
Fibre.both ~sw
|
||||
(fun () -> recv [Cstruct.sub message 5 3; Cstruct.sub message 15 3])
|
||||
(fun () ->
|
||||
let b = Cstruct.of_string "barfoo" in
|
||||
Eio_linux.writev ~sw to_pipe [Cstruct.sub b 3 3; Cstruct.sub b 0 3];
|
||||
Eio_linux.FD.close to_pipe
|
||||
);
|
||||
Alcotest.(check string) "Transfer correct" "Got [foo] and [bar]" (Cstruct.to_string message)
|
||||
|
||||
let () =
|
||||
let open Alcotest in
|
||||
run "eioio" [
|
||||
@ -87,5 +123,6 @@ let () =
|
||||
test_case "direct_copy" `Quick test_direct_copy;
|
||||
test_case "poll_add" `Quick test_poll_add;
|
||||
test_case "poll_add_busy" `Quick test_poll_add_busy;
|
||||
test_case "iovec" `Quick test_iovec;
|
||||
];
|
||||
]
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user