new :stream_bidi plugin

this plugin is an HTTP/2 only plugin which enables bidirectional streaming

the client can continue writing request streams as response streams arrive midway

Closes https://github.com/HoneyryderChuck/httpx/discussions/71
This commit is contained in:
HoneyryderChuck 2025-01-27 15:02:58 +00:00
parent c48f6c8e8f
commit 84db0072fb
13 changed files with 290 additions and 34 deletions

View File

@ -252,13 +252,13 @@ module HTTPX
chunk = @drains.delete(request) || request.drain_body chunk = @drains.delete(request) || request.drain_body
while chunk while chunk
next_chunk = request.drain_body next_chunk = request.drain_body
log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." } send_chunk(request, stream, chunk, next_chunk)
log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" }
stream.data(chunk, end_stream: !(next_chunk || request.trailers? || request.callbacks_for?(:trailers)))
if next_chunk && (@buffer.full? || request.body.unbounded_body?) if next_chunk && (@buffer.full? || request.body.unbounded_body?)
@drains[request] = next_chunk @drains[request] = next_chunk
throw(:buffer_full) throw(:buffer_full)
end end
chunk = next_chunk chunk = next_chunk
end end
@ -267,6 +267,16 @@ module HTTPX
on_stream_refuse(stream, request, error) on_stream_refuse(stream, request, error)
end end
def send_chunk(request, stream, chunk, next_chunk)
log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." }
log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" }
stream.data(chunk, end_stream: end_stream?(request, next_chunk))
end
def end_stream?(request, next_chunk)
!(next_chunk || request.trailers? || request.callbacks_for?(:trailers))
end
###### ######
# HTTP/2 Callbacks # HTTP/2 Callbacks
###### ######

View File

@ -2,6 +2,8 @@
module HTTPX module HTTPX
class StreamResponse class StreamResponse
attr_reader :request
def initialize(request, session) def initialize(request, session)
@request = request @request = request
@options = @request.options @options = @request.options
@ -114,7 +116,7 @@ module HTTPX
module Plugins module Plugins
# #
# This plugin adds support for stream response (text/event-stream). # This plugin adds support for streaming a response (useful for i.e. "text/event-stream" payloads).
# #
# https://gitlab.com/os85/httpx/wikis/Stream # https://gitlab.com/os85/httpx/wikis/Stream
# #

View File

@ -0,0 +1,133 @@
# frozen_string_literal: true
module HTTPX
module Plugins
# Extension of the Connection::HTTP2 class, which adds functionality to
# deal with a request that can't be drained and must be interleaved with
# the response streams.
#
# The streams keeps send DATA frames while there's data; when they're ain't,
# the stream is kept open; it must be explicitly closed by the end user.
#
class HTTP2Bidi < Connection::HTTP2
private
def handle_stream(stream, request)
request.on(:body) do
next unless request.headers_sent
handle(request, stream)
end
super
end
# when there ain't more chunks, it makes the buffer as full.
def send_chunk(request, stream, chunk, next_chunk)
super
return if next_chunk
request.transition(:waiting_for_chunk)
throw(:buffer_full)
end
def end_stream?(request, *)
request.closed?
end
end
#
# This plugin adds support for bidirectional HTTP/2 streams.
#
# https://gitlab.com/os85/httpx/wikis/StreamBidi
#
module StreamBidi
class << self
def load_dependencies(klass)
klass.plugin(:stream)
end
def extra_options(options)
options.merge(fallback_protocol: "h2")
end
end
module RequestMethods
attr_accessor :headers_sent
def initialize(*)
super
@headers_sent = false
@closed = false
end
def closed?
@closed
end
def can_buffer?
super && @state != :waiting_for_chunk
end
def transition(nextstate)
headers_sent = @headers_sent
case nextstate
when :waiting_for_chunk
return unless @state == :body
when :body
case @state
when :headers
headers_sent = true
when :waiting_for_chunk
# HACK: to allow super to pass through
@state = :headers
end
end
super.tap do
# delay setting this up until after the first transition to :body
@headers_sent = headers_sent
end
end
def <<(chunk)
if @drainer
@body.clear if @body.respond_to?(:clear)
@drainer = nil
end
@body << chunk
transition(:body)
end
def close
@closed = true
# last chunk to send which ends the stream
self << ""
end
end
module RequestBodyMethods
def initialize(*, **)
super
@headers.delete("content-length")
end
def empty?
false
end
end
module ConnectionMethods
def parser_type(protocol)
return HTTP2Bidi if protocol == "h2"
super
end
end
end
register_plugin :stream_bidi, StreamBidi
end
end

