implemented happy eyeballs v2 (rfc8305) for native and https resolver

Two resolver are kept (IPv6/IPv4) along in the pool, to which all
names are sent to and read from in the same pool. IPv4 resolves are
subject to a 50ms delay (as per rfc) before they're used for connecting.
IPv6 addresses have preference, in that if they arrive before the delay,
they are immediately used. If they arrive after the delay, they do not
interrupt the connection, but they'll be the next-in-line in case
connection handshake fails.

Two resolvers are kept, but the inherent Connection will be shared,
thereby sending name resolving requests to the same HTTP/2 connection in
bulk. The resolution delay logic from above also applies.

Currently handles resolving via `resolv` lib. This happens synchronously
though, so we're not there yet.
This commit is contained in:
HoneyryderChuck 2021-12-16 11:51:54 +00:00
parent 82b0a4bf28
commit 2940323412
19 changed files with 134 additions and 46 deletions

View File

@ -44,7 +44,7 @@ module HTTPX
def_delegator :@write_buffer, :empty? def_delegator :@write_buffer, :empty?
attr_reader :origin, :origins, :state, :pending, :options attr_reader :io, :origin, :origins, :state, :pending, :options
attr_writer :timers attr_writer :timers
@ -78,7 +78,11 @@ module HTTPX
# this is a semi-private method, to be used by the resolver # this is a semi-private method, to be used by the resolver
# to initiate the io object. # to initiate the io object.
def addresses=(addrs) def addresses=(addrs)
@io ||= IO.registry(@type).new(@origin, addrs, @options) # rubocop:disable Naming/MemoizedInstanceVariableName if @io
@io.add_addresses(addrs)
else
@io = IO.registry(@type).new(@origin, addrs, @options)
end
end end
def addresses def addresses

View File

@ -15,6 +15,7 @@ module HTTPX
def initialize(origin, addresses, options) def initialize(origin, addresses, options)
@state = :idle @state = :idle
@addresses = []
@hostname = origin.host @hostname = origin.host
@options = Options.new(options) @options = Options.new(options)
@fallback_protocol = @options.fallback_protocol @fallback_protocol = @options.fallback_protocol
@ -30,15 +31,29 @@ module HTTPX
raise Error, "Given IO objects do not match the request authority" unless @io raise Error, "Given IO objects do not match the request authority" unless @io
_, _, _, @ip = @io.addr _, _, _, @ip = @io.addr
@addresses ||= [@ip] @addresses << @ip
@ip_index = @addresses.size - 1
@keep_open = true @keep_open = true
@state = :connected @state = :connected
else else
@addresses = addresses.map { |addr| addr.is_a?(IPAddr) ? addr : IPAddr.new(addr) } add_addresses(addresses)
end end
@ip_index = @addresses.size - 1 @ip_index = @addresses.size - 1
@io ||= build_socket # @io ||= build_socket
end
def add_addresses(addrs)
return if addrs.empty?
addrs = addrs.map { |addr| addr.is_a?(IPAddr) ? addr : IPAddr.new(addr) }
ip_index = @ip_index || (@addresses.size - 1)
if addrs.first.ipv6?
# should be the next in line
@addresses = [*@addresses[0, ip_index], *addrs, *@addresses[ip_index..-1]]
else
@addresses.unshift(*addrs)
@ip_index += addrs.size if @ip_index
end
end end
def to_io def to_io
@ -52,7 +67,7 @@ module HTTPX
def connect def connect
return unless closed? return unless closed?
if @io.closed? if !@io || @io.closed?
transition(:idle) transition(:idle)
@io = build_socket @io = build_socket
end end

View File

@ -15,9 +15,10 @@ module HTTPX
ip_address_families = begin ip_address_families = begin
list = Socket.ip_address_list list = Socket.ip_address_list
if list.any? { |a| a.ipv6? && !a.ipv6_loopback? && !a.ipv6_linklocal? } if list.any? { |a| a.ipv6? && !a.ipv6_loopback? && !a.ipv6_linklocal? }
# [Socket::AF_INET6, Socket::AF_INET] [Socket::AF_INET6, Socket::AF_INET]
else
[Socket::AF_INET]
end end
[Socket::AF_INET]
rescue NotImplementedError rescue NotImplementedError
[Socket::AF_INET] [Socket::AF_INET]
end end

