diff --git a/lib/httpx/connection/http1.rb b/lib/httpx/connection/http1.rb index 8b1e0bef..912079f7 100644 --- a/lib/httpx/connection/http1.rb +++ b/lib/httpx/connection/http1.rb @@ -133,33 +133,42 @@ module HTTPX end def on_data(chunk) - return unless @request + request = @request + + return unless request log(color: :green) { "-> DATA: #{chunk.bytesize} bytes..." } log(level: 2, color: :green) { "-> #{chunk.inspect}" } - response = @request.response + response = request.response response << chunk + rescue StandardError => e + error_response = ErrorResponse.new(request, e, request.options) + request.response = error_response + dispatch end def on_complete - return unless @request + request = @request + + return unless request log(level: 2) { "parsing complete" } dispatch end def dispatch - if @request.expects? + request = @request + + if request.expects? @parser.reset! - return handle(@request) + return handle(request) end - request = @request @request = nil @requests.shift response = request.response - response.finish! + response.finish! unless response.is_a?(ErrorResponse) emit(:response, request, response) if @parser.upgrade? @@ -169,7 +178,11 @@ module HTTPX @parser.reset! @max_requests -= 1 - manage_connection(response) + if response.is_a?(ErrorResponse) + disable + else + manage_connection(response) + end send(@pending.shift) unless @pending.empty? end diff --git a/lib/httpx/connection/http2.rb b/lib/httpx/connection/http2.rb index 06835cac..11dc2dcd 100644 --- a/lib/httpx/connection/http2.rb +++ b/lib/httpx/connection/http2.rb @@ -296,6 +296,14 @@ module HTTPX log(level: 1, color: :green) { "#{stream.id}: <- DATA: #{data.bytesize} bytes..." } log(level: 2, color: :green) { "#{stream.id}: <- #{data.inspect}" } request.response << data + rescue StandardError => e + # when an error happens upstream parsing the chunk, the HTTP/2 connection + # should be kept intact, but this stream should be cancelled. + stream.cancel rescue nil # rubocop:disable Style/RescueModifier + + error_response = ErrorResponse.new(request, e, request.options) + request.response = error_response + request.emit(:response, error_response) end def on_stream_refuse(stream, request, error) @@ -314,6 +322,7 @@ module HTTPX ex = Error.new(stream.id, error) ex.set_backtrace(caller) response = ErrorResponse.new(request, ex, request.options) + request.response = response emit(:response, request, response) else response = request.response diff --git a/test/support/requests/errors.rb b/test/support/requests/errors.rb index 5661d145..ca93990a 100644 --- a/test/support/requests/errors.rb +++ b/test/support/requests/errors.rb @@ -41,6 +41,33 @@ module Requests # end # end + ResponseErrorEmitter = Module.new do + self::ResponseMethods = Module.new do + def <<(_) + raise "done with it" + end + end + end + + def test_errors_mid_response_buffering + uri = URI(build_uri("/get")) + HTTPX.plugin(SessionWithPool).plugin(ResponseErrorEmitter).wrap do |http| + response = http.get(uri) + verify_error_response(response, "done with it") + if uri.scheme == "https" + # in http/2, such an error will result in the stream getting cancelled. + # connection remains active for subsequent requests. + connections = http.pool.connections + assert connections.size == 1 + connection = connections.first + assert connection.state == :inactive + else + # in http/1.1, a new connection would need to be established. + assert http.pool.connections.empty? + end + end + end + private def next_available_port