mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-10 00:01:27 -04:00
renamed channel error emission; emit timeout errors in connection pool, not in client
This commit is contained in:
parent
56156e91fa
commit
4db4b073d3
@ -204,6 +204,10 @@ module HTTPX
|
||||
transition(:open)
|
||||
end
|
||||
end
|
||||
parser.on(:error) do |request, ex|
|
||||
response = ErrorResponse.new(ex, 0, @options)
|
||||
emit(:response, request, response)
|
||||
end
|
||||
parser
|
||||
end
|
||||
|
||||
@ -230,12 +234,13 @@ module HTTPX
|
||||
Errno::EADDRNOTAVAIL,
|
||||
OpenSSL::SSL::SSLError => e
|
||||
# connect errors, exit gracefully
|
||||
emit_error(e)
|
||||
handle_error(e)
|
||||
@state = :closed
|
||||
emit(:close)
|
||||
end
|
||||
|
||||
def emit_error(e)
|
||||
def handle_error(e)
|
||||
parser.handle_error(e)
|
||||
response = ErrorResponse.new(e, 0, @options)
|
||||
@pending.each do |request, _|
|
||||
emit(:response, request, response)
|
||||
|
@ -148,8 +148,6 @@ module HTTPX
|
||||
emit(:reset)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def disable_concurrency
|
||||
return if @requests.empty?
|
||||
@requests.each { |r| r.transition(:idle) }
|
||||
@ -159,6 +157,14 @@ module HTTPX
|
||||
@max_concurrent_requests = 1
|
||||
end
|
||||
|
||||
def handle_error(ex)
|
||||
@requests.each do |request|
|
||||
emit(:error, request, ex)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def set_request_headers(request)
|
||||
request.headers["host"] ||= request.authority
|
||||
request.headers["connection"] ||= "keep-alive"
|
||||
|
@ -60,6 +60,12 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
def handle_error(ex)
|
||||
@streams.each_key do |request|
|
||||
emit(:error, request, ex)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def headline_uri(request)
|
||||
@ -151,8 +157,12 @@ module HTTPX
|
||||
|
||||
def on_stream_close(stream, request, error)
|
||||
return handle(request, stream) if request.expects?
|
||||
response = request.response || ErrorResponse.new(Error.new(error), @retries, @options)
|
||||
emit(:response, request, response)
|
||||
if error
|
||||
emit(:error, request, error)
|
||||
else
|
||||
response = request.response
|
||||
emit(:response, request, response)
|
||||
end
|
||||
log(level: 2, label: "#{stream.id}: ") { "closing stream" }
|
||||
|
||||
@streams.delete(request)
|
||||
|
@ -111,11 +111,7 @@ module HTTPX
|
||||
responses << response
|
||||
requests.shift
|
||||
|
||||
break if requests.empty?
|
||||
rescue TimeoutError => e
|
||||
responses << ErrorResponse.new(e, 0, @options) while requests.shift
|
||||
@connection.reset
|
||||
break
|
||||
break if requests.empty? || !@connection.running?
|
||||
end
|
||||
end
|
||||
responses
|
||||
|
@ -16,12 +16,19 @@ module HTTPX
|
||||
!@channels.empty?
|
||||
end
|
||||
|
||||
def next_tick(timeout: @timeout.timeout)
|
||||
@selector.select(timeout) do |monitor|
|
||||
if (channel = monitor.value)
|
||||
channel.call
|
||||
def next_tick
|
||||
begin
|
||||
timeout = @timeout.timeout
|
||||
@selector.select(timeout) do |monitor|
|
||||
if (channel = monitor.value)
|
||||
channel.call
|
||||
end
|
||||
monitor.interests = channel.interests
|
||||
end
|
||||
rescue TimeoutError => ex
|
||||
@channels.each do |ch|
|
||||
ch.emit(:error, ex)
|
||||
end
|
||||
monitor.interests = channel.interests
|
||||
end
|
||||
end
|
||||
|
||||
@ -30,10 +37,6 @@ module HTTPX
|
||||
next_tick until @channels.empty?
|
||||
end
|
||||
|
||||
def reset
|
||||
@channels.each(&:reset)
|
||||
end
|
||||
|
||||
def build_channel(uri, **options)
|
||||
channel = Channel.by(uri, @options.merge(options))
|
||||
register_channel(channel)
|
||||
|
Loading…
x
Reference in New Issue
Block a user