applying a resolver manager to hold the different family type resolvers for the pool. This allows to have multiple resolvers per type, i.e. IPv6 and IPv4

This commit is contained in:
HoneyryderChuck 2021-12-12 14:32:12 +00:00
parent 71920157f4
commit 06b162b6ea
19 changed files with 170 additions and 95 deletions

View File

@ -1,5 +1,7 @@
# frozen_string_literal: true # frozen_string_literal: true
require "socket"
module HTTPX module HTTPX
class Options class Options
WINDOW_SIZE = 1 << 14 # 16K WINDOW_SIZE = 1 << 14 # 16K
@ -9,6 +11,17 @@ module HTTPX
KEEP_ALIVE_TIMEOUT = 20 KEEP_ALIVE_TIMEOUT = 20
SETTINGS_TIMEOUT = 10 SETTINGS_TIMEOUT = 10
# https://github.com/ruby/resolv/blob/095f1c003f6073730500f02acbdbc55f83d70987/lib/resolv.rb#L408
ip_address_families = begin
list = Socket.ip_address_list
if list.any? { |a| a.ipv6? && !a.ipv6_loopback? && !a.ipv6_linklocal? }
# [Socket::AF_INET6, Socket::AF_INET]
end
[Socket::AF_INET]
rescue NotImplementedError
[Socket::AF_INET]
end
DEFAULT_OPTIONS = { DEFAULT_OPTIONS = {
:debug => ENV.key?("HTTPX_DEBUG") ? $stderr : nil, :debug => ENV.key?("HTTPX_DEBUG") ? $stderr : nil,
:debug_level => (ENV["HTTPX_DEBUG"] || 1).to_i, :debug_level => (ENV["HTTPX_DEBUG"] || 1).to_i,
@ -37,6 +50,7 @@ module HTTPX
:persistent => false, :persistent => false,
:resolver_class => (ENV["HTTPX_RESOLVER"] || :native).to_sym, :resolver_class => (ENV["HTTPX_RESOLVER"] || :native).to_sym,
:resolver_options => { cache: true }, :resolver_options => { cache: true },
:ip_families => ip_address_families,
}.freeze }.freeze
begin begin
@ -172,6 +186,10 @@ module HTTPX
Array(value) Array(value)
end end
def option_ip_families(value)
Array(value)
end
%i[ %i[
params form json body ssl http2_settings params form json body ssl http2_settings
request_class response_class headers_class request_body_class request_class response_class headers_class request_body_class

View File

@ -1,9 +1,5 @@
# frozen_string_literal: true # frozen_string_literal: true
require "resolv"
require "ipaddr"
require "forwardable"
module HTTPX module HTTPX
class HTTPProxyError < Error; end class HTTPProxyError < Error; end
@ -252,7 +248,7 @@ module HTTPX
case nextstate case nextstate
when :closing when :closing
# this is a hack so that we can use the super method # this is a hack so that we can use the super method
# and it'll thing that the current state is open # and it'll think that the current state is open
@state = :open if @state == :connecting @state = :open if @state == :connecting
end end
super super

View File

@ -106,11 +106,12 @@ module HTTPX
return return
end end
resolver = find_resolver_for(connection) find_resolver_for(connection) do |resolver|
resolver << connection resolver << connection
return if resolver.empty? return if resolver.empty?
select_connection(resolver) select_connection(resolver)
end
end end
def on_resolver_connection(connection) def on_resolver_connection(connection)
@ -137,7 +138,7 @@ module HTTPX
def on_resolver_close(resolver) def on_resolver_close(resolver)
resolver_type = resolver.class resolver_type = resolver.class
return unless @resolvers[resolver_type] == resolver return if resolver.closed?
@resolvers.delete(resolver_type) @resolvers.delete(resolver_type)
@ -172,12 +173,10 @@ module HTTPX
end end
def coalesce_connections(conn1, conn2) def coalesce_connections(conn1, conn2)
if conn1.coalescable?(conn2) return register_connection(conn2) unless conn1.coalescable?(conn2)
conn1.merge(conn2)
@connections.delete(conn2) conn1.merge(conn2)
else @connections.delete(conn2)
register_connection(conn2)
end
end end
def next_timeout def next_timeout
@ -194,13 +193,19 @@ module HTTPX
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol) resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
@resolvers[resolver_type] ||= begin @resolvers[resolver_type] ||= begin
resolver = resolver_type.new(connection_options) resolver_manager = Resolver::Multi.new(resolver_type, connection_options)
resolver.pool = self if resolver.respond_to?(:pool=) resolver_manager.on(:resolve, &method(:on_resolver_connection))
resolver.on(:resolve, &method(:on_resolver_connection)) resolver_manager.on(:error, &method(:on_resolver_error))
resolver.on(:error, &method(:on_resolver_error)) resolver_manager.on(:close, &method(:on_resolver_close))
resolver.on(:close) { on_resolver_close(resolver) } resolver_manager
resolver
end end
manager = @resolvers[resolver_type]
manager.resolvers.each do |resolver|
resolver.pool = self if resolver.respond_to?(:pool=)
yield resolver
end
manager
end end
end end
end end

View File

@ -12,6 +12,7 @@ module HTTPX
require "httpx/resolver/system" require "httpx/resolver/system"
require "httpx/resolver/native" require "httpx/resolver/native"
require "httpx/resolver/https" require "httpx/resolver/https"
require "httpx/resolver/multi"
register :system, System register :system, System
register :native, Native register :native, Native

View File

@ -16,17 +16,20 @@ module HTTPX
DEFAULTS = { DEFAULTS = {
uri: NAMESERVER, uri: NAMESERVER,
use_get: false, use_get: false,
record_types: RECORD_TYPES.keys, }.freeze
FAMILY_TYPES = {
Resolv::DNS::Resource::IN::AAAA => "AAAA",
Resolv::DNS::Resource::IN::A => "A",
}.freeze }.freeze
def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close
attr_writer :pool attr_writer :pool
def initialize(options) def initialize(_, options)
super super
@resolver_options = DEFAULTS.merge(@options.resolver_options) @resolver_options = DEFAULTS.merge(@options.resolver_options)
@_record_types = Hash.new { |types, host| types[host] = @resolver_options[:record_types].dup }
@queries = {} @queries = {}
@requests = {} @requests = {}
@connections = [] @connections = []
@ -81,17 +84,16 @@ module HTTPX
hostname = connection.origin.host hostname = connection.origin.host
log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname
end end
type = @_record_types[hostname].first || "A" log { "resolver: query #{FAMILY_TYPES[@family]} for #{hostname}" }
log { "resolver: query #{type} for #{hostname}" }
begin begin
request = build_request(hostname, type) request = build_request(hostname)
request.on(:response, &method(:on_response).curry(2)[request]) request.on(:response, &method(:on_response).curry(2)[request])
request.on(:promise, &method(:on_promise)) request.on(:promise, &method(:on_promise))
@requests[request] = connection @requests[request] = connection
resolver_connection.send(request) resolver_connection.send(request)
@queries[hostname] = connection @queries[hostname] = connection
@connections << connection @connections << connection
rescue Resolv::DNS::EncodeError, JSON::JSONError => e rescue ResolveError, Resolv::DNS::EncodeError, JSON::JSONError => e
emit_resolve_error(connection, hostname, e) emit_resolve_error(connection, hostname, e)
end end
end end
@ -119,21 +121,15 @@ module HTTPX
answers = decode_response_body(response) answers = decode_response_body(response)
rescue Resolv::DNS::DecodeError, JSON::JSONError => e rescue Resolv::DNS::DecodeError, JSON::JSONError => e
host, connection = @queries.first host, connection = @queries.first
if @_record_types[host].empty? @queries.delete(host)
@queries.delete(host) emit_resolve_error(connection, host, e)
emit_resolve_error(connection, host, e) return
return
end
end end
if answers.nil? || answers.empty? if answers.nil? || answers.empty?
host, connection = @queries.first host, connection = @queries.first
@_record_types[host].shift @queries.delete(host)
if @_record_types[host].empty? emit_resolve_error(connection, host)
@queries.delete(host) return
@_record_types.delete(host)
emit_resolve_error(connection, host)
return
end
else else
answers = answers.group_by { |answer| answer["name"] } answers = answers.group_by { |answer| answer["name"] }
answers.each do |hostname, addresses| answers.each do |hostname, addresses|
@ -168,14 +164,14 @@ module HTTPX
resolve resolve
end end
def build_request(hostname, type) def build_request(hostname)
uri = @uri.dup uri = @uri.dup
rklass = @options.request_class rklass = @options.request_class
payload = Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type]) payload = Resolver.encode_dns_query(hostname, type: @record_type)
if @resolver_options[:use_get] if @resolver_options[:use_get]
params = URI.decode_www_form(uri.query.to_s) params = URI.decode_www_form(uri.query.to_s)
params << ["type", type] params << ["type", FAMILY_TYPES[@record_type]]
params << ["dns", Base64.urlsafe_encode64(payload, padding: false)] params << ["dns", Base64.urlsafe_encode64(payload, padding: false)]
uri.query = URI.encode_www_form(params) uri.query = URI.encode_www_form(params)
request = rklass.new("GET", uri, @options) request = rklass.new("GET", uri, @options)