View File

@ -96,8 +96,8 @@ module HTTPX
# rubocop:enable Style/MultilineTernaryOperator # rubocop:enable Style/MultilineTernaryOperator
) )
response.close if response.respond_to?(:close) response.close if response.respond_to?(:close)
request.retries -= 1
log { "failed to get response, #{request.retries} tries to go..." } log { "failed to get response, #{request.retries} tries to go..." }
request.retries -= 1
request.transition(:idle) request.transition(:idle)
retry_after = options.retry_after retry_after = options.retry_after

View File

@ -60,14 +60,15 @@ module HTTPX
outstanding_connections = @connections outstanding_connections = @connections
resolver_connections = @resolvers.each_value.flat_map(&:connections).compact resolver_connections = @resolvers.each_value.flat_map(&:connections).compact
outstanding_connections -= resolver_connections outstanding_connections -= resolver_connections
if outstanding_connections.empty?
@resolvers.each_value do |resolver| return unless outstanding_connections.empty?
resolver.close unless resolver.closed?
end @resolvers.each_value do |resolver|
# for https resolver resolver.close unless resolver.closed?
resolver_connections.each(&:close)
next_tick until resolver_connections.none? { |c| c.state != :idle && @connections.include?(c) }
end end
# for https resolver
resolver_connections.each(&:close)
next_tick until resolver_connections.none? { |c| c.state != :idle && @connections.include?(c) }
end end
def init_connection(connection, _options) def init_connection(connection, _options)
@ -118,7 +119,7 @@ module HTTPX
find_resolver_for(connection) do |resolver| find_resolver_for(connection) do |resolver|
resolver << connection resolver << connection
return if resolver.empty? next if resolver.empty?
select_connection(resolver) select_connection(resolver)
end end
@ -203,7 +204,11 @@ module HTTPX
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol) resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
@resolvers[resolver_type] ||= begin @resolvers[resolver_type] ||= begin
resolver_manager = Resolver::Multi.new(resolver_type, connection_options) resolver_manager = if resolver_type.multi?
Resolver::Multi.new(resolver_type, connection_options)
else
resolver_type.new(connection_options)
end
resolver_manager.on(:resolve, &method(:on_resolver_connection)) resolver_manager.on(:resolve, &method(:on_resolver_connection))
resolver_manager.on(:error, &method(:on_resolver_error)) resolver_manager.on(:error, &method(:on_resolver_error))
resolver_manager.on(:close, &method(:on_resolver_close)) resolver_manager.on(:close, &method(:on_resolver_close))
@ -212,7 +217,7 @@ module HTTPX
manager = @resolvers[resolver_type] manager = @resolvers[resolver_type]
manager.resolvers.each do |resolver| manager.resolvers.each do |resolver|
resolver.pool = self if resolver.respond_to?(:pool=) resolver.pool = self
yield resolver yield resolver
end end
manager manager

View File

@ -33,15 +33,27 @@ module HTTPX
end end
end end
def cached_lookup_set(hostname, entries) def cached_lookup_set(hostname, family, entries)
now = Utils.now now = Utils.now
entries.each do |entry| entries.each do |entry|
entry["TTL"] += now entry["TTL"] += now
end end
@lookup_mutex.synchronize do @lookup_mutex.synchronize do
@lookups[hostname] += entries case family
when Socket::AF_INET6
@lookups[hostname].concat(entries)
when Socket::AF_INET
@lookups[hostname].unshift(*entries)
end
entries.each do |entry| entries.each do |entry|
@lookups[entry["name"]] << entry if entry["name"] != hostname next unless entry["name"] != hostname
case family
when Socket::AF_INET6
@lookups[entry["name"]] << entry
when Socket::AF_INET
@lookups[entry["name"]].unshift(entry)
end
end end
end end
end end

