recover from errors on response chunk processing

first attempt at more granular error handling: during response chunk processing, errors will be handled in a way where current response stops being fetched; for http/1, the connection is fully reset, for http/2, the individual stream is cancelled
This commit is contained in:
HoneyryderChuck 2022-12-28 23:56:09 +00:00
parent 45c8dcb36b
commit 9bcae578d7
3 changed files with 57 additions and 8 deletions

View File

@ -133,33 +133,42 @@ module HTTPX
end
def on_data(chunk)
return unless @request
request = @request
return unless request
log(color: :green) { "-> DATA: #{chunk.bytesize} bytes..." }
log(level: 2, color: :green) { "-> #{chunk.inspect}" }
response = @request.response
response = request.response
response << chunk
rescue StandardError => e
error_response = ErrorResponse.new(request, e, request.options)
request.response = error_response
dispatch
end
def on_complete
return unless @request
request = @request
return unless request
log(level: 2) { "parsing complete" }
dispatch
end
def dispatch
if @request.expects?
request = @request
if request.expects?
@parser.reset!
return handle(@request)
return handle(request)
end
request = @request
@request = nil
@requests.shift
response = request.response
response.finish!
response.finish! unless response.is_a?(ErrorResponse)
emit(:response, request, response)
if @parser.upgrade?
@ -169,7 +178,11 @@ module HTTPX
@parser.reset!
@max_requests -= 1
manage_connection(response)
if response.is_a?(ErrorResponse)
disable
else
manage_connection(response)
end
send(@pending.shift) unless @pending.empty?
end

View File

@ -296,6 +296,14 @@ module HTTPX
log(level: 1, color: :green) { "#{stream.id}: <- DATA: #{data.bytesize} bytes..." }
log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" }
request.response << data
rescue StandardError => e
# when an error happens upstream parsing the chunk, the HTTP/2 connection
# should be kept intact, but this stream should be cancelled.
stream.cancel rescue nil # rubocop:disable Style/RescueModifier
error_response = ErrorResponse.new(request, e, request.options)
request.response = error_response
request.emit(:response, error_response)
end
def on_stream_refuse(stream, request, error)
@ -314,6 +322,7 @@ module HTTPX
ex = Error.new(stream.id, error)
ex.set_backtrace(caller)
response = ErrorResponse.new(request, ex, request.options)
request.response = response
emit(:response, request, response)
else
response = request.response

View File

@ -41,6 +41,33 @@ module Requests
# end
# end
ResponseErrorEmitter = Module.new do
self::ResponseMethods = Module.new do
def <<(_)
raise "done with it"
end
end
end
def test_errors_mid_response_buffering
uri = URI(build_uri("/get"))
HTTPX.plugin(SessionWithPool).plugin(ResponseErrorEmitter).wrap do |http|
response = http.get(uri)
verify_error_response(response, "done with it")
if uri.scheme == "https"
# in http/2, such an error will result in the stream getting cancelled.
# connection remains active for subsequent requests.
connections = http.pool.connections
assert connections.size == 1
connection = connections.first
assert connection.state == :inactive
else
# in http/1.1, a new connection would need to be established.
assert http.pool.connections.empty?
end
end
end
private
def next_available_port