mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-10-04 00:00:37 -04:00
initial reimplementation of the system resolver, now using getaddrinfo
the ruby `resolver` library does everthing in ruby, and sequentially (first ipv4 then ipv6 resolution). we already have native for that, and getaddrinfo should be considered the ideal way to use DNS (potentially in the future, it becomes the default resolver).
This commit is contained in:
parent
2940323412
commit
554957f6ca
@ -18,11 +18,6 @@ module HTTPX
|
||||
use_get: false,
|
||||
}.freeze
|
||||
|
||||
FAMILY_TYPES = {
|
||||
Resolv::DNS::Resource::IN::AAAA => "AAAA",
|
||||
Resolv::DNS::Resource::IN::A => "A",
|
||||
}.freeze
|
||||
|
||||
def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close
|
||||
|
||||
def initialize(_, options)
|
||||
@ -64,7 +59,7 @@ module HTTPX
|
||||
@building_connection = true
|
||||
connection = @options.connection_class.new("ssl", @uri, @options.merge(ssl: { alpn_protocols: %w[h2] }))
|
||||
@pool.init_connection(connection, @options)
|
||||
emit_addresses(connection, @uri_addresses)
|
||||
emit_addresses(connection, @family, @uri_addresses)
|
||||
@building_connection = false
|
||||
connection
|
||||
end
|
||||
@ -154,7 +149,7 @@ module HTTPX
|
||||
|
||||
@connections.delete(connection)
|
||||
Resolver.cached_lookup_set(hostname, @family, addresses) if @resolver_options[:cache]
|
||||
emit_addresses(connection, addresses.map { |addr| addr["data"] })
|
||||
emit_addresses(connection, @family, addresses.map { |addr| addr["data"] })
|
||||
end
|
||||
end
|
||||
return if @connections.empty?
|
||||
|
@ -6,6 +6,7 @@ require "resolv"
|
||||
module HTTPX
|
||||
class Resolver::Multi
|
||||
include Callbacks
|
||||
using ArrayExtensions
|
||||
|
||||
attr_reader :resolvers
|
||||
|
||||
|
@ -237,7 +237,7 @@ module HTTPX
|
||||
@timeouts.delete(connection.origin.host)
|
||||
@connections.delete(connection)
|
||||
Resolver.cached_lookup_set(connection.origin.host, @family, addresses) if @resolver_options[:cache]
|
||||
emit_addresses(connection, addresses.map { |addr| addr["data"] })
|
||||
emit_addresses(connection, @family, addresses.map { |addr| addr["data"] })
|
||||
end
|
||||
end
|
||||
return emit(:close) if @connections.empty?
|
||||
|
@ -13,6 +13,11 @@ module HTTPX
|
||||
Socket::AF_INET => Resolv::DNS::Resource::IN::A,
|
||||
}.freeze
|
||||
|
||||
FAMILY_TYPES = {
|
||||
Resolv::DNS::Resource::IN::AAAA => "AAAA",
|
||||
Resolv::DNS::Resource::IN::A => "A",
|
||||
}.freeze
|
||||
|
||||
CHECK_IF_IP = ->(name) do
|
||||
begin
|
||||
IPAddr.new(name)
|
||||
@ -50,14 +55,14 @@ module HTTPX
|
||||
|
||||
private
|
||||
|
||||
def emit_addresses(connection, addresses)
|
||||
def emit_addresses(connection, family, addresses)
|
||||
addresses.map! do |address|
|
||||
address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s)
|
||||
end
|
||||
log { "resolver: answer #{connection.origin.host}: #{addresses.inspect}" }
|
||||
if !connection.io &&
|
||||
connection.options.ip_families.size > 1 &&
|
||||
addresses.first.ipv4? &&
|
||||
family == Socket::AF_INET &&
|
||||
addresses.first.to_s != connection.origin.host.to_s
|
||||
log { "resolver: A response, applying resolution delay..." }
|
||||
@pool.after(0.05) do
|
||||
@ -77,11 +82,12 @@ module HTTPX
|
||||
system_resolve(hostname)
|
||||
return unless addresses
|
||||
|
||||
emit_addresses(connection, addresses)
|
||||
emit_addresses(connection, nil, addresses)
|
||||
end
|
||||
|
||||
def ip_resolve(hostname)
|
||||
[hostname] if CHECK_IF_IP[hostname]
|
||||
[IPAddr.new(hostname)]
|
||||
rescue ArgumentError
|
||||
end
|
||||
|
||||
def system_resolve(hostname)
|
||||
|
@ -5,11 +5,17 @@ require "resolv"
|
||||
|
||||
module HTTPX
|
||||
class Resolver::System < Resolver::Resolver
|
||||
using URIExtensions
|
||||
extend Forwardable
|
||||
|
||||
RESOLV_ERRORS = [Resolv::ResolvError,
|
||||
Resolv::DNS::Requester::RequestError,
|
||||
Resolv::DNS::EncodeError,
|
||||
Resolv::DNS::DecodeError].freeze
|
||||
|
||||
DONE = 1
|
||||
ERROR = 2
|
||||
|
||||
class << self
|
||||
def multi?
|
||||
false
|
||||
@ -18,14 +24,21 @@ module HTTPX
|
||||
|
||||
attr_reader :state
|
||||
|
||||
def_delegator :@connections, :empty?
|
||||
|
||||
def initialize(options)
|
||||
super(nil, options)
|
||||
@resolver_options = @options.resolver_options
|
||||
resolv_options = @resolver_options.dup
|
||||
timeouts = resolv_options.delete(:timeouts)
|
||||
timeouts = resolv_options.delete(:timeouts) || Resolver::RESOLVE_TIMEOUT
|
||||
@_timeouts = Array(timeouts)
|
||||
@timeouts = Hash.new { |tims, host| tims[host] = @_timeouts.dup }
|
||||
resolv_options.delete(:cache)
|
||||
@resolver = resolv_options.empty? ? Resolv::DNS.new : Resolv::DNS.new(resolv_options)
|
||||
@resolver.timeouts = timeouts || Resolver::RESOLVE_TIMEOUT
|
||||
@connections = []
|
||||
@queries = []
|
||||
@ips = []
|
||||
@pipe_mutex = Thread::Mutex.new
|
||||
@state = :idle
|
||||
end
|
||||
|
||||
def resolvers
|
||||
@ -38,18 +51,142 @@ module HTTPX
|
||||
EMPTY
|
||||
end
|
||||
|
||||
def close
|
||||
transition(:closed)
|
||||
end
|
||||
|
||||
def closed?
|
||||
@state == :closed
|
||||
end
|
||||
|
||||
def to_io
|
||||
@pipe_read.to_io
|
||||
end
|
||||
|
||||
def call
|
||||
case @state
|
||||
when :open
|
||||
consume
|
||||
end
|
||||
nil
|
||||
end
|
||||
|
||||
def interests
|
||||
return if @queries.empty?
|
||||
|
||||
:r
|
||||
end
|
||||
|
||||
def timeout
|
||||
return unless @queries.empty?
|
||||
|
||||
_, connection = @queries.first
|
||||
|
||||
@timeouts[connection.origin.host].first
|
||||
end
|
||||
|
||||
def <<(connection)
|
||||
return if early_resolve(connection)
|
||||
|
||||
@connections << connection
|
||||
resolve
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def transition(nextstate)
|
||||
case nextstate
|
||||
when :idle
|
||||
@timeouts.clear
|
||||
when :open
|
||||
return unless @state == :idle
|
||||
|
||||
@pipe_read, @pipe_write = ::IO.pipe
|
||||
when :closed
|
||||
return unless @state == :open
|
||||
|
||||
@pipe_write.close
|
||||
@pipe_read.close
|
||||
end
|
||||
@state = nextstate
|
||||
end
|
||||
|
||||
def consume
|
||||
return if @connections.empty?
|
||||
|
||||
while @pipe_read.ready? && (event = @pipe_read.getbyte)
|
||||
case event
|
||||
when DONE
|
||||
*pair, addrs = @pipe_mutex.synchronize { @ips.pop }
|
||||
@queries.delete(pair)
|
||||
|
||||
family, connection = pair
|
||||
emit_addresses(connection, family, addrs)
|
||||
when ERROR
|
||||
*pair, error = @pipe_mutex.synchronize { @ips.pop }
|
||||
@queries.delete(pair)
|
||||
|
||||
family, connection = pair
|
||||
emit_resolve_error(connection, connection.origin.host, error)
|
||||
end
|
||||
|
||||
@connections.delete(connection) if @queries.empty?
|
||||
end
|
||||
|
||||
return emit(:close, self) if @connections.empty?
|
||||
|
||||
resolve
|
||||
end
|
||||
|
||||
def resolve(connection = @connections.first)
|
||||
raise Error, "no URI to resolve" unless connection
|
||||
return unless @queries.empty?
|
||||
|
||||
hostname = connection.origin.host
|
||||
addresses = connection.addresses ||
|
||||
ip_resolve(hostname) ||
|
||||
system_resolve(hostname) ||
|
||||
@resolver.getaddresses(hostname)
|
||||
scheme = connection.origin.scheme
|
||||
log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname
|
||||
|
||||
throw(:resolve_error, resolve_error(hostname)) if addresses.empty?
|
||||
transition(:open)
|
||||
|
||||
emit_addresses(connection, addresses)
|
||||
rescue Errno::EHOSTUNREACH, *RESOLV_ERRORS => e
|
||||
emit_resolve_error(connection, hostname, e)
|
||||
connection.options.ip_families.each do |family|
|
||||
@queries << [family, connection]
|
||||
end
|
||||
async_resolve(connection, hostname, scheme)
|
||||
consume
|
||||
end
|
||||
|
||||
def async_resolve(connection, hostname, scheme)
|
||||
families = connection.options.ip_families
|
||||
log { "resolver: query for #{hostname}" }
|
||||
Thread.start do
|
||||
Thread.current.report_on_exception = false
|
||||
begin
|
||||
addrs = __addrinfo_resolve(hostname, scheme)
|
||||
addrs = addrs.sort_by(&:afamily).group_by(&:afamily)
|
||||
families.each do |family|
|
||||
addresses = addrs[family]
|
||||
next unless addresses
|
||||
|
||||
addresses.map!(&:ip_address)
|
||||
addresses.uniq!
|
||||
@pipe_mutex.synchronize do
|
||||
@ips.unshift([family, connection, addresses])
|
||||
@pipe_write.putc(DONE) unless @pipe_write.closed?
|
||||
end
|
||||
end
|
||||
rescue StandardError => e
|
||||
@pipe_mutex.synchronize do
|
||||
families.each do |family|
|
||||
@ips.unshift([family, connection, e])
|
||||
@pipe_write.putc(ERROR) unless @pipe_write.closed?
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def __addrinfo_resolve(host, scheme)
|
||||
Addrinfo.getaddrinfo(host, scheme, Socket::AF_UNSPEC, Socket::SOCK_STREAM)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -25,7 +25,7 @@ module HTTPX
|
||||
|
||||
def initialize: (ip_family? family, options options) -> void
|
||||
|
||||
def emit_addresses: (Connection, Array[ipaddr | Resolv::DNS::ip_address]) -> void
|
||||
def emit_addresses: (Connection, ip_family? family, Array[ipaddr | Resolv::DNS::ip_address]) -> void
|
||||
|
||||
def early_resolve: (Connection, ?hostname: String) -> void
|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
module HTTPX
|
||||
class Selector
|
||||
type selectable = Connection | Resolver::Native
|
||||
type selectable = Connection | Resolver::Native | Resolver::System
|
||||
|
||||
READABLE: Array[Symbol]
|
||||
WRITABLE: Array[Symbol]
|
||||
|
Loading…
x
Reference in New Issue
Block a user