View File

@ -25,8 +25,6 @@ module HTTPX
def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close
attr_writer :pool
def initialize(_, options) def initialize(_, options)
super super
@resolver_options = DEFAULTS.merge(@options.resolver_options) @resolver_options = DEFAULTS.merge(@options.resolver_options)
@ -61,8 +59,6 @@ module HTTPX
true true
end end
private
def resolver_connection def resolver_connection
@resolver_connection ||= @pool.find_connection(@uri, @options) || begin @resolver_connection ||= @pool.find_connection(@uri, @options) || begin
@building_connection = true @building_connection = true
@ -74,6 +70,8 @@ module HTTPX
end end
end end
private
def resolve(connection = @connections.first, hostname = nil) def resolve(connection = @connections.first, hostname = nil)
return if @building_connection return if @building_connection
return unless connection return unless connection
@ -84,7 +82,7 @@ module HTTPX
hostname = connection.origin.host hostname = connection.origin.host
log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname
end end
log { "resolver: query #{FAMILY_TYPES[@family]} for #{hostname}" } log { "resolver: query #{FAMILY_TYPES[RECORD_TYPES[@family]]} for #{hostname}" }
begin begin
request = build_request(hostname) request = build_request(hostname)
request.on(:response, &method(:on_response).curry(2)[request]) request.on(:response, &method(:on_response).curry(2)[request])
@ -155,7 +153,7 @@ module HTTPX
next unless connection # probably a retried query for which there's an answer next unless connection # probably a retried query for which there's an answer
@connections.delete(connection) @connections.delete(connection)
Resolver.cached_lookup_set(hostname, addresses) if @resolver_options[:cache] Resolver.cached_lookup_set(hostname, @family, addresses) if @resolver_options[:cache]
emit_addresses(connection, addresses.map { |addr| addr["data"] }) emit_addresses(connection, addresses.map { |addr| addr["data"] })
end end
end end

View File

@ -236,7 +236,7 @@ module HTTPX
@timeouts.delete(name) @timeouts.delete(name)
@timeouts.delete(connection.origin.host) @timeouts.delete(connection.origin.host)
@connections.delete(connection) @connections.delete(connection)
Resolver.cached_lookup_set(connection.origin.host, addresses) if @resolver_options[:cache] Resolver.cached_lookup_set(connection.origin.host, @family, addresses) if @resolver_options[:cache]
emit_addresses(connection, addresses.map { |addr| addr["data"] }) emit_addresses(connection, addresses.map { |addr| addr["data"] })
end end
end end

View File

@ -22,8 +22,16 @@ module HTTPX
end end
end end
class << self
def multi?
true
end
end
attr_reader :family attr_reader :family
attr_writer :pool
def initialize(family, options) def initialize(family, options)
@family = family @family = family
@record_type = RECORD_TYPES[family] @record_type = RECORD_TYPES[family]
@ -47,8 +55,19 @@ module HTTPX
address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s) address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s)
end end
log { "resolver: answer #{connection.origin.host}: #{addresses.inspect}" } log { "resolver: answer #{connection.origin.host}: #{addresses.inspect}" }
connection.addresses = addresses if !connection.io &&
emit(:resolve, connection) connection.options.ip_families.size > 1 &&
addresses.first.ipv4? &&
addresses.first.to_s != connection.origin.host.to_s
log { "resolver: A response, applying resolution delay..." }
@pool.after(0.05) do
connection.addresses = addresses
emit(:resolve, connection)
end
else
connection.addresses = addresses
emit(:resolve, connection)
end
end end
def early_resolve(connection, hostname: connection.origin.host) def early_resolve(connection, hostname: connection.origin.host)

View File

