stream_bidi: allows payload to be buffered to requests from other threads

this is achieved by inserting some synchronization primitives when buffering the content, and waking up the main select loop, via an IO pipe
This commit is contained in:
HoneyryderChuck 2025-04-16 00:38:17 +01:00
parent fe69231e6c
commit dbad275c65
3 changed files with 287 additions and 47 deletions

View File

@ -2,46 +2,149 @@
module HTTPX
module Plugins
# Extension of the Connection::HTTP2 class, which adds functionality to
# deal with a request that can't be drained and must be interleaved with
# the response streams.
#
# The streams keeps send DATA frames while there's data; when they're ain't,
# the stream is kept open; it must be explicitly closed by the end user.
#
class HTTP2Bidi < Connection::HTTP2
private
def handle_stream(stream, request)
request.on(:body) do
next unless request.headers_sent
handle(request, stream)
end
super
end
# when there ain't more chunks, it makes the buffer as full.
def send_chunk(request, stream, chunk, next_chunk)
super
return if next_chunk
request.transition(:waiting_for_chunk)
throw(:buffer_full)
end
def end_stream?(request, *)
request.closed?
end
end
#
# This plugin adds support for bidirectional HTTP/2 streams.
#
# https://gitlab.com/os85/httpx/wikis/StreamBidi
#
# It is required that the request body allows chunk to be buffered, (i.e., responds to +#<<(chunk)+).
module StreamBidi
# Extension of the Connection::HTTP2 class, which adds functionality to
# deal with a request that can't be drained and must be interleaved with
# the response streams.
#
# The streams keeps send DATA frames while there's data; when they're ain't,
# the stream is kept open; it must be explicitly closed by the end user.
#
class HTTP2Bidi < Connection::HTTP2
def initialize(*)
super
@lock = Thread::Mutex.new
end
%i[close empty? exhausted? send <<].each do |lock_meth|
class_eval(<<-METH, __FILE__, __LINE__ + 1)
# lock.aware version of +#{lock_meth}+
def #{lock_meth}(*) # def close(*)
return super if @lock.owned?
# small race condition between
# checking for ownership and
# acquiring lock.
# TODO: fix this at the parser.
@lock.synchronize { super }
end
METH
end
private
%i[join_headers join_trailers join_body].each do |lock_meth|
class_eval(<<-METH, __FILE__, __LINE__ + 1)
# lock.aware version of +#{lock_meth}+
def #{lock_meth}(*) # def join_headers(*)
return super if @lock.owned?
# small race condition between
# checking for ownership and
# acquiring lock.
# TODO: fix this at the parser.
@lock.synchronize { super }
end
METH
end
def handle_stream(stream, request)
request.on(:body) do
next unless request.headers_sent
handle(request, stream)
emit(:flush_buffer)
end
super
end
# when there ain't more chunks, it makes the buffer as full.
def send_chunk(request, stream, chunk, next_chunk)
super
return if next_chunk
request.transition(:waiting_for_chunk)
throw(:buffer_full)
end
# sets end-stream flag when the request is closed.
def end_stream?(request, next_chunk)
request.closed? && next_chunk.nil?
end
end
# BidiBuffer is a Buffer which can be receive data from threads othr
# than the thread of the corresponding Connection/Session.
#
# It synchronizes access to a secondary internal +@oob_buffer+, which periodically
# is reconciled to the main internal +@buffer+.
class BidiBuffer < Buffer
def initialize(*)
super
@parent_thread = Thread.current
@oob_mutex = Thread::Mutex.new
@oob_buffer = "".b
end
def <<(chunk)
return super if Thread.current == @parent_thread
@oob_mutex.synchronize { @oob_buffer << chunk }
end
# reconciles the main and secondary buffer (which receives data from other threads).
def rebuffer
raise Error, "can only rebuffer while waiting on a response" unless Thread.current == @parent_thread
@oob_mutex.synchronize do
@buffer << @oob_buffer
@oob_buffer.clear
end
end
end
# Functions as a way to wake up the session main loop when one of the connections has
# buffered data to write. It abides by the Selectable API, which allows it to be
# registered in the selector alongside actual HTTP-based Connection objects.
class Signal
def initialize
@pipe_read, @pipe_write = ::IO.pipe
end
def state; end
def to_io
@pipe_read.to_io
end
def wakeup
@pipe_write.write("\0")
end
def call
@pipe_read.readpartial(1)
end
def interests
:r
end
def timeout; end
def terminate
@pipe_write.close
@pipe_read.close
end
end
class << self
def load_dependencies(klass)
klass.plugin(:stream)
@ -52,6 +155,32 @@ module HTTPX
end
end
module InstanceMethods
def initialize(*)
@signal = Signal.new
super
end
def close(selector = Selector.new)
@signal.terminate
selector.deregister(@signal)
super(selector)
end
def select_connection(connection, selector)
super
selector.register(@signal)
connection.signal = @signal
end
def deselect_connection(connection, *)
super
connection.signal = nil
end
end
# Adds synchronization to request operations which may buffer payloads from different
# threads.
module RequestMethods
attr_accessor :headers_sent
@ -59,6 +188,7 @@ module HTTPX
super
@headers_sent = false
@closed = false
@mutex = Thread::Mutex.new
end
def closed?
@ -69,6 +199,9 @@ module HTTPX
super && @state != :waiting_for_chunk
end
# overrides state management transitions to introduce an intermediate
# +:waiting_for_chunk+ state, which the request transitions to once payload
# is buffered.
def transition(nextstate)
headers_sent = @headers_sent
@ -92,17 +225,23 @@ module HTTPX
end
def <<(chunk)
if @drainer
@body.clear if @body.respond_to?(:clear)
@drainer = nil
end
@body << chunk
@mutex.synchronize do
if @drainer
@body.clear if @body.respond_to?(:clear)
@drainer = nil
end
@body << chunk
transition(:body)
transition(:body)
end
end
def close
@closed = true
@mutex.synchronize do
return if @closed
@closed = true
end
# last chunk to send which ends the stream
self << ""
@ -120,12 +259,37 @@ module HTTPX
end
end
# overrides the declaration of +@write_buffer+, which is now a thread-safe buffer
# responding to the same API.
module ConnectionMethods
attr_writer :signal
def initialize(*)
super
@write_buffer = BidiBuffer.new(@options.buffer_size)
end
# rebuffers the +@write_buffer+ before calculating interests.
def interests
@write_buffer.rebuffer
super
end
private
def parser_type(protocol)
return HTTP2Bidi if protocol == "h2"
super
end
def set_parser_callbacks(parser)
super
parser.on(:flush_buffer) do
@signal.wakeup if @signal
end
end
end
end
register_plugin :stream_bidi, StreamBidi