View File

@ -0,0 +1,57 @@
# frozen_string_literal: true
require "forwardable"
require "resolv"
module HTTPX
class Resolver::Multi
include Callbacks
attr_reader :resolvers
def initialize(resolver_type, options)
@options = options
@resolvers = options.ip_families.map do |ip_family|
resolver = resolver_type.new(ip_family, options)
resolver.on(:resolve, &method(:on_resolver_connection))
resolver.on(:error, &method(:on_resolver_error))
resolver.on(:close) { on_resolver_close(resolver) }
resolver
end
@errors = Hash.new { |hs, k| hs[k] = [] }
end
def closed?
@resolvers.all?(&:closed?)
end
def timeout
@resolvers.map(&:timeout).min
end
def close
@resolvers.each(&:close)
end
private
def on_resolver_connection(connection)
emit(:resolve, connection)
end
def on_resolver_error(connection, error)
@errors[connection] << error
return unless @errors[connection].size >= @resolvers.size
errors = @errors.delete(connection)
emit(:error, connection, errors.first)
end
def on_resolver_close(resolver)
emit(:close, resolver)
end
end
end

View File

@ -13,7 +13,6 @@ module HTTPX
**Resolv::DNS::Config.default_config_hash, **Resolv::DNS::Config.default_config_hash,
packet_size: 512, packet_size: 512,
timeouts: Resolver::RESOLVE_TIMEOUT, timeouts: Resolver::RESOLVE_TIMEOUT,
record_types: RECORD_TYPES.keys,
}.freeze }.freeze
else else
{ {
@ -21,7 +20,6 @@ module HTTPX
**Resolv::DNS::Config.default_config_hash, **Resolv::DNS::Config.default_config_hash,
packet_size: 512, packet_size: 512,
timeouts: Resolver::RESOLVE_TIMEOUT, timeouts: Resolver::RESOLVE_TIMEOUT,
record_types: RECORD_TYPES.keys,
}.freeze }.freeze
end end
@ -43,14 +41,13 @@ module HTTPX
attr_reader :state attr_reader :state
def initialize(options) def initialize(_, options)
super super
@ns_index = 0 @ns_index = 0
@resolver_options = DEFAULTS.merge(@options.resolver_options) @resolver_options = DEFAULTS.merge(@options.resolver_options)
@nameserver = @resolver_options[:nameserver] @nameserver = @resolver_options[:nameserver]
@_timeouts = Array(@resolver_options[:timeouts]) @_timeouts = Array(@resolver_options[:timeouts])
@timeouts = Hash.new { |timeouts, host| timeouts[host] = @_timeouts.dup } @timeouts = Hash.new { |timeouts, host| timeouts[host] = @_timeouts.dup }
@_record_types = Hash.new { |types, host| types[host] = @resolver_options[:record_types].dup }
@connections = [] @connections = []
@queries = {} @queries = {}
@read_buffer = "".b @read_buffer = "".b
@ -192,27 +189,21 @@ module HTTPX
addresses = Resolver.decode_dns_answer(buffer) addresses = Resolver.decode_dns_answer(buffer)
rescue Resolv::DNS::DecodeError => e rescue Resolv::DNS::DecodeError => e
hostname, connection = @queries.first hostname, connection = @queries.first
if @_record_types[hostname].empty? @queries.delete(hostname)
@queries.delete(hostname) @timeouts.delete(hostname)
@timeouts.delete(hostname) @connections.delete(connection)
@connections.delete(connection) ex = NativeResolveError.new(connection, hostname, e.message)
ex = NativeResolveError.new(connection, hostname, e.message) ex.set_backtrace(e.backtrace)
ex.set_backtrace(e.backtrace) raise ex
raise ex
end
end end
if addresses.nil? || addresses.empty? if addresses.nil? || addresses.empty?
hostname, connection = @queries.first hostname, connection = @queries.first
@_record_types[hostname].shift @queries.delete(hostname)
if @_record_types[hostname].empty? @timeouts.delete(hostname)
@queries.delete(hostname) @connections.delete(connection)
@_record_types.delete(hostname)
@timeouts.delete(hostname)
@connections.delete(connection)
raise NativeResolveError.new(connection, hostname) raise NativeResolveError.new(connection, hostname)
end
else else
address = addresses.first address = addresses.first
name = address["name"] name = address["name"]
@ -265,10 +256,9 @@ module HTTPX
log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname log { "resolver: resolve IDN #{connection.origin.non_ascii_hostname} as #{hostname}" } if connection.origin.non_ascii_hostname
end end
@queries[hostname] = connection @queries[hostname] = connection
type = @_record_types[hostname].first || "A" log { "resolver: query #{@record_type.name.split("::").last} for #{hostname}" }
log { "resolver: query #{type} for #{hostname}" }
begin begin
@write_buffer << Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type]) @write_buffer << Resolver.encode_dns_query(hostname, type: @record_type)
rescue Resolv::DNS::EncodeError => e rescue Resolv::DNS::EncodeError => e
emit_resolve_error(connection, hostname, e) emit_resolve_error(connection, hostname, e)
end end