@ -10,10 +10,16 @@ module HTTPX
Resolv::DNS::EncodeError, Resolv::DNS::EncodeError,
Resolv::DNS::DecodeError].freeze Resolv::DNS::DecodeError].freeze
class << self
def multi?
false
end
end
attr_reader :state attr_reader :state
def initialize(_, options) def initialize(options)
super super(nil, options)
@resolver_options = @options.resolver_options @resolver_options = @options.resolver_options
resolv_options = @resolver_options.dup resolv_options = @resolver_options.dup
timeouts = resolv_options.delete(:timeouts) timeouts = resolv_options.delete(:timeouts)
@ -22,6 +28,12 @@ module HTTPX
@resolver.timeouts = timeouts || Resolver::RESOLVE_TIMEOUT @resolver.timeouts = timeouts || Resolver::RESOLVE_TIMEOUT
end end
def resolvers
return enum_for(__method__) unless block_given?
yield self
end
def connections def connections
EMPTY EMPTY
end end

View File

@ -12,6 +12,8 @@ module HTTPX
:propfind | :proppatch | :mkcol | :copy | :move | :lock | :unlock | :orderpatch | :propfind | :proppatch | :mkcol | :copy | :move | :lock | :unlock | :orderpatch |
:acl | :report | :patch | :search :acl | :report | :patch | :search
type ip_family = Integer #Socket::AF_INET6 | Socket::AF_INET
module Plugins module Plugins
def self?.load_plugin: (Symbol) -> Module def self?.load_plugin: (Symbol) -> Module

View File

@ -107,7 +107,7 @@ module HTTPX
attr_reader resolver_options: Hash[Symbol, untyped] attr_reader resolver_options: Hash[Symbol, untyped]
# ip_families # ip_families
attr_reader ip_families: Array[Integer] attr_reader ip_families: Array[ip_family]
def ==: (untyped other) -> bool def ==: (untyped other) -> bool
def merge: (_ToHash[Symbol, untyped] other) -> instance def merge: (_ToHash[Symbol, untyped] other) -> instance

View File

@ -1,6 +1,8 @@
module HTTPX module HTTPX
class Pool class Pool
@resolvers: Hash[Class, Resolver::Multi] type resolver_manager = Resolver::Multi | Resolver::System
@resolvers: Hash[Class, resolver_manager]
@timers: Timers @timers: Timers
@selector: Selector @selector: Selector
@connections: Array[Connection] @connections: Array[Connection]
@ -42,6 +44,6 @@ module HTTPX
def next_timeout: () -> (Integer | Float | nil) def next_timeout: () -> (Integer | Float | nil)
def find_resolver_for: (Connection) { (Resolver::Resolver) -> void } -> Resolver::Multi def find_resolver_for: (Connection) { (Resolver::Resolver resolver) -> void } -> resolver_manager
end end
end end

View File

@ -15,7 +15,7 @@ module HTTPX
def self?.cached_lookup: (String hostname) -> Array[String]? def self?.cached_lookup: (String hostname) -> Array[String]?
def self?.cached_lookup_set: (String hostname, Array[dns_result] addresses) -> void def self?.cached_lookup_set: (String hostname, ip_family family, Array[dns_result] addresses) -> void
def self?.lookup: (String hostname, Numeric ttl) -> Array[String]? def self?.lookup: (String hostname, Numeric ttl) -> Array[String]?

View File

@ -6,6 +6,7 @@ module HTTPX
DEFAULTS: Hash[Symbol, untyped] DEFAULTS: Hash[Symbol, untyped]
FAMILY_TYPES: Hash[singleton(Resolv::DNS::Resource), String] FAMILY_TYPES: Hash[singleton(Resolv::DNS::Resource), String]
@family: ip_family
@options: Options @options: Options
@requests: Hash[Request, Connection] @requests: Hash[Request, Connection]
@connections: Array[Connection] @connections: Array[Connection]
@ -20,6 +21,8 @@ module HTTPX
private private
def initialize: (ip_family family, options options) -> void
def resolver_connection: () -> Connection def resolver_connection: () -> Connection
def resolve: (?Connection connection, ?String? hostname) -> void def resolve: (?Connection connection, ?String? hostname) -> void