View File

@ -1,30 +1,63 @@
module HTTPX
module Plugins
class HTTP2Bidi < Connection::HTTP2
end
module StreamBidi
def self.load_dependencies: (singleton(Session)) -> void
def self.extra_options: (Options) -> (Options)
class HTTP2Bidi < Connection::HTTP2
@lock: Thread::Mutex
private
def handle_stream: (::HTTP2::Stream stream, Request & RequestMethods request) -> void
def end_stream?: (Request & RequestMethods request, String? next_chunk) -> void
end
class BidiBuffer < Buffer
@parent_thread: Thread
@oob_mutex: Thread::Mutex
@oob_buffer: String
def rebuffer: () -> void
end
class Signal
@pipe_read: ::IO
@pipe_write: ::IO
include _Selectable
def wakeup: () -> void
def mergeable?: () -> bool
def terminate: () -> void
end
module InstanceMethods
@signal: Signal
end
module RequestMethods
attr_accessor headers_sent: bool
@closed: bool
@mutex: Thread::Mutex
def closed?: () -> bool
end
module RequestBodyMethods
end
module ConnectionMethods
@write_buffer: BidiBuffer
private
def parser_type: (String protocol) -> (singleton(HTTP1) | singleton(HTTP2) | singleton(HTTP2Bidi))
def parser_type: (String protocol) -> (singleton(Connection::HTTP1) | singleton(Connection::HTTP2) | singleton(HTTP2Bidi))
end
end

View File

@ -2,13 +2,18 @@
module Requests
module Plugins
#
# This plugin adds support for HTTP/2 bidirectional streaming.
#
# https://gitlab.com/os85/httpx/wikis/Stream-Bidi
#
module StreamBidi
def test_plugin_stream_bidi_each
start_test_servlet(Bidi, tls: false) do |server|
uri = "#{server.origin}/"
start_msg = "{\"message\":\"started\"}\n"
ping_msg = "{\"message\":\"pong\"}\n"
pong_msg = "{\"message\":\"pong\"}\n"
session = HTTPX.plugin(:stream_bidi)
request = session.build_request(
@ -22,7 +27,7 @@ module Requests
chunks = []
response.each.each_with_index do |chunk, idx| # rubocop:disable Style/RedundantEach
if idx < 4
request << ping_msg
request << pong_msg
else
request.close
end
@ -31,6 +36,44 @@ module Requests
assert chunks.size == 5, "all the lines should have been yielded"
end
end
def test_plugin_stream_bidi_buffer_data_from_separate_thread
start_test_servlet(Bidi, tls: false) do |server|
uri = "#{server.origin}/"
q = Queue.new
start_msg = "{\"message\":\"started\"}\n"
pong_msg = "{\"message\":\"pong\"}\n"
session = HTTPX.plugin(:stream_bidi)
request = session.build_request(
"POST",
uri,
headers: { "content-type" => "application/x-ndjson" },
body: [start_msg]
)
response = session.request(request, stream: true)
th = Thread.start do
4.times do
msg = q.pop
request << msg
end
request.close
end
chunks = []
response.each.each_with_index do |chunk, _idx| # rubocop:disable Style/RedundantEach
chunks << chunk
q << pong_msg
end
th.join
assert chunks.size == 5, "all the lines should have been yielded"
end
end
end
end
end