View File

@ -9,8 +9,8 @@ module HTTPX
include Loggable include Loggable
RECORD_TYPES = { RECORD_TYPES = {
"A" => Resolv::DNS::Resource::IN::A, Socket::AF_INET6 => Resolv::DNS::Resource::IN::AAAA,
"AAAA" => Resolv::DNS::Resource::IN::AAAA, Socket::AF_INET => Resolv::DNS::Resource::IN::A,
}.freeze }.freeze
CHECK_IF_IP = ->(name) do CHECK_IF_IP = ->(name) do
@ -22,7 +22,11 @@ module HTTPX
end end
end end
def initialize(options) attr_reader :family
def initialize(family, options)
@family = family
@record_type = RECORD_TYPES[family]
@options = Options.new(options) @options = Options.new(options)
end end

View File

@ -12,7 +12,7 @@ module HTTPX
attr_reader :state attr_reader :state
def initialize(options) def initialize(_, options)
super super
@resolver_options = @options.resolver_options @resolver_options = @options.resolver_options
resolv_options = @resolver_options.dup resolv_options = @resolver_options.dup
@ -28,6 +28,7 @@ module HTTPX
ip_resolve(hostname) || ip_resolve(hostname) ||
system_resolve(hostname) || system_resolve(hostname) ||
@resolver.getaddresses(hostname) @resolver.getaddresses(hostname)
throw(:resolve_error, resolve_error(hostname)) if addresses.empty? throw(:resolve_error, resolve_error(hostname)) if addresses.empty?
emit_addresses(connection, addresses) emit_addresses(connection, addresses)

