mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-10-04 00:00:37 -04:00
adding Timer, making Timers#after return it, to allow single cancellation
the previous iteration relied on internal behaviour do delete the correct callback; in the process, logic to delete all callbacks from an interval was accidentally committed, which motivated this refactoring. the premise is: timeouts can cancel the timer; they set themselves as active until done; operation timeouts rely on the previous to be ignored or not. a new error, OperationTimeoutError, was added for that effect
This commit is contained in:
parent
97cbdf117d
commit
8bee6956eb
@ -101,8 +101,6 @@ module HTTPX
|
||||
@inflight = 0
|
||||
@keep_alive_timeout = @options.timeout[:keep_alive_timeout]
|
||||
|
||||
@intervals = []
|
||||
|
||||
self.addresses = @options.addresses if @options.addresses
|
||||
end
|
||||
|
||||
@ -337,15 +335,7 @@ module HTTPX
|
||||
end
|
||||
|
||||
def handle_socket_timeout(interval)
|
||||
@intervals.delete_if(&:elapsed?)
|
||||
|
||||
unless @intervals.empty?
|
||||
# remove the intervals which will elapse
|
||||
|
||||
return
|
||||
end
|
||||
|
||||
error = HTTPX::TimeoutError.new(interval, "timed out while waiting on select")
|
||||
error = OperationTimeoutError.new(interval, "timed out while waiting on select")
|
||||
error.set_backtrace(caller)
|
||||
on_error(error)
|
||||
end
|
||||
@ -628,13 +618,17 @@ module HTTPX
|
||||
other_connection.merge(self)
|
||||
request.transition(:idle)
|
||||
other_connection.send(request)
|
||||
else
|
||||
next
|
||||
when OperationTimeoutError
|
||||
# request level timeouts should take precedence
|
||||
next unless request.active_timeouts.empty?
|
||||
end
|
||||
|
||||
response = ErrorResponse.new(request, ex)
|
||||
request.response = response
|
||||
request.emit(:response, response)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def transition(nextstate)
|
||||
handle_transition(nextstate)
|
||||
@ -814,7 +808,7 @@ module HTTPX
|
||||
end
|
||||
|
||||
def on_error(error, request = nil)
|
||||
if error.instance_of?(TimeoutError)
|
||||
if error.is_a?(OperationTimeoutError)
|
||||
|
||||
# inactive connections do not contribute to the select loop, therefore
|
||||
# they should not fail due to such errors.
|
||||
@ -859,7 +853,7 @@ module HTTPX
|
||||
|
||||
return if read_timeout.nil? || read_timeout.infinite?
|
||||
|
||||
set_request_timeout(request, read_timeout, :done, :response) do
|
||||
set_request_timeout(:read_timeout, request, read_timeout, :done, :response) do
|
||||
read_timeout_callback(request, read_timeout)
|
||||
end
|
||||
end
|
||||
@ -869,7 +863,7 @@ module HTTPX
|
||||
|
||||
return if write_timeout.nil? || write_timeout.infinite?
|
||||
|
||||
set_request_timeout(request, write_timeout, :headers, %i[done response]) do
|
||||
set_request_timeout(:write_timeout, request, write_timeout, :headers, %i[done response]) do
|
||||
write_timeout_callback(request, write_timeout)
|
||||
end
|
||||
end
|
||||
@ -879,7 +873,7 @@ module HTTPX
|
||||
|
||||
return if request_timeout.nil? || request_timeout.infinite?
|
||||
|
||||
set_request_timeout(request, request_timeout, :headers, :complete) do
|
||||
set_request_timeout(:request_timeout, request, request_timeout, :headers, :complete) do
|
||||
read_timeout_callback(request, request_timeout, RequestTimeoutError)
|
||||
end
|
||||
end
|
||||
@ -904,22 +898,19 @@ module HTTPX
|
||||
on_error(error, request)
|
||||
end
|
||||
|
||||
def set_request_timeout(request, timeout, start_event, finish_events, &callback)
|
||||
def set_request_timeout(label, request, timeout, start_event, finish_events, &callback)
|
||||
request.once(start_event) do
|
||||
interval = @current_selector.after(timeout, callback)
|
||||
timer = @current_selector.after(timeout, callback)
|
||||
request.active_timeouts << label
|
||||
|
||||
Array(finish_events).each do |event|
|
||||
# clean up request timeouts if the connection errors out
|
||||
request.once(event) do
|
||||
if @intervals.include?(interval)
|
||||
interval.delete(callback)
|
||||
@intervals.delete(interval) if interval.no_callbacks?
|
||||
timer.cancel
|
||||
request.active_timeouts.delete(label)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@intervals << interval
|
||||
end
|
||||
end
|
||||
|
||||
class << self
|
||||
|
@ -125,7 +125,7 @@ module HTTPX
|
||||
end
|
||||
|
||||
def handle_error(ex, request = nil)
|
||||
if ex.instance_of?(TimeoutError) && !@handshake_completed && @connection.state != :closed
|
||||
if ex.is_a?(OperationTimeoutError) && !@handshake_completed && @connection.state != :closed
|
||||
@connection.goaway(:settings_timeout, "closing due to settings timeout")
|
||||
emit(:close_handshake)
|
||||
settings_ex = SettingsTimeoutError.new(ex.timeout, ex.message)
|
||||
|
@ -77,6 +77,9 @@ module HTTPX
|
||||
# Error raised when there was a timeout while resolving a domain to an IP.
|
||||
class ResolveTimeoutError < TimeoutError; end
|
||||
|
||||
# Error raise when there was a timeout waiting for readiness of the socket the request is related to.
|
||||
class OperationTimeoutError < TimeoutError; end
|
||||
|
||||
# Error raised when there was an error while resolving a domain to an IP.
|
||||
class ResolveError < Error; end
|
||||
|
||||
|
@ -84,7 +84,7 @@ module HTTPX
|
||||
|
||||
return if expect_timeout.nil? || expect_timeout.infinite?
|
||||
|
||||
set_request_timeout(request, expect_timeout, :expect, %i[body response]) do
|
||||
set_request_timeout(:expect_timeout, request, expect_timeout, :expect, %i[body response]) do
|
||||
# expect timeout expired
|
||||
if request.state == :expect && !request.expects?
|
||||
Expect.no_expect_store << request.origin
|
||||
|
@ -45,6 +45,8 @@ module HTTPX
|
||||
|
||||
attr_writer :persistent
|
||||
|
||||
attr_reader :active_timeouts
|
||||
|
||||
# will be +true+ when request body has been completely flushed.
|
||||
def_delegator :@body, :empty?
|
||||
|
||||
@ -100,6 +102,7 @@ module HTTPX
|
||||
@response = nil
|
||||
@peer_address = nil
|
||||
@persistent = @options.persistent
|
||||
@active_timeouts = []
|
||||
end
|
||||
|
||||
# the read timeout defined for this requet.
|
||||
@ -245,8 +248,10 @@ module HTTPX
|
||||
@body.rewind
|
||||
@response = nil
|
||||
@drainer = nil
|
||||
@active_timeouts.clear
|
||||
when :headers
|
||||
return unless @state == :idle
|
||||
|
||||
when :body
|
||||
return unless @state == :headers ||
|
||||
@state == :expect
|
||||
|
@ -26,7 +26,7 @@ module HTTPX
|
||||
|
||||
@next_interval_at = nil
|
||||
|
||||
interval
|
||||
Timer.new(interval, callback)
|
||||
end
|
||||
|
||||
def wait_interval
|
||||
@ -48,6 +48,17 @@ module HTTPX
|
||||
@next_interval_at = nil if @intervals.empty?
|
||||
end
|
||||
|
||||
class Timer
|
||||
def initialize(interval, callback)
|
||||
@interval = interval
|
||||
@callback = callback
|
||||
end
|
||||
|
||||
def cancel
|
||||
@interval.delete(@callback)
|
||||
end
|
||||
end
|
||||
|
||||
class Interval
|
||||
include Comparable
|
||||
|
||||
|
@ -43,7 +43,6 @@ module HTTPX
|
||||
@parser: Object & _Parser
|
||||
@connected_at: Float
|
||||
@response_received_at: Float
|
||||
@intervals: Array[Timers::Interval]
|
||||
@exhausted: bool
|
||||
@cloned: bool
|
||||
@coalesced_connection: instance?
|
||||
@ -163,7 +162,7 @@ module HTTPX
|
||||
|
||||
def read_timeout_callback: (Request request, Numeric read_timeout, ?singleton(RequestTimeoutError) error_type) -> void
|
||||
|
||||
def set_request_timeout: (Request request, Numeric timeout, Symbol start_event, Symbol | Array[Symbol] finish_events) { () -> void } -> void
|
||||
def set_request_timeout: (Symbol label, Request request, Numeric timeout, Symbol start_event, Symbol | Array[Symbol] finish_events) { () -> void } -> void
|
||||
|
||||
def self.parser_type: (String protocol) -> (singleton(HTTP1) | singleton(HTTP2))
|
||||
end
|
||||
|
@ -45,6 +45,9 @@ module HTTPX
|
||||
class WriteTimeoutError < RequestTimeoutError
|
||||
end
|
||||
|
||||
class OperationTimeoutError < TimeoutError
|
||||
end
|
||||
|
||||
class ResolveError < Error
|
||||
end
|
||||
|
||||
|
@ -14,6 +14,7 @@ module HTTPX
|
||||
attr_reader options: Options
|
||||
attr_reader response: response?
|
||||
attr_reader drain_error: StandardError?
|
||||
attr_reader active_timeouts: Array[Symbol]
|
||||
|
||||
attr_accessor peer_address: ipaddr?
|
||||
|
||||
|
@ -3,8 +3,8 @@ module HTTPX
|
||||
@intervals: Array[Interval]
|
||||
@next_interval_at: Float
|
||||
|
||||
def after: (Numeric interval_in_secs, ^() -> void) -> Interval
|
||||
| (Numeric interval_in_secs) { () -> void } -> Interval
|
||||
def after: (Numeric interval_in_secs, ^() -> void) -> Timer
|
||||
| (Numeric interval_in_secs) { () -> void } -> Timer
|
||||
|
||||
def wait_interval: () -> Numeric?
|
||||
|
||||
@ -43,5 +43,14 @@ module HTTPX
|
||||
|
||||
def initialize: (Numeric interval) -> void
|
||||
end
|
||||
|
||||
class Timer
|
||||
@interval: Interval
|
||||
@callback: callback
|
||||
|
||||
def initialize: (Interval interval, callback callback) -> void
|
||||
|
||||
def cancel: () -> void
|
||||
end
|
||||
end
|
||||
end
|
Loading…
x
Reference in New Issue
Block a user