View File

@ -7,6 +7,7 @@ module HTTPX
DEFAULTS: Hash[Symbol, untyped] DEFAULTS: Hash[Symbol, untyped]
DNS_PORT: Integer DNS_PORT: Integer
@family: ip_family
@options: Options @options: Options
@ns_index: Integer @ns_index: Integer
@nameserver: String @nameserver: String
@ -28,6 +29,8 @@ module HTTPX
private private
def initialize: (ip_family family, options options) -> void
def calculate_interests: () -> (:r | :w) def calculate_interests: () -> (:r | :w)
def consume: () -> void def consume: () -> void

View File

@ -7,7 +7,7 @@ module HTTPX
RECORD_TYPES: Hash[Integer, singleton(Resolv::DNS::Resource)] RECORD_TYPES: Hash[Integer, singleton(Resolv::DNS::Resource)]
CHECK_IF_IP: ^(String name) -> bool CHECK_IF_IP: ^(String name) -> bool
attr_reader family: Integer attr_reader family: ip_family
@record_type: singleton(Resolv::DNS::Resource) @record_type: singleton(Resolv::DNS::Resource)
@options: Options @options: Options
@ -23,7 +23,7 @@ module HTTPX
private private
def initialize: (Integer family, options options) -> void def initialize: (ip_family? family, options options) -> void
def emit_addresses: (Connection, Array[ipaddr | Resolv::DNS::ip_address]) -> void def emit_addresses: (Connection, Array[ipaddr | Resolv::DNS::ip_address]) -> void

View File

@ -6,6 +6,10 @@ module HTTPX
@resolver: Resolv::DNS @resolver: Resolv::DNS
def <<: (Connection) -> void def <<: (Connection) -> void
private
def initialize: (options options) -> void
end end
end end
end end

View File

@ -8,17 +8,25 @@ class ResolverTest < Minitest::Test
def test_cached_lookup def test_cached_lookup
ips = Resolver.cached_lookup("test.com") ips = Resolver.cached_lookup("test.com")
assert ips.nil? assert ips.nil?
dns_entry = { "data" => "IP", "TTL" => 2, "name" => "test.com" } dns_entry = { "data" => "IPv6", "TTL" => 2, "name" => "test.com" }
Resolver.cached_lookup_set("test.com", [dns_entry]) Resolver.cached_lookup_set("test.com", Socket::AF_INET6, [dns_entry])
ips = Resolver.cached_lookup("test.com") ips = Resolver.cached_lookup("test.com")
assert ips == ["IP"] assert ips == ["IPv6"]
sleep 2 sleep 2
ips = Resolver.cached_lookup("test.com") ips = Resolver.cached_lookup("test.com")
assert ips.nil? assert ips.nil?
alias_entry = { "alias" => "test.com", "TTL" => 2, "name" => "foo.com" } alias_entry = { "alias" => "test.com", "TTL" => 2, "name" => "foo.com" }
Resolver.cached_lookup_set("test.com", [dns_entry]) Resolver.cached_lookup_set("test.com", Socket::AF_INET6, [dns_entry])
Resolver.cached_lookup_set("foo.com", [alias_entry]) Resolver.cached_lookup_set("foo.com", Socket::AF_INET6, [alias_entry])
ips = Resolver.cached_lookup("foo.com") ips = Resolver.cached_lookup("foo.com")
assert ips == ["IP"] assert ips == ["IPv6"]
Resolver.cached_lookup_set("test.com", Socket::AF_INET6, [{ "data" => "IPv6_2", "TTL" => 2, "name" => "test.com" }])
ips = Resolver.cached_lookup("test.com")
assert ips == %w[IPv6 IPv6_2]
Resolver.cached_lookup_set("test.com", Socket::AF_INET, [{ "data" => "IPv4", "TTL" => 2, "name" => "test.com" }])
ips = Resolver.cached_lookup("test.com")
assert ips == %w[IPv4 IPv6 IPv6_2]
end end
end end