Merge branch 'fix-hev2-overrides' into 'master'

fixes for happy eyeballs implementation

Closes #337

See merge request os85/httpx!368
This commit is contained in:
HoneyryderChuck 2025-03-03 18:02:43 +00:00
commit 3e736b1f05
19 changed files with 251 additions and 123 deletions

View File

@ -157,7 +157,6 @@ All Rubies greater or equal to 2.7, and always latest JRuby and Truffleruby.
* Discuss your contribution in an issue
* Fork it
* Make your changes, add some tests
* Ensure all tests pass (`docker-compose -f docker-compose.yml -f docker-compose-ruby-{RUBY_VERSION}.yml run httpx bundle exec rake test`)
* Make your changes, add some tests (follow the instructions from [here](test/README.md))
* Open a Merge Request (that's Pull Request in Github-ish)
* Wait for feedback

View File

@ -43,8 +43,9 @@ Thread.start do
# puts "#{responses[i].status}: #{l}"
# end
puts "by group:"
responses, error_responses = responses.partition { |r| r.is_a?(HTTPX::Response) }
puts "#{responses.size} responses, #{error_responses.size} errors"
puts "by group:"
responses.group_by(&:status).each do |st, res|
res.each do |r|
puts "#{st}: #{r.uri}"
@ -56,7 +57,7 @@ Thread.start do
error_responses.group_by{ |r| r.error.class }.each do |kl, res|
res.each do |r|
puts "#{r.uri}: #{r.error}"
puts r.error.backtrace.join("\n")
puts r.error.backtrace&.join("\n")
end
end
end

View File

@ -101,8 +101,6 @@ module HTTPX
@inflight = 0
@keep_alive_timeout = @options.timeout[:keep_alive_timeout]
@intervals = []
self.addresses = @options.addresses if @options.addresses
end
@ -337,15 +335,7 @@ module HTTPX
end
def handle_socket_timeout(interval)
@intervals.delete_if(&:elapsed?)
unless @intervals.empty?
# remove the intervals which will elapse
return
end
error = HTTPX::TimeoutError.new(interval, "timed out while waiting on select")
error = OperationTimeoutError.new(interval, "timed out while waiting on select")
error.set_backtrace(caller)
on_error(error)
end
@ -379,18 +369,20 @@ module HTTPX
force_reset(true)
end
def disconnect
return unless @current_session && @current_selector
emit(:close)
@current_session = nil
@current_selector = nil
end
private
def connect
transition(:open)
end
def disconnect
emit(:close)
@current_session = nil
@current_selector = nil
end
def consume
return unless @io
@ -628,13 +620,17 @@ module HTTPX
other_connection.merge(self)
request.transition(:idle)
other_connection.send(request)
else
next
when OperationTimeoutError
# request level timeouts should take precedence
next unless request.active_timeouts.empty?
end
response = ErrorResponse.new(request, ex)
request.response = response
request.emit(:response, response)
end
end
end
def transition(nextstate)
handle_transition(nextstate)
@ -654,12 +650,14 @@ module HTTPX
error.set_backtrace(e.backtrace)
handle_connect_error(error) if connecting?
@state = :closed
purge_after_closed
disconnect
rescue TLSError, ::HTTP2::Error::ProtocolError, ::HTTP2::Error::HandshakeError => e
# connect errors, exit gracefully
handle_error(e)
handle_connect_error(e) if connecting?
@state = :closed
purge_after_closed
disconnect
end
@ -812,7 +810,7 @@ module HTTPX
end
def on_error(error, request = nil)
if error.instance_of?(TimeoutError)
if error.is_a?(OperationTimeoutError)
# inactive connections do not contribute to the select loop, therefore
# they should not fail due to such errors.
@ -857,7 +855,7 @@ module HTTPX
return if read_timeout.nil? || read_timeout.infinite?
set_request_timeout(request, read_timeout, :done, :response) do
set_request_timeout(:read_timeout, request, read_timeout, :done, :response) do
read_timeout_callback(request, read_timeout)
end
end
@ -867,7 +865,7 @@ module HTTPX
return if write_timeout.nil? || write_timeout.infinite?
set_request_timeout(request, write_timeout, :headers, %i[done response]) do
set_request_timeout(:write_timeout, request, write_timeout, :headers, %i[done response]) do
write_timeout_callback(request, write_timeout)
end
end
@ -877,7 +875,7 @@ module HTTPX
return if request_timeout.nil? || request_timeout.infinite?
set_request_timeout(request, request_timeout, :headers, :complete) do
set_request_timeout(:request_timeout, request, request_timeout, :headers, :complete) do
read_timeout_callback(request, request_timeout, RequestTimeoutError)
end
end
@ -902,22 +900,19 @@ module HTTPX
on_error(error, request)
end
def set_request_timeout(request, timeout, start_event, finish_events, &callback)
def set_request_timeout(label, request, timeout, start_event, finish_events, &callback)
request.once(start_event) do
interval = @current_selector.after(timeout, callback)
timer = @current_selector.after(timeout, callback)
request.active_timeouts << label
Array(finish_events).each do |event|
# clean up request timeouts if the connection errors out
request.once(event) do
if @intervals.include?(interval)
interval.delete(callback)
@intervals.delete(interval) if interval.no_callbacks?
timer.cancel
request.active_timeouts.delete(label)
end
end
end
@intervals << interval
end
end
class << self

View File

@ -125,7 +125,7 @@ module HTTPX
end
def handle_error(ex, request = nil)
if ex.instance_of?(TimeoutError) && !@handshake_completed && @connection.state != :closed
if ex.is_a?(OperationTimeoutError) && !@handshake_completed && @connection.state != :closed
@connection.goaway(:settings_timeout, "closing due to settings timeout")
emit(:close_handshake)
settings_ex = SettingsTimeoutError.new(ex.timeout, ex.message)

View File

@ -77,6 +77,9 @@ module HTTPX
# Error raised when there was a timeout while resolving a domain to an IP.
class ResolveTimeoutError < TimeoutError; end
# Error raised when there was a timeout waiting for readiness of the socket the request is related to.
class OperationTimeoutError < TimeoutError; end
# Error raised when there was an error while resolving a domain to an IP.
class ResolveError < Error; end

View File

@ -84,7 +84,7 @@ module HTTPX
return if expect_timeout.nil? || expect_timeout.infinite?
set_request_timeout(request, expect_timeout, :expect, %i[body response]) do
set_request_timeout(:expect_timeout, request, expect_timeout, :expect, %i[body response]) do
# expect timeout expired
if request.state == :expect && !request.expects?
Expect.no_expect_store << request.origin

View File

@ -45,6 +45,8 @@ module HTTPX
attr_writer :persistent
attr_reader :active_timeouts
# will be +true+ when request body has been completely flushed.
def_delegator :@body, :empty?
@ -100,6 +102,7 @@ module HTTPX
@response = nil
@peer_address = nil
@persistent = @options.persistent
@active_timeouts = []
end
# the read timeout defined for this request.
@ -245,8 +248,10 @@ module HTTPX
@body.rewind
@response = nil
@drainer = nil
@active_timeouts.clear
when :headers
return unless @state == :idle
when :body
return unless @state == :headers ||
@state == :expect

View File

@ -35,6 +35,7 @@ module HTTPX
@_timeouts = Array(@resolver_options[:timeouts])
@timeouts = Hash.new { |timeouts, host| timeouts[host] = @_timeouts.dup }
@connections = []
@name = nil
@queries = {}
@read_buffer = "".b
@write_buffer = Buffer.new(@resolver_options[:packet_size])
@ -58,22 +59,6 @@ module HTTPX
when :open
consume
end
nil
rescue Errno::EHOSTUNREACH => e
@ns_index += 1
nameserver = @nameserver
if nameserver && @ns_index < nameserver.size
log do
"resolver #{FAMILY_TYPES[@record_type]}: " \
"failed resolving on nameserver #{@nameserver[@ns_index - 1]} (#{e.message})"
end
transition(:idle)
@timeouts.clear
else
handle_error(e)
end
rescue NativeResolveError => e
handle_error(e)
end
def interests
@ -108,9 +93,7 @@ module HTTPX
@timeouts.values_at(*hosts).reject(&:empty?).map(&:first).min
end
def handle_socket_timeout(interval)
do_retry(interval)
end
def handle_socket_timeout(interval); end
private
@ -123,32 +106,60 @@ module HTTPX
end
def consume
loop do
dread if calculate_interests == :r
do_retry
dwrite if calculate_interests == :w
break unless calculate_interests == :w
# do_retry
dwrite
break unless calculate_interests == :r
end
rescue Errno::EHOSTUNREACH => e
@ns_index += 1
nameserver = @nameserver
if nameserver && @ns_index < nameserver.size
log do
"resolver #{FAMILY_TYPES[@record_type]}: " \
"failed resolving on nameserver #{@nameserver[@ns_index - 1]} (#{e.message})"
end
transition(:idle)
@timeouts.clear
retry
else
handle_error(e)
emit(:close, self)
end
rescue NativeResolveError => e
handle_error(e)
close_or_resolve
retry unless closed?
end
def do_retry(loop_time = nil)
return if @queries.empty? || !@start_timeout
def schedule_retry
h = @name
loop_time ||= Utils.elapsed_time(@start_timeout)
return unless h
query = @queries.first
connection = @queries[h]
return unless query
timeouts = @timeouts[h]
timeout = timeouts.shift
h, connection = query
host = connection.peer.host
timeout = (@timeouts[host][0] -= loop_time)
@timer = @current_selector.after(timeout) do
next unless @connections.include?(connection)
return unless timeout <= 0
do_retry(h, connection, timeout)
end
end
elapsed_after = @_timeouts[@_timeouts.size - @timeouts[host].size]
@timeouts[host].shift
def do_retry(h, connection, interval)
timeouts = @timeouts[h]
if !@timeouts[host].empty?
if !timeouts.empty?
log do
"resolver #{FAMILY_TYPES[@record_type]}: timeout after #{elapsed_after}s, retry (with #{@timeouts[host].first}s) #{host}..."
"resolver #{FAMILY_TYPES[@record_type]}: timeout after #{interval}s, retry (with #{timeouts.first}s) #{h}..."
end
# must downgrade to tcp AND retry on same host as last
downgrade_socket
@ -157,22 +168,28 @@ module HTTPX
# try on the next nameserver
@ns_index += 1
log do
"resolver #{FAMILY_TYPES[@record_type]}: failed resolving #{host} on nameserver #{@nameserver[@ns_index - 1]} (timeout error)"
"resolver #{FAMILY_TYPES[@record_type]}: failed resolving #{h} on nameserver #{@nameserver[@ns_index - 1]} (timeout error)"
end
transition(:idle)
@timeouts.clear
resolve(connection, h)
else
@timeouts.delete(host)
@timeouts.delete(h)
reset_hostname(h, reset_candidates: false)
return unless @queries.empty?
unless @queries.empty?
resolve(connection)
return
end
@connections.delete(connection)
host = connection.peer.host
# This loop_time passed to the exception is bogus. Ideally we would pass the total
# resolve timeout, including from the previous retries.
ex = ResolveTimeoutError.new(loop_time, "Timed out while resolving #{connection.peer.host}")
ex = ResolveTimeoutError.new(interval, "Timed out while resolving #{host}")
ex.set_backtrace(ex ? ex.backtrace : caller)
emit_resolve_error(connection, host, ex)
@ -225,7 +242,7 @@ module HTTPX
parse(@read_buffer)
end
return if @state == :closed
return if @state == :closed || !@write_buffer.empty?
end
end
@ -243,11 +260,15 @@ module HTTPX
return unless siz.positive?
schedule_retry if @write_buffer.empty?
return if @state == :closed
end
end
def parse(buffer)
@timer.cancel
code, result = Resolver.decode_dns_answer(buffer)
case code
@ -258,8 +279,10 @@ module HTTPX
hostname, connection = @queries.first
reset_hostname(hostname, reset_candidates: false)
if @queries.value?(connection)
resolve
other_candidate, _ = @queries.find { |_, conn| conn == connection }
if other_candidate
resolve(connection, other_candidate)
else
@connections.delete(connection)
ex = NativeResolveError.new(connection, connection.peer.host, "name or service not known")
@ -321,8 +344,10 @@ module HTTPX
connection = @queries.delete(name)
end
if address.key?("alias") # CNAME
hostname_alias = address["alias"]
alias_addresses, addresses = addresses.partition { |addr| addr.key?("alias") }
if addresses.empty? && !alias_addresses.empty? # CNAME
hostname_alias = alias_addresses.first["alias"]
# clean up intermediate queries
@timeouts.delete(name) unless connection.peer.host == name
@ -350,7 +375,11 @@ module HTTPX
close_or_resolve
end
def resolve(connection = @connections.first, hostname = nil)
def resolve(connection = nil, hostname = nil)
@connections.shift until @connections.empty? || @connections.first.state != :closed
connection ||= @connections.find { |c| !@queries.value?(c) }
raise Error, "no URI to resolve" unless connection
return unless @write_buffer.empty?
@ -370,6 +399,9 @@ module HTTPX
else
@queries[hostname] = connection
end
@name = hostname
log { "resolver #{FAMILY_TYPES[@record_type]}: query for #{hostname}" }
begin
@write_buffer << encode_dns_query(hostname)
@ -458,6 +490,7 @@ module HTTPX
# these errors may happen during TCP handshake
# treat them as resolve errors.
handle_error(e)
emit(:close, self)
end
def handle_error(error)
@ -472,13 +505,15 @@ module HTTPX
@connections.delete(connection)
emit_resolve_error(connection, host, error)
end
while (connection = @connections.shift)
emit_resolve_error(connection, host, error)
end
end
close_or_resolve
end
def reset_hostname(hostname, connection: @queries.delete(hostname), reset_candidates: true)
@timeouts.delete(hostname)
@timeouts.delete(hostname)
return unless connection && reset_candidates
@ -490,7 +525,10 @@ module HTTPX
end
def close_or_resolve
if @connections.empty?
# drop already closed connections
@connections.shift until @connections.empty? || @connections.first.state != :closed
if (@connections - @queries.values).empty?
emit(:close, self)
else
resolve

View File

@ -74,14 +74,15 @@ module HTTPX
log do
"resolver #{FAMILY_TYPES[RECORD_TYPES[family]]}: " \
"answer #{FAMILY_TYPES[RECORD_TYPES[family]]} #{connection.peer.host}: #{addresses.inspect}"
"answer #{connection.peer.host}: #{addresses.inspect} (early resolve: #{early_resolve})"
end
if @current_selector && # if triggered by early resolve, session may not be here yet
!connection.io &&
connection.options.ip_families.size > 1 &&
family == Socket::AF_INET &&
addresses.first.to_s != connection.peer.host.to_s
if !early_resolve && # do not apply resolution delay for non-dns name resolution
@current_selector && # just in case...
family == Socket::AF_INET && # resolution delay only applies to IPv4
!connection.io && # connection already has addresses and initiated/ended handshake
connection.options.ip_families.size > 1 && # no need to delay if not supporting dual stack IP
addresses.first.to_s != connection.peer.host.to_s # connection URL host is already the IP (early resolve included perhaps?)
log { "resolver #{FAMILY_TYPES[RECORD_TYPES[family]]}: applying resolution delay..." }
@current_selector.after(0.05) do

View File

@ -19,6 +19,7 @@ module HTTPX
def initialize
@timers = Timers.new
@selectables = []
@is_timer_interval = false
end
def each(&blk)
@ -43,7 +44,11 @@ module HTTPX
rescue StandardError => e
emit_error(e)
rescue Exception # rubocop:disable Lint/RescueException
each_connection(&:force_reset)
each_connection do |conn|
conn.force_reset
conn.disconnect
end
raise
end
@ -125,7 +130,6 @@ module HTTPX
# first, we group IOs based on interest type. On call to #interests however,
# things might already happen, and new IOs might be registered, so we might
# have to start all over again. We do this until we group all selectables
begin
@selectables.delete_if do |io|
interests = io.interests
@ -143,7 +147,6 @@ module HTTPX
[*r, *w].each { |io| io.handle_socket_timeout(interval) }
return
end
end
if writers
readers.each do |io|
@ -174,7 +177,7 @@ module HTTPX
end
unless result || interval.nil?
io.handle_socket_timeout(interval)
io.handle_socket_timeout(interval) unless @is_timer_interval
return
end
# raise TimeoutError.new(interval, "timed out while waiting on select")
@ -186,10 +189,21 @@ module HTTPX
end
def next_timeout
[
@timers.wait_interval,
@selectables.filter_map(&:timeout).min,
].compact.min
@is_timer_interval = false
timer_interval = @timers.wait_interval
connection_interval = @selectables.filter_map(&:timeout).min
return connection_interval unless timer_interval
if connection_interval.nil? || timer_interval <= connection_interval
@is_timer_interval = true
return timer_interval
end
connection_interval
end
def emit_error(e)

View File

@ -26,7 +26,7 @@ module HTTPX
@next_interval_at = nil
interval
Timer.new(interval, callback)
end
def wait_interval
@ -48,6 +48,17 @@ module HTTPX
@next_interval_at = nil if @intervals.empty?
end
# Pairs an interval with one callback registered on it, so that this specific
# callback can be removed from the interval later on.
class Timer
# Stores the +interval+ and the +callback+ that #cancel operates on.
def initialize(interval, callback)
@interval = interval
@callback = callback
end
# Cancels this timer by deleting its callback from the interval it was created with.
def cancel
@interval.delete(@callback)
end
end
class Interval
include Comparable
@ -63,6 +74,10 @@ module HTTPX
@on_empty = blk
end
# Calls the block stored in +@on_empty+.
# NOTE(review): presumably the block registered through #on_empty — confirm against the full class.
def cancel
@on_empty.call
end
def <=>(other)
@interval <=> other.interval
end

View File

@ -43,7 +43,6 @@ module HTTPX
@parser: Object & _Parser
@connected_at: Float
@response_received_at: Float
@intervals: Array[Timers::Interval]
@exhausted: bool
@cloned: bool
@coalesced_connection: instance?
@ -111,6 +110,8 @@ module HTTPX
def handle_connect_error: (StandardError error) -> void
def disconnect: () -> void
private
def initialize: (http_uri uri, Options options) -> void
@ -119,8 +120,6 @@ module HTTPX
def connect: () -> void
def disconnect: () -> void
def exhausted?: () -> boolish
def consume: () -> void
@ -163,7 +162,7 @@ module HTTPX
def read_timeout_callback: (Request request, Numeric read_timeout, ?singleton(RequestTimeoutError) error_type) -> void
def set_request_timeout: (Request request, Numeric timeout, Symbol start_event, Symbol | Array[Symbol] finish_events) { () -> void } -> void
def set_request_timeout: (Symbol label, Request request, Numeric timeout, Symbol start_event, Symbol | Array[Symbol] finish_events) { () -> void } -> void
def self.parser_type: (String protocol) -> (singleton(HTTP1) | singleton(HTTP2))
end

View File

@ -45,6 +45,9 @@ module HTTPX
class WriteTimeoutError < RequestTimeoutError
end
class OperationTimeoutError < TimeoutError
end
class ResolveError < Error
end

View File

@ -14,6 +14,7 @@ module HTTPX
attr_reader options: Options
attr_reader response: response?
attr_reader drain_error: StandardError?
attr_reader active_timeouts: Array[Symbol]
attr_accessor peer_address: ipaddr?

View File

@ -21,6 +21,7 @@ module HTTPX
@write_buffer: Buffer
@large_packet: Buffer?
@io: UDP | TCP
@name: String?
attr_reader state: Symbol
@ -42,7 +43,9 @@ module HTTPX
def consume: () -> void
def do_retry: (?Numeric? loop_time) -> void
def schedule_retry: () -> void
def do_retry: (String host, Connection connection, Numeric interval) -> void
def dread: (Integer) -> void
| () -> void

View File

@ -10,6 +10,7 @@ module HTTPX
@timers: Timers
@selectables: Array[selectable]
@is_timer_interval: bool
def next_tick: () -> void

View File

@ -1,10 +1,12 @@
module HTTPX
class Timers
type callback = ^() -> void
@intervals: Array[Interval]
@next_interval_at: Float
def after: (Numeric interval_in_secs, ^() -> void) -> Interval
| (Numeric interval_in_secs) { () -> void } -> Interval
def after: (Numeric interval_in_secs, ^() -> void) -> Timer
| (Numeric interval_in_secs) { () -> void } -> Timer
def wait_interval: () -> Numeric?
@ -15,8 +17,6 @@ module HTTPX
class Interval
include Comparable
type callback = ^() -> void
attr_reader interval: Numeric
@callbacks: Array[callback]
@ -25,6 +25,8 @@ module HTTPX
def on_empty: () { () -> void } -> void
def cancel: () -> void
def to_f: () -> Float
def <<: (callback) -> void
@ -41,5 +43,14 @@ module HTTPX
def initialize: (Numeric interval) -> void
end
class Timer
@interval: Interval
@callback: callback
def initialize: (Interval interval, callback callback) -> void
def cancel: () -> void
end
end
end

39
test/README.md Normal file
View File

@ -0,0 +1,39 @@
These are some guidelines and tips on how to write and run tests.
## Minitest
`httpx` test suite uses plain [minitest](https://github.com/minitest/minitest). It constrains its usage down to `assert`, except in the cases where `assert` can't be used (asserting exceptions, for example).
## Structure
Writing a functional test is preferred over writing a unit test. Some more public-facing components are unit-tested (request, response, bodies...), but this is the exception rather than the rule.
Most functional tests target available functionality from httpbin. If what you're developing **can** be tested using [httpbin](https://httpbin.org/), you **should** use [httpbin](https://httpbin.org/).
Most functional tests are declared in [test/http_test.rb](../test/http_test.rb) and [test/https_test.rb](../test/https_test.rb), via contextual modules. These are roughly scoped by functionality / feature set / plugins. Add tests to existing modules if they fit contextually. Add tests directly to the test files when they're not supposed to be shared otherwise. If it does not fit in any of these, I'll let you know during review.
Tests run in parallel (multi-threaded mode). Your test code should be thread-safe as well.
Most tests can be found under [test](../test/).
Some tests are under [integration_tests](../integration_tests/), mostly because they're testing built-in integrations which are loaded by default (and can't be tested in isolation), but also because these integration tests aren't thread safe.
Some tests are under [regression_tests](../regression_tests/). While regressions should have a corresponding test under `test`, some of them can only be tested using some public endpoint, sometimes in an intrusive way that may affect the main test suite. If your test should be added here, I'll let you know during the review.
There are also [standalone_tests](../standalone_tests/). Each runs its own process. These are supposed to test features which are loaded at boot time, and may integrate with different libs offering the same set of features (i.e. multiple json libs, etc).
## Testing locally
Most (not all) tests can be executed locally. Tests using [httpbin](https://httpbin.org/) will target the [nghttp2.org instance](https://nghttp2.org/httpbin/). There is a caveat though: the public instances and the instance used in CI may be different.
## Testing with docker compose (CI mode)
If you want to reproduce the whole test suite, or have a test that runs locally and fails in the CI, the (GitLab) CI suite is backed by a docker-compose based script. If you have `docker` and `docker-compose` installed, you can set the environment:
* open a console via `docker-compose -f docker-compose.yml -f docker-compose-ruby-{RUBY_VERSION}.yml run --entrypoint bash httpx`
* copy the relevant instructions from [the build script](support/ci/build.sh)
* install packages
* set required env vars
* install dependencies via `bundler`
* set the local CA bundle
* run `bundle exec rake test`

View File

@ -210,7 +210,7 @@ module Requests
end
self.attempts = 0
def consume
def dwrite
self.class.attempts += 1
raise Errno::EHOSTUNREACH, "host unreachable"
end