View File

@ -29,6 +29,7 @@ module HTTPX
| (:retries, ?options) -> Plugins::sessionRetries | (:retries, ?options) -> Plugins::sessionRetries
| (:rate_limiter, ?options) -> Session | (:rate_limiter, ?options) -> Session
| (:stream, ?options) -> Plugins::sessionStream | (:stream, ?options) -> Plugins::sessionStream
| (:stream_bidi, ?options) -> Plugins::sessionStreamBidi
| (:aws_sigv4, ?options) -> Plugins::awsSigV4Session | (:aws_sigv4, ?options) -> Plugins::awsSigV4Session
| (:grpc, ?options) -> Plugins::grpcSession | (:grpc, ?options) -> Plugins::grpcSession
| (:response_cache, ?options) -> Plugins::sessionResponseCache | (:response_cache, ?options) -> Plugins::sessionResponseCache

View File

@ -63,6 +63,10 @@ module HTTPX
def join_body: (::HTTP2::Stream stream, Request request) -> void def join_body: (::HTTP2::Stream stream, Request request) -> void
def send_chunk: (Request request, ::HTTP2::Stream stream, String chunk, String? next_chunk) -> void
def end_stream?: (Request request, String? next_chunk) -> void
def on_stream_headers: (::HTTP2::Stream stream, Request request, Array[[String, String]] headers) -> void def on_stream_headers: (::HTTP2::Stream stream, Request request, Array[[String, String]] headers) -> void
def on_stream_trailers: (::HTTP2::Stream stream, Response response, Array[[String, String]] headers) -> void def on_stream_trailers: (::HTTP2::Stream stream, Response response, Array[[String, String]] headers) -> void

View File

@ -26,8 +26,9 @@ module HTTPX
type streamRequest = Request & Plugins::Stream::RequestMethods type streamRequest = Request & Plugins::Stream::RequestMethods
@request: streamRequest attr_reader request: streamRequest
@options: Options @options: Options
@session: Plugins::sessionStream @session: Plugins::sessionStream
@response_enum: Enumerator[String]? @response_enum: Enumerator[String]?
@buffered_chunks: Array[String] @buffered_chunks: Array[String]

View File

@ -0,0 +1,34 @@
module HTTPX
module Plugins
class HTTP2Bidi < Connection::HTTP2
end
module StreamBidi
def self.load_dependencies: (singleton(Session)) -> void
def self.extra_options: (Options) -> (Options)
module InstanceMethods
end
module RequestMethods
attr_accessor headers_sent: bool
@closed: bool
end
module RequestBodyMethods
end
module ConnectionMethods
private
def parser_type: (String protocol) -> (singleton(HTTP1) | singleton(HTTP2) | singleton(HTTP2Bidi))
end
end
type sessionStreamBidi = Session & StreamBidi::InstanceMethods
end
end

View File

@ -31,6 +31,7 @@ class HTTPSTest < Minitest::Test
include Plugins::RateLimiter include Plugins::RateLimiter
include Plugins::Persistent include Plugins::Persistent
include Plugins::Stream include Plugins::Stream
include Plugins::StreamBidi
include Plugins::AWSAuthentication include Plugins::AWSAuthentication
include Plugins::Upgrade include Plugins::Upgrade
include Plugins::GRPC if RUBY_ENGINE == "ruby" include Plugins::GRPC if RUBY_ENGINE == "ruby"

View File

@ -0,0 +1,36 @@
# frozen_string_literal: true
module Requests
module Plugins
module StreamBidi
def test_plugin_stream_bidi_each
start_test_servlet(Bidi, tls: false) do |server|
uri = "#{server.origin}/"
start_msg = "{\"message\":\"started\"}\n"
ping_msg = "{\"message\":\"pong\"}\n"
session = HTTPX.plugin(:stream_bidi)
request = session.build_request(
"POST",
uri,
headers: { "content-type" => "application/x-ndjson" },
body: [start_msg]
)
response = session.request(request, stream: true)
chunks = []
response.each.each_with_index do |chunk, idx| # rubocop:disable Style/RedundantEach
if idx < 4
request << ping_msg
else
request.close
end
chunks << chunk
end
assert chunks.size == 5, "all the lines should have been yielded"
end
end
end
end
end

