stream_bidi: allows payload to be buffered to requests from other threads

this is achieved by inserting some synchronization primitives when buffering the content, and waking up the main select loop, via an IO pipe
This commit is contained in:
HoneyryderChuck 2025-04-16 00:38:17 +01:00
parent 012255e49c
commit d7e15c4441
3 changed files with 18 additions and 5 deletions

View File

@ -94,6 +94,7 @@ module HTTPX
@oob_buffer = "".b
end
# buffers the +chunk+ to be sent
def <<(chunk)
return super if Thread.current == @parent_thread
@ -111,29 +112,39 @@ module HTTPX
end
end
# Functions as a way to wake up the session main loop when one of the connections has
# buffered data to write. It abides by the Selectable API, which allows it to be
# registered in the selector alongside actual HTTP-based Connection objects.
# Proxy to wake up the session main loop when one
# of the connections has buffered data to write. It abides by the HTTPX::_Selectable API,
# which allows it to be registered in the selector alongside actual HTTP-based
# HTTPX::Connection objects.
class Signal
def initialize
@closed = false
@pipe_read, @pipe_write = ::IO.pipe
end
def state; end
def state
@closed ? :closed : :open
end
def to_io
@pipe_read.to_io
end
def wakeup
return if @closed
@pipe_write.write("\0")
end
def call
return if @closed
@pipe_read.readpartial(1)
end
def interests
return if @closed
:r
end
@ -142,6 +153,7 @@ module HTTPX
def terminate
@pipe_write.close
@pipe_read.close
@closed = true
end
end

View File

@ -24,6 +24,7 @@ module HTTPX
end
class Signal
@closed: bool
@pipe_read: ::IO
@pipe_write: ::IO

View File

@ -1,6 +1,6 @@
module HTTPX
interface _Selectable
def state: () -> Symbol?
def state: () -> Symbol
def to_io: () -> ::IO