View File

@ -87,10 +87,6 @@ module HTTPX
attr_reader ssl: Hash[Symbol, untyped] attr_reader ssl: Hash[Symbol, untyped]
# request_class response_class headers_class request_body_class
# response_body_class connection_class
# resolver_class resolver_options
# io # io
type io_option = _ToIO | Hash[String, _ToIO] type io_option = _ToIO | Hash[String, _ToIO]
attr_reader io: io_option? attr_reader io: io_option?
@ -110,6 +106,9 @@ module HTTPX
# resolver_options # resolver_options
attr_reader resolver_options: Hash[Symbol, untyped] attr_reader resolver_options: Hash[Symbol, untyped]
# ip_families
attr_reader ip_families: Array[Integer]
def ==: (untyped other) -> bool def ==: (untyped other) -> bool
def merge: (_ToHash[Symbol, untyped] other) -> instance def merge: (_ToHash[Symbol, untyped] other) -> instance
def to_hash: () -> Hash[Symbol, untyped] def to_hash: () -> Hash[Symbol, untyped]

View File

@ -1,6 +1,6 @@
module HTTPX module HTTPX
class Pool class Pool
@resolvers: Hash[Class, Resolver::Resolver] @resolvers: Hash[Class, Resolver::Multi]
@timers: Timers @timers: Timers
@selector: Selector @selector: Selector
@connections: Array[Connection] @connections: Array[Connection]
@ -42,6 +42,6 @@ module HTTPX
def next_timeout: () -> (Integer | Float | nil) def next_timeout: () -> (Integer | Float | nil)
def find_resolver_for: (Connection) -> Resolver::Resolver def find_resolver_for: (Connection) { (Resolver::Resolver) -> void } -> Resolver::Multi
end end
end end

View File

