fixing double-response for streams

there was a long-standing buggy workaround, whereas in stream-mode, when
there was no response yet to query from, a synchronous request would be
fired. This would break when under event streams, so we had to document
this as "make sure that...".

This fixes it by implementing a general session API convention, which
separates the step of sending the requests, from waiting for its
receival. And, given that the request knows when the response is
available, we can actually "tick until response".

This might be used in the future to refactor the way we handle the
responses, which buffer the full payload by default, instead of reading
from the connection at will.
This commit is contained in:
HoneyryderChuck 2021-05-17 13:10:23 +01:00
parent a1d09889ff
commit 441716a5ac
11 changed files with 69 additions and 23 deletions

View File

@ -443,6 +443,7 @@ module HTTPX
emit(:misdirected, request)
else
response = ErrorResponse.new(request, ex, @options)
request.response = response
request.emit(:response, response)
end
end
@ -542,7 +543,9 @@ module HTTPX
def handle_error(error)
parser.handle_error(error) if @parser && parser.respond_to?(:handle_error)
while (request = @pending.shift)
request.emit(:response, ErrorResponse.new(request, error, @options))
response = ErrorResponse.new(request, error, @options)
request.response = response
request.emit(:response, response)
end
end

View File

@ -200,7 +200,7 @@ module HTTPX
build_request(:post, uri, headers: headers, body: body)
end
def respond_to_missing?(meth, *, **, &blk)
def respond_to_missing?(meth, *, &blk)
@options.grpc_rpcs.key?(meth.to_s) || super
end

View File

@ -17,6 +17,16 @@ module HTTPX
"#GRPC::Call(#{grpc_response})"
end
def to_s
grpc_response.to_s
end
def trailing_metadata
return unless @response.body.closed?
@response.trailing_metadata
end
private
def grpc_response
@ -33,12 +43,12 @@ module HTTPX
end
end
def respond_to_missing?(meth, *args, **kwargs, &blk)
grpc_response.respond_to?(meth, *args, **kwargs, &blk) || super
def respond_to_missing?(meth, *args, &blk)
grpc_response.respond_to?(meth, *args) || super
end
def method_missing(meth, *args, **kwargs, &blk)
return grpc_response.__send__(meth, *args, **kwargs, &blk) if grpc_response.respond_to?(meth, *args, **kwargs, &blk)
def method_missing(meth, *args, &blk)
return grpc_response.__send__(meth, *args, &blk) if grpc_response.respond_to?(meth)
super
end

View File

@ -2,9 +2,10 @@
module HTTPX
class StreamResponse
def initialize(request, session)
def initialize(request, session, connections)
@request = request
@session = session
@connections = connections
@options = @request.options
end
@ -18,6 +19,16 @@ module HTTPX
begin
@on_chunk = block
if @request.response
# if we've already started collecting the payload, yield it first
# before proceeding
body = @request.response.body
body.each do |chunk|
on_chunk(chunk)
end
end
response.raise_for_status
response.close
ensure
@ -61,17 +72,17 @@ module HTTPX
private
def response
@request.response || begin
@response ||= @session.__send__(:send_requests, @request, @options).first
end
@session.__send__(:receive_requests, [@request], @connections, @options) until @request.response
@request.response
end
def respond_to_missing?(*args)
@options.response_class.respond_to?(*args) || super
def respond_to_missing?(meth, *args)
response.respond_to?(meth, *args) || super
end
def method_missing(meth, *args, &block)
return super unless @options.response_class.public_method_defined?(meth)
return super unless response.respond_to?(meth)
response.__send__(meth, *args, &block)
end
@ -91,10 +102,13 @@ module HTTPX
return super(*args, **options) unless stream
requests = args.first.is_a?(Request) ? args : build_requests(*args, options)
raise Error, "only 1 response at a time is supported for streaming requests" unless requests.size == 1
StreamResponse.new(requests.first, self)
request = requests.first
connections = _send_requests(requests, request.options)
StreamResponse.new(request, self, connections)
end
end

View File

@ -94,6 +94,10 @@ module HTTPX
@state = :idle
end
def closed?
@state == :closed
end
def write(chunk)
return if @state == :closed

View File

@ -175,12 +175,18 @@ module HTTPX
end
def send_requests(*requests, options)
connections = []
request_options = @options.merge(options)
connections = _send_requests(requests, request_options)
receive_requests(requests, connections, request_options)
end
def _send_requests(requests, options)
connections = []
requests.each do |request|
error = catch(:resolve_error) do
connection = find_connection(request, connections, request_options)
connection = find_connection(request, connections, options)
connection.send(request)
end
next unless error.is_a?(ResolveError)
@ -188,13 +194,17 @@ module HTTPX
request.emit(:response, ErrorResponse.new(request, error, options))
end
connections
end
def receive_requests(requests, connections, options)
responses = []
begin
# guarantee ordered responses
loop do
request = requests.first
pool.next_tick until (response = fetch_response(request, connections, request_options))
pool.next_tick until (response = fetch_response(request, connections, options))
responses << response
requests.shift
@ -208,7 +218,7 @@ module HTTPX
# opportunity to traverse the requests, hence we're returning only a fraction of the errors
# we were supposed to. This effectively fetches the existing responses and return them.
while (request = requests.shift)
responses << fetch_response(request, connections, request_options)
responses << fetch_response(request, connections, options)
end
break
end

View File

@ -63,7 +63,7 @@ module HTTPX
def on_stream_data: (HTTP2Next::Stream stream, Request request, string data) -> void
def on_stream_close: (HTTP2Next::Stream stream, Request request, Symbol? error) -> void
def on_stream_close: (HTTP2Next::Stream stream, Request request, (Symbol | StandardError)? error) -> void
def on_frame: (string bytes) -> void

View File

@ -13,7 +13,7 @@ module HTTPX
private
def response: () -> response
def initialize: (Request, Session) -> untyped
def initialize: (Request, Session, Array[Connection]) -> untyped
end
module Plugins

View File

@ -11,7 +11,7 @@ module HTTPX
attr_reader body: Body
attr_reader state: Symbol
attr_reader options: Options
attr_reader response: Response?
attr_reader response: response?
attr_reader drain_error: StandardError?
def initialize: (verb | String, uri, ?options?) -> untyped
@ -22,7 +22,7 @@ module HTTPX
def scheme: () -> ("http" | "https")
def response=: (Response response) -> void
def response=: (response) -> void
def path: () -> String

View File

@ -47,6 +47,7 @@ module HTTPX
def empty?: () -> bool
def copy_to: (_ToPath | _Writer destination) -> void
def close: () -> void
def closed?: () -> bool
private

View File

@ -45,5 +45,9 @@ module HTTPX
def build_connection: (URI, Options) -> Connection
def send_requests: (*Request, options) -> Array[response]
def _send_requests: (Array[Request], options) -> Array[Connection]
def receive_requests: (Array[Request], Array[Connection], options) -> Array[response]
end
end