View File

@ -0,0 +1,27 @@
# frozen_string_literal: true
require_relative "test"
class Bidi < TestHTTP2Server
private
def handle_stream(_conn, stream)
stream.on(:data) do |d|
next if d.empty?
# puts "SERVER: payload chunk: <<#{d}>>"
data = JSON.parse(d)
stream.data(JSON.dump({ processed: data }) << "\n", end_stream: false)
end
stream.on(:half_close) do
stream.data("", end_stream: true)
end
stream.headers({
":status" => "200",
"date" => Time.now.httpdate,
"content-type" => "application/x-ndjson",
}, end_stream: false)
end
end

View File

@ -32,7 +32,7 @@ end
class KeepAlivePongServer < TestHTTP2Server class KeepAlivePongServer < TestHTTP2Server
attr_reader :pings, :pongs attr_reader :pings, :pongs
def initialize def initialize(**)
@sent = false @sent = false
super super
end end

View File

@ -3,7 +3,7 @@
class SettingsTimeoutServer < TestHTTP2Server class SettingsTimeoutServer < TestHTTP2Server
attr_reader :frames attr_reader :frames
def initialize def initialize(**)
super super
@frames = [] @frames = []
end end

View File

@ -23,35 +23,40 @@ end
class TestHTTP2Server class TestHTTP2Server
attr_reader :origin attr_reader :origin
def initialize def initialize(tls: true)
@port = 0 @port = 0
@host = "localhost" @host = "localhost"
@server = TCPServer.new(0)
@origin = "https://localhost:#{@server.addr[1]}"
ctx = OpenSSL::SSL::SSLContext.new
certs_dir = File.expand_path(File.join("..", "..", "ci", "certs"), __FILE__)
ctx.ca_file = File.join(certs_dir, "ca-bundle.crt")
ctx.cert = OpenSSL::X509::Certificate.new(File.read(File.join(certs_dir, "server.crt")))
ctx.key = OpenSSL::PKey.read(File.read(File.join(certs_dir, "server.key")))
ctx.ssl_version = :TLSv1_2
ctx.alpn_protocols = ["h2"]
ctx.alpn_select_cb = lambda do |protocols|
raise "Protocol h2 is required" unless protocols.include?("h2")
"h2"
end
@server = OpenSSL::SSL::SSLServer.new(@server, ctx)
@server.singleton_class.attr_reader :ctx
@ios = [] @ios = []
@conns = {} @conns = {}
@server = TCPServer.new(0)
@is_tls = tls
if tls
@origin = "https://localhost:#{@server.addr[1]}"
ctx = OpenSSL::SSL::SSLContext.new
certs_dir = File.expand_path(File.join("..", "..", "ci", "certs"), __FILE__)
ctx.ca_file = File.join(certs_dir, "ca-bundle.crt")
ctx.cert = OpenSSL::X509::Certificate.new(File.read(File.join(certs_dir, "server.crt")))
ctx.key = OpenSSL::PKey.read(File.read(File.join(certs_dir, "server.key")))
ctx.ssl_version = :TLSv1_2
ctx.alpn_protocols = ["h2"]
ctx.alpn_select_cb = lambda do |protocols|
raise "Protocol h2 is required" unless protocols.include?("h2")
"h2"
end
@server = OpenSSL::SSL::SSLServer.new(@server, ctx)
@server.singleton_class.attr_reader :ctx
else
@origin = "http://localhost:#{@server.addr[1]}"
end
end end
def shutdown def shutdown
@ -92,9 +97,11 @@ class TestHTTP2Server
when nil when nil
raise EOFError raise EOFError
else else
sock = OpenSSL::SSL::SSLSocket.new(sock, server.ctx) if @is_tls
sock.sync_close = true sock = OpenSSL::SSL::SSLSocket.new(sock, server.ctx)
sock.accept sock.sync_close = true
sock.accept
end
@ios << sock @ios << sock