@ -4,6 +4,7 @@ module HTTPX
NAMESERVER: String NAMESERVER: String
DEFAULTS: Hash[Symbol, untyped] DEFAULTS: Hash[Symbol, untyped]
FAMILY_TYPES: Hash[singleton(Resolv::DNS::Resource), String]
@options: Options @options: Options
@requests: Hash[Request, Connection] @requests: Hash[Request, Connection]
@ -19,8 +20,6 @@ module HTTPX
private private
def initialize: (options) -> untyped
def resolver_connection: () -> Connection def resolver_connection: () -> Connection
def resolve: (?Connection connection, ?String? hostname) -> void def resolve: (?Connection connection, ?String? hostname) -> void
@ -29,7 +28,7 @@ module HTTPX
def parse: (Response response) -> void def parse: (Response response) -> void
def build_request: (String hostname, "A" | "AAAA") -> Request def build_request: (String hostname) -> Request
def decode_response_body: (Response) -> Array[dns_result] def decode_response_body: (Response) -> Array[dns_result]
end end

6
sig/resolver/multi.rbs Normal file
View File

@ -0,0 +1,6 @@
module HTTPX
module Resolver
class Multi
end
end
end

View File

@ -28,8 +28,6 @@ module HTTPX
private private
def initialize: (options) -> untyped
def calculate_interests: () -> (:r | :w) def calculate_interests: () -> (:r | :w)
def consume: () -> void def consume: () -> void

View File

@ -4,16 +4,17 @@ module HTTPX
include Callbacks include Callbacks
include Loggable include Loggable
RECORD_TYPES: Hash[String, singleton(Resolv::DNS::Resource)] RECORD_TYPES: Hash[Integer, singleton(Resolv::DNS::Resource)]
CHECK_IF_IP: ^(String name) -> bool
attr_reader family: Integer
@record_type: singleton(Resolv::DNS::Resource)
@options: Options @options: Options
@resolver_options: Hash[Symbol, untyped] @resolver_options: Hash[Symbol, untyped]
@_record_types: Hash[String, ["A" | "AAAA", dns_resource]]
@queries: Hash[String, Connection] @queries: Hash[String, Connection]
@system_resolver: Resolv::Hosts @system_resolver: Resolv::Hosts
CHECK_IF_IP: ^(String name) -> bool
def close: () -> void def close: () -> void
def closed?: () -> bool def closed?: () -> bool
@ -22,7 +23,7 @@ module HTTPX
private private
def initialize: (options) -> void def initialize: (Integer family, options options) -> void
def emit_addresses: (Connection, Array[ipaddr | Resolv::DNS::ip_address]) -> void def emit_addresses: (Connection, Array[ipaddr | Resolv::DNS::ip_address]) -> void
@ -34,7 +35,7 @@ module HTTPX
def emit_resolve_error: (Connection, ?String hostname, ?StandardError) -> void def emit_resolve_error: (Connection, ?String hostname, ?StandardError) -> void
def resolve_error: (String hostname, ?StandardError?) -> void def resolve_error: (String hostname, ?StandardError?) -> ResolveError
end end
end end
end end

View File

@ -6,10 +6,6 @@ module HTTPX
@resolver: Resolv::DNS @resolver: Resolv::DNS
def <<: (Connection) -> void def <<: (Connection) -> void
private
def initialize: (options) -> untyped
end end
end end
end end

View File

@ -127,6 +127,7 @@ class OptionsTest < Minitest::Test
:persistent => false, :persistent => false,
:resolver_class => bar.resolver_class, :resolver_class => bar.resolver_class,
:resolver_options => bar.resolver_options, :resolver_options => bar.resolver_options,
:ip_families => bar.ip_families,
}.compact }.compact
assert foo.merge(bar).to_hash == expected, "options haven't merged correctly" assert foo.merge(bar).to_hash == expected, "options haven't merged correctly"

View File

@ -57,7 +57,7 @@ module Requests
session = HTTPX.plugin(SessionWithPool) session = HTTPX.plugin(SessionWithPool)
uri = URI(build_uri("/get")) uri = URI(build_uri("/get"))
resolver_class = Class.new(HTTPX::Resolver::HTTPS) do resolver_class = Class.new(HTTPX::Resolver::HTTPS) do
def build_request(_hostname, _type) def build_request(_hostname)
@options.request_class.new("POST", @uri) @options.request_class.new("POST", @uri)
end end
end end

View File

@ -150,8 +150,15 @@ module WSTestPlugin
attr_reader :websocket attr_reader :websocket
def init_websocket(connection) def init_websocket(connection)
socket = connection.to_io if connection.state == :open
@websocket = WSCLient.new(socket, @headers) socket = connection.to_io
@websocket = WSCLient.new(socket, @headers)
else
connection.once(:open) do
socket = connection.to_io
@websocket = WSCLient.new(socket, @headers)
end
end
end end
end end