mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-10 00:01:27 -04:00
Channel -> Connection
This commit is contained in:
parent
5d8c65cfe3
commit
935a58e11e
@ -55,51 +55,51 @@ module HTTPX
|
||||
@responses.delete(request)
|
||||
end
|
||||
|
||||
def find_channel(request, **options)
|
||||
def find_connection(request, **options)
|
||||
uri = URI(request.uri)
|
||||
@pool.find_channel(uri) || build_channel(uri, options)
|
||||
@pool.find_connection(uri) || build_connection(uri, options)
|
||||
end
|
||||
|
||||
def set_channel_callbacks(channel, options)
|
||||
channel.on(:response, &method(:on_response))
|
||||
channel.on(:promise, &method(:on_promise))
|
||||
channel.on(:uncoalesce) do |uncoalesced_uri|
|
||||
other_channel = build_channel(uncoalesced_uri, options)
|
||||
channel.unmerge(other_channel)
|
||||
def set_connection_callbacks(connection, options)
|
||||
connection.on(:response, &method(:on_response))
|
||||
connection.on(:promise, &method(:on_promise))
|
||||
connection.on(:uncoalesce) do |uncoalesced_uri|
|
||||
other_connection = build_connection(uncoalesced_uri, options)
|
||||
connection.unmerge(other_connection)
|
||||
end
|
||||
channel.on(:altsvc) do |alt_origin, origin, alt_params|
|
||||
build_altsvc_channel(channel, alt_origin, origin, alt_params, options)
|
||||
connection.on(:altsvc) do |alt_origin, origin, alt_params|
|
||||
build_altsvc_connection(connection, alt_origin, origin, alt_params, options)
|
||||
end
|
||||
end
|
||||
|
||||
def build_channel(uri, options)
|
||||
channel = @pool.build_channel(uri, **options)
|
||||
set_channel_callbacks(channel, options)
|
||||
channel
|
||||
def build_connection(uri, options)
|
||||
connection = @pool.build_connection(uri, **options)
|
||||
set_connection_callbacks(connection, options)
|
||||
connection
|
||||
end
|
||||
|
||||
def build_altsvc_channel(existing_channel, alt_origin, origin, alt_params, options)
|
||||
def build_altsvc_connection(existing_connection, alt_origin, origin, alt_params, options)
|
||||
altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin))
|
||||
|
||||
# altsvc already exists, somehow it wasn't advertised, probably noop
|
||||
return unless altsvc
|
||||
|
||||
channel = @pool.find_channel(alt_origin) || build_channel(alt_origin, options)
|
||||
connection = @pool.find_connection(alt_origin) || build_connection(alt_origin, options)
|
||||
# advertised altsvc is the same origin being used, ignore
|
||||
return if channel == existing_channel
|
||||
return if connection == existing_connection
|
||||
|
||||
log(level: 1) { "#{origin} alt-svc: #{alt_origin}" }
|
||||
|
||||
# get uninitialized requests
|
||||
# incidentally, all requests will be re-routed to the first
|
||||
# advertised alt-svc, which incidentally follows the spec.
|
||||
existing_channel.purge_pending do |request, args|
|
||||
existing_connection.purge_pending do |request, args|
|
||||
is_idle = request.origin == origin &&
|
||||
request.state == :idle &&
|
||||
!request.headers.key?("alt-used")
|
||||
if is_idle
|
||||
log(level: 1) { "#{origin} alt-svc: sending #{request.uri} to #{alt_origin}" }
|
||||
channel.send(request, args)
|
||||
connection.send(request, args)
|
||||
end
|
||||
is_idle
|
||||
end
|
||||
@ -133,8 +133,8 @@ module HTTPX
|
||||
|
||||
def __send_reqs(*requests, **options)
|
||||
requests.each do |request|
|
||||
channel = find_channel(request, **options)
|
||||
channel.send(request)
|
||||
connection = find_connection(request, **options)
|
||||
connection.send(request)
|
||||
end
|
||||
responses = []
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
require "httpx/parser/http1"
|
||||
|
||||
module HTTPX
|
||||
class Channel::HTTP1
|
||||
class Connection::HTTP1
|
||||
include Callbacks
|
||||
include Loggable
|
||||
|
||||
@ -250,5 +250,5 @@ module HTTPX
|
||||
UPCASED[field] || field.to_s.split("-").map(&:capitalize).join("-")
|
||||
end
|
||||
end
|
||||
Channel.register "http/1.1", Channel::HTTP1
|
||||
Connection.register "http/1.1", Connection::HTTP1
|
||||
end
|
||||
|
@ -4,7 +4,7 @@ require "io/wait"
|
||||
require "http/2"
|
||||
|
||||
module HTTPX
|
||||
class Channel::HTTP2
|
||||
class Connection::HTTP2
|
||||
include Callbacks
|
||||
include Loggable
|
||||
|
||||
@ -286,5 +286,5 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
end
|
||||
Channel.register "h2", Channel::HTTP2
|
||||
Connection.register "h2", Connection::HTTP2
|
||||
end
|
||||
|
@ -13,7 +13,7 @@ module HTTPX
|
||||
end
|
||||
|
||||
def request(*args, **options)
|
||||
# do not needlessly close channels
|
||||
# do not needlessly close connections
|
||||
keep_open = @keep_open
|
||||
@keep_open = true
|
||||
|
||||
|
@ -25,8 +25,8 @@ module HTTPX
|
||||
upgrade_response = __send_reqs(*upgrade_request, **options).first
|
||||
|
||||
if upgrade_response.status == 101
|
||||
channel = find_channel(upgrade_request)
|
||||
parser = channel.upgrade_parser("h2")
|
||||
connection = find_connection(upgrade_request)
|
||||
parser = connection.upgrade_parser("h2")
|
||||
parser.extend(UpgradeExtensions)
|
||||
parser.upgrade(upgrade_request, upgrade_response, **options)
|
||||
data = upgrade_response.to_s
|
||||
|
@ -47,30 +47,30 @@ module HTTPX
|
||||
@options.proxy.merge(uri: @_proxy_uris.shift) unless @_proxy_uris.empty?
|
||||
end
|
||||
|
||||
def find_channel(request, **options)
|
||||
def find_connection(request, **options)
|
||||
uri = URI(request.uri)
|
||||
proxy = proxy_params(uri)
|
||||
raise Error, "Failed to connect to proxy" unless proxy
|
||||
|
||||
@connection.find_channel(proxy) || build_channel(proxy, options)
|
||||
@pool.find_connection(proxy) || build_connection(proxy, options)
|
||||
end
|
||||
|
||||
def build_channel(proxy, options)
|
||||
def build_connection(proxy, options)
|
||||
return super if proxy.is_a?(URI::Generic)
|
||||
|
||||
channel = build_proxy_channel(proxy, **options)
|
||||
set_channel_callbacks(channel, options)
|
||||
channel
|
||||
connection = build_proxy_connection(proxy, **options)
|
||||
set_connection_callbacks(connection, options)
|
||||
connection
|
||||
end
|
||||
|
||||
def build_proxy_channel(proxy, **options)
|
||||
def build_proxy_connection(proxy, **options)
|
||||
parameters = Parameters.new(**proxy)
|
||||
uri = parameters.uri
|
||||
log { "proxy: #{uri}" }
|
||||
proxy_type = Parameters.registry(parameters.uri.scheme)
|
||||
channel = proxy_type.new("tcp", uri, parameters, @options.merge(options), &method(:on_response))
|
||||
@connection.__send__(:resolve_channel, channel)
|
||||
channel
|
||||
connection = proxy_type.new("tcp", uri, parameters, @options.merge(options), &method(:on_response))
|
||||
@pool.__send__(:resolve_connection, connection)
|
||||
connection
|
||||
end
|
||||
|
||||
def fetch_response(request)
|
||||
@ -81,8 +81,8 @@ module HTTPX
|
||||
response.error.is_a?(Error)) &&
|
||||
!@_proxy_uris.empty?
|
||||
log { "failed connecting to proxy, trying next..." }
|
||||
channel = find_channel(request)
|
||||
channel.send(request)
|
||||
connection = find_connection(request)
|
||||
connection.send(request)
|
||||
return
|
||||
end
|
||||
response
|
||||
@ -107,7 +107,7 @@ module HTTPX
|
||||
register_plugin :proxy, Proxy
|
||||
end
|
||||
|
||||
class ProxyChannel < Channel
|
||||
class ProxyConnection < Connection
|
||||
def initialize(type, uri, parameters, options, &blk)
|
||||
super(type, uri, options, &blk)
|
||||
@parameters = parameters
|
||||
|
@ -6,7 +6,7 @@ module HTTPX
|
||||
module Plugins
|
||||
module Proxy
|
||||
module HTTP
|
||||
class HTTPProxyChannel < ProxyChannel
|
||||
class HTTPProxyConnection < ProxyConnection
|
||||
private
|
||||
|
||||
def proxy_connect
|
||||
@ -71,7 +71,7 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
class ProxyParser < Channel::HTTP1
|
||||
class ProxyParser < Connection::HTTP1
|
||||
def headline_uri(request)
|
||||
request.uri.to_s
|
||||
end
|
||||
@ -112,7 +112,7 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
Parameters.register("http", HTTPProxyChannel)
|
||||
Parameters.register("http", HTTPProxyConnection)
|
||||
end
|
||||
end
|
||||
register_plugin :"proxy/http", Proxy::HTTP
|
||||
|
@ -13,7 +13,7 @@ module HTTPX
|
||||
|
||||
Error = Class.new(Error)
|
||||
|
||||
class Socks4ProxyChannel < ProxyChannel
|
||||
class Socks4ProxyConnection < ProxyConnection
|
||||
private
|
||||
|
||||
def proxy_connect
|
||||
@ -64,8 +64,8 @@ module HTTPX
|
||||
throw(:called)
|
||||
end
|
||||
end
|
||||
Parameters.register("socks4", Socks4ProxyChannel)
|
||||
Parameters.register("socks4a", Socks4ProxyChannel)
|
||||
Parameters.register("socks4", Socks4ProxyConnection)
|
||||
Parameters.register("socks4a", Socks4ProxyConnection)
|
||||
|
||||
class SocksParser
|
||||
include Callbacks
|
||||
|
@ -16,7 +16,7 @@ module HTTPX
|
||||
|
||||
Error = Class.new(Error)
|
||||
|
||||
class Socks5ProxyChannel < ProxyChannel
|
||||
class Socks5ProxyConnection < ProxyConnection
|
||||
def call
|
||||
super
|
||||
case @state
|
||||
@ -108,7 +108,7 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
Parameters.register("socks5", Socks5ProxyChannel)
|
||||
Parameters.register("socks5", Socks5ProxyConnection)
|
||||
|
||||
class SocksParser
|
||||
include Callbacks
|
||||
|
@ -19,8 +19,8 @@ module HTTPX
|
||||
request.retries.positive? &&
|
||||
IDEMPOTENT_METHODS.include?(request.verb)
|
||||
request.retries -= 1
|
||||
channel = find_channel(request)
|
||||
channel.send(request)
|
||||
connection = find_connection(request)
|
||||
connection.send(request)
|
||||
return
|
||||
end
|
||||
response
|
||||
|
@ -1,7 +1,7 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "httpx/selector"
|
||||
require "httpx/channel"
|
||||
require "httpx/connection"
|
||||
require "httpx/resolver"
|
||||
|
||||
module HTTPX
|
||||
@ -12,77 +12,77 @@ module HTTPX
|
||||
resolver_type = @options.resolver_class
|
||||
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
|
||||
@selector = Selector.new
|
||||
@channels = []
|
||||
@connected_channels = 0
|
||||
@connections = []
|
||||
@connected_connections = 0
|
||||
@resolver = resolver_type.new(self, @options)
|
||||
@resolver.on(:resolve, &method(:on_resolver_channel))
|
||||
@resolver.on(:resolve, &method(:on_resolver_connection))
|
||||
@resolver.on(:error, &method(:on_resolver_error))
|
||||
@resolver.on(:close, &method(:on_resolver_close))
|
||||
end
|
||||
|
||||
def running?
|
||||
!@channels.empty?
|
||||
!@connections.empty?
|
||||
end
|
||||
|
||||
def next_tick
|
||||
catch(:jump_tick) do
|
||||
@selector.select(next_timeout) do |monitor|
|
||||
if (channel = monitor.value)
|
||||
channel.call
|
||||
if (connection = monitor.value)
|
||||
connection.call
|
||||
end
|
||||
monitor.interests = channel.interests
|
||||
monitor.interests = connection.interests
|
||||
end
|
||||
end
|
||||
rescue TimeoutError => timeout_error
|
||||
@channels.each do |ch|
|
||||
@connections.each do |ch|
|
||||
ch.handle_timeout_error(timeout_error)
|
||||
end
|
||||
rescue Errno::ECONNRESET,
|
||||
Errno::ECONNABORTED,
|
||||
Errno::EPIPE => ex
|
||||
@channels.each do |ch|
|
||||
@connections.each do |ch|
|
||||
ch.emit(:error, ex)
|
||||
end
|
||||
end
|
||||
|
||||
def close
|
||||
@resolver.close unless @resolver.closed?
|
||||
@channels.each(&:close)
|
||||
next_tick until @channels.empty?
|
||||
@connections.each(&:close)
|
||||
next_tick until @connections.empty?
|
||||
end
|
||||
|
||||
def build_channel(uri, **options)
|
||||
channel = Channel.by(uri, @options.merge(options))
|
||||
resolve_channel(channel)
|
||||
channel.on(:open) do
|
||||
@connected_channels += 1
|
||||
@timeout.transition(:open) if @channels.size == @connected_channels
|
||||
def build_connection(uri, **options)
|
||||
connection = Connection.by(uri, @options.merge(options))
|
||||
resolve_connection(connection)
|
||||
connection.on(:open) do
|
||||
@connected_connections += 1
|
||||
@timeout.transition(:open) if @connections.size == @connected_connections
|
||||
end
|
||||
channel.on(:reset) do
|
||||
connection.on(:reset) do
|
||||
@timeout.transition(:idle)
|
||||
end
|
||||
channel.on(:unreachable) do
|
||||
@resolver.uncache(channel)
|
||||
resolve_channel(channel)
|
||||
connection.on(:unreachable) do
|
||||
@resolver.uncache(connection)
|
||||
resolve_connection(connection)
|
||||
end
|
||||
channel
|
||||
connection
|
||||
end
|
||||
|
||||
# opens a channel to the IP reachable through +uri+.
|
||||
# opens a connection to the IP reachable through +uri+.
|
||||
# Many hostnames are reachable through the same IP, so we try to
|
||||
# maximize pipelining by opening as few channels as possible.
|
||||
# maximize pipelining by opening as few connections as possible.
|
||||
#
|
||||
def find_channel(uri)
|
||||
@channels.find do |channel|
|
||||
channel.match?(uri)
|
||||
def find_connection(uri)
|
||||
@connections.find do |connection|
|
||||
connection.match?(uri)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def resolve_channel(channel)
|
||||
@channels << channel unless @channels.include?(channel)
|
||||
@resolver << channel
|
||||
def resolve_connection(connection)
|
||||
@connections << connection unless @connections.include?(connection)
|
||||
@resolver << connection
|
||||
return if @resolver.empty?
|
||||
|
||||
@_resolver_monitor ||= begin # rubocop:disable Naming/MemoizedInstanceVariableName
|
||||
@ -92,25 +92,25 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
def on_resolver_channel(channel, addresses)
|
||||
found_channel = @channels.find do |ch|
|
||||
ch != channel && ch.mergeable?(addresses)
|
||||
def on_resolver_connection(connection, addresses)
|
||||
found_connection = @connections.find do |ch|
|
||||
ch != connection && ch.mergeable?(addresses)
|
||||
end
|
||||
return register_channel(channel) unless found_channel
|
||||
return register_connection(connection) unless found_connection
|
||||
|
||||
if found_channel.state == :open
|
||||
coalesce_channels(found_channel, channel)
|
||||
if found_connection.state == :open
|
||||
coalesce_connections(found_connection, connection)
|
||||
else
|
||||
found_channel.once(:open) do
|
||||
coalesce_channels(found_channel, channel)
|
||||
found_connection.once(:open) do
|
||||
coalesce_connections(found_connection, connection)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def on_resolver_error(ch, error)
|
||||
ch.emit(:error, error)
|
||||
# must remove channel by hand, hasn't been started yet
|
||||
unregister_channel(ch)
|
||||
# must remove connection by hand, hasn't been started yet
|
||||
unregister_connection(ch)
|
||||
end
|
||||
|
||||
def on_resolver_close
|
||||
@ -119,36 +119,36 @@ module HTTPX
|
||||
@resolver.close unless @resolver.closed?
|
||||
end
|
||||
|
||||
def register_channel(channel)
|
||||
monitor = if channel.state == :open
|
||||
def register_connection(connection)
|
||||
monitor = if connection.state == :open
|
||||
# if open, an IO was passed upstream, therefore
|
||||
# consider it connected already.
|
||||
@connected_channels += 1
|
||||
@selector.register(channel, :rw)
|
||||
@connected_connections += 1
|
||||
@selector.register(connection, :rw)
|
||||
else
|
||||
@selector.register(channel, :w)
|
||||
@selector.register(connection, :w)
|
||||
end
|
||||
monitor.value = channel
|
||||
channel.on(:close) do
|
||||
unregister_channel(channel)
|
||||
monitor.value = connection
|
||||
connection.on(:close) do
|
||||
unregister_connection(connection)
|
||||
end
|
||||
return if channel.state == :open
|
||||
return if connection.state == :open
|
||||
|
||||
@timeout.transition(:idle)
|
||||
end
|
||||
|
||||
def unregister_channel(channel)
|
||||
@channels.delete(channel)
|
||||
@selector.deregister(channel)
|
||||
@connected_channels -= 1
|
||||
def unregister_connection(connection)
|
||||
@connections.delete(connection)
|
||||
@selector.deregister(connection)
|
||||
@connected_connections -= 1
|
||||
end
|
||||
|
||||
def coalesce_channels(ch1, ch2)
|
||||
def coalesce_connections(ch1, ch2)
|
||||
if ch1.coalescable?(ch2)
|
||||
ch1.merge(ch2)
|
||||
@channels.delete(ch2)
|
||||
@connections.delete(ch2)
|
||||
else
|
||||
register_channel(ch2)
|
||||
register_connection(ch2)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -22,30 +22,30 @@ module HTTPX
|
||||
use_get: false,
|
||||
}.freeze
|
||||
|
||||
def_delegator :@channels, :empty?
|
||||
def_delegator :@connections, :empty?
|
||||
|
||||
def_delegators :@resolver_channel, :to_io, :call, :interests, :close
|
||||
def_delegators :@resolver_connection, :to_io, :call, :interests, :close
|
||||
|
||||
def initialize(connection, options)
|
||||
@connection = connection
|
||||
def initialize(pool, options)
|
||||
@pool = pool
|
||||
@options = Options.new(options)
|
||||
@resolver_options = Resolver::Options.new(DEFAULTS.merge(@options.resolver_options || {}))
|
||||
@_record_types = Hash.new { |types, host| types[host] = RECORD_TYPES.keys.dup }
|
||||
@queries = {}
|
||||
@requests = {}
|
||||
@channels = []
|
||||
@connections = []
|
||||
@uri = URI(@resolver_options.uri)
|
||||
@uri_addresses = nil
|
||||
end
|
||||
|
||||
def <<(channel)
|
||||
def <<(connection)
|
||||
@uri_addresses ||= Resolv.getaddresses(@uri.host)
|
||||
if @uri_addresses.empty?
|
||||
ex = ResolveError.new("Can't resolve #{channel.uri.host}")
|
||||
ex = ResolveError.new("Can't resolve #{connection.uri.host}")
|
||||
ex.set_backtrace(caller)
|
||||
emit(:error, channel, ex)
|
||||
emit(:error, connection, ex)
|
||||
else
|
||||
early_resolve(channel) || resolve(channel)
|
||||
early_resolve(connection) || resolve(connection)
|
||||
end
|
||||
end
|
||||
|
||||
@ -55,58 +55,58 @@ module HTTPX
|
||||
end
|
||||
|
||||
def closed?
|
||||
return true unless @resolver_channel
|
||||
return true unless @resolver_connection
|
||||
|
||||
resolver_channel.closed?
|
||||
resolver_connection.closed?
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def resolver_channel
|
||||
@resolver_channel ||= find_channel(@uri, @options)
|
||||
def resolver_connection
|
||||
@resolver_connection ||= find_connection(@uri, @options)
|
||||
end
|
||||
|
||||
def resolve(channel = @channels.first, hostname = nil)
|
||||
return if @building_channel
|
||||
def resolve(connection = @connections.first, hostname = nil)
|
||||
return if @building_connection
|
||||
|
||||
hostname = hostname || @queries.key(channel) || channel.uri.host
|
||||
hostname = hostname || @queries.key(connection) || connection.uri.host
|
||||
type = @_record_types[hostname].first
|
||||
log(label: "resolver: ") { "query #{type} for #{hostname}" }
|
||||
begin
|
||||
request = build_request(hostname, type)
|
||||
@requests[request] = channel
|
||||
resolver_channel.send(request)
|
||||
@queries[hostname] = channel
|
||||
@channels << channel
|
||||
@requests[request] = connection
|
||||
resolver_connection.send(request)
|
||||
@queries[hostname] = connection
|
||||
@connections << connection
|
||||
rescue Resolv::DNS::EncodeError, JSON::JSONError => e
|
||||
emit_resolve_error(channel, hostname, e)
|
||||
emit_resolve_error(connection, hostname, e)
|
||||
end
|
||||
end
|
||||
|
||||
def find_channel(_request, **options)
|
||||
@connection.find_channel(@uri) || begin
|
||||
@building_channel = true
|
||||
channel = @connection.build_channel(@uri, **options)
|
||||
emit_addresses(channel, @uri_addresses)
|
||||
set_channel_callbacks(channel)
|
||||
@building_channel = false
|
||||
channel
|
||||
def find_connection(_request, **options)
|
||||
@pool.find_connection(@uri) || begin
|
||||
@building_connection = true
|
||||
connection = @pool.build_connection(@uri, **options)
|
||||
emit_addresses(connection, @uri_addresses)
|
||||
set_connection_callbacks(connection)
|
||||
@building_connection = false
|
||||
connection
|
||||
end
|
||||
end
|
||||
|
||||
def set_channel_callbacks(channel)
|
||||
channel.on(:response, &method(:on_response))
|
||||
channel.on(:promise, &method(:on_response))
|
||||
def set_connection_callbacks(connection)
|
||||
connection.on(:response, &method(:on_response))
|
||||
connection.on(:promise, &method(:on_response))
|
||||
end
|
||||
|
||||
def on_response(request, response)
|
||||
response.raise_for_status
|
||||
rescue Error => ex
|
||||
channel = @requests[request]
|
||||
hostname = @queries.key(channel)
|
||||
connection = @requests[request]
|
||||
hostname = @queries.key(connection)
|
||||
error = ResolveError.new("Can't resolve #{hostname}: #{ex.message}")
|
||||
error.set_backtrace(ex.backtrace)
|
||||
emit(:error, channel, error)
|
||||
emit(:error, connection, error)
|
||||
else
|
||||
parse(response)
|
||||
ensure
|
||||
@ -117,18 +117,18 @@ module HTTPX
|
||||
begin
|
||||
answers = decode_response_body(response)
|
||||
rescue Resolv::DNS::DecodeError, JSON::JSONError => e
|
||||
host, channel = @queries.first
|
||||
host, connection = @queries.first
|
||||
if @_record_types[host].empty?
|
||||
emit_resolve_error(channel, host, e)
|
||||
emit_resolve_error(connection, host, e)
|
||||
return
|
||||
end
|
||||
end
|
||||
if answers.empty?
|
||||
host, channel = @queries.first
|
||||
host, connection = @queries.first
|
||||
@_record_types[host].shift
|
||||
if @_record_types[host].empty?
|
||||
@_record_types.delete(host)
|
||||
emit_resolve_error(channel, host)
|
||||
emit_resolve_error(connection, host)
|
||||
return
|
||||
end
|
||||
else
|
||||
@ -138,9 +138,9 @@ module HTTPX
|
||||
if address.key?("alias")
|
||||
alias_address = answers[address["alias"]]
|
||||
if alias_address.nil?
|
||||
channel = @queries[hostname]
|
||||
connection = @queries[hostname]
|
||||
@queries.delete(address["name"])
|
||||
resolve(channel, address["alias"])
|
||||
resolve(connection, address["alias"])
|
||||
return # rubocop:disable Lint/NonLocalExitFromIterator
|
||||
else
|
||||
alias_address
|
||||
@ -152,15 +152,15 @@ module HTTPX
|
||||
next if addresses.empty?
|
||||
|
||||
hostname = hostname[0..-2] if hostname.end_with?(".")
|
||||
channel = @queries.delete(hostname)
|
||||
next unless channel # probably a retried query for which there's an answer
|
||||
connection = @queries.delete(hostname)
|
||||
next unless connection # probably a retried query for which there's an answer
|
||||
|
||||
@channels.delete(channel)
|
||||
@connections.delete(connection)
|
||||
Resolver.cached_lookup_set(hostname, addresses)
|
||||
emit_addresses(channel, addresses.map { |addr| addr["data"] })
|
||||
emit_addresses(connection, addresses.map { |addr| addr["data"] })
|
||||
end
|
||||
end
|
||||
return if @channels.empty?
|
||||
return if @connections.empty?
|
||||
|
||||
resolve
|
||||
end
|
||||
|
@ -33,7 +33,7 @@ module HTTPX
|
||||
|
||||
DNS_PORT = 53
|
||||
|
||||
def_delegator :@channels, :empty?
|
||||
def_delegator :@connections, :empty?
|
||||
|
||||
def initialize(_, options)
|
||||
@options = Options.new(options)
|
||||
@ -43,7 +43,7 @@ module HTTPX
|
||||
@_timeouts = Array(@resolver_options.timeouts)
|
||||
@timeouts = Hash.new { |timeouts, host| timeouts[host] = @_timeouts.dup }
|
||||
@_record_types = Hash.new { |types, host| types[host] = @resolver_options.record_types.dup }
|
||||
@channels = []
|
||||
@connections = []
|
||||
@queries = {}
|
||||
@read_buffer = Buffer.new(@resolver_options.packet_size)
|
||||
@write_buffer = Buffer.new(@resolver_options.packet_size)
|
||||
@ -81,8 +81,8 @@ module HTTPX
|
||||
if @ns_index < @nameserver.size
|
||||
transition(:idle)
|
||||
else
|
||||
@queries.each do |host, channel|
|
||||
emit_resolve_error(channel, host, e)
|
||||
@queries.each do |host, connection|
|
||||
emit_resolve_error(connection, host, e)
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -97,15 +97,15 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
def <<(channel)
|
||||
return if early_resolve(channel)
|
||||
def <<(connection)
|
||||
return if early_resolve(connection)
|
||||
|
||||
if @nameserver.nil?
|
||||
ex = ResolveError.new("Can't resolve #{channel.uri.host}: no nameserver")
|
||||
ex = ResolveError.new("Can't resolve #{connection.uri.host}: no nameserver")
|
||||
ex.set_backtrace(caller)
|
||||
emit(:error, channel, ex)
|
||||
emit(:error, connection, ex)
|
||||
else
|
||||
@channels << channel
|
||||
@connections << connection
|
||||
end
|
||||
end
|
||||
|
||||
@ -127,30 +127,30 @@ module HTTPX
|
||||
return if @queries.empty?
|
||||
|
||||
loop_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_timeout
|
||||
channels = []
|
||||
connections = []
|
||||
queries = {}
|
||||
while (query = @queries.shift)
|
||||
h, channel = query
|
||||
host = channel.uri.host
|
||||
h, connection = query
|
||||
host = connection.uri.host
|
||||
timeout = (@timeouts[host][0] -= loop_time)
|
||||
unless timeout.negative?
|
||||
queries[h] = channel
|
||||
queries[h] = connection
|
||||
next
|
||||
end
|
||||
@timeouts[host].shift
|
||||
if @timeouts[host].empty?
|
||||
@timeouts.delete(host)
|
||||
emit_resolve_error(channel, host)
|
||||
emit_resolve_error(connection, host)
|
||||
return
|
||||
else
|
||||
channels << channel
|
||||
connections << connection
|
||||
log(label: "resolver: ") do
|
||||
"timeout after #{prev_timeout}s, retry(#{timeouts.first}) #{host}..."
|
||||
end
|
||||
end
|
||||
end
|
||||
@queries = queries
|
||||
channels.each { |ch| resolve(ch) }
|
||||
connections.each { |ch| resolve(ch) }
|
||||
end
|
||||
|
||||
def dread(wsize = @read_buffer.limit)
|
||||
@ -185,57 +185,57 @@ module HTTPX
|
||||
begin
|
||||
addresses = Resolver.decode_dns_answer(buffer)
|
||||
rescue Resolv::DNS::DecodeError => e
|
||||
hostname, channel = @queries.first
|
||||
hostname, connection = @queries.first
|
||||
if @_record_types[hostname].empty?
|
||||
emit_resolve_error(channel, hostname, e)
|
||||
emit_resolve_error(connection, hostname, e)
|
||||
return
|
||||
end
|
||||
end
|
||||
|
||||
if addresses.empty?
|
||||
hostname, channel = @queries.first
|
||||
hostname, connection = @queries.first
|
||||
@_record_types[hostname].shift
|
||||
if @_record_types[hostname].empty?
|
||||
@_record_types.delete(hostname)
|
||||
emit_resolve_error(channel, hostname)
|
||||
emit_resolve_error(connection, hostname)
|
||||
return
|
||||
end
|
||||
else
|
||||
address = addresses.first
|
||||
channel = @queries.delete(address["name"])
|
||||
return unless channel # probably a retried query for which there's an answer
|
||||
connection = @queries.delete(address["name"])
|
||||
return unless connection # probably a retried query for which there's an answer
|
||||
|
||||
if address.key?("alias") # CNAME
|
||||
if early_resolve(channel, hostname: address["alias"])
|
||||
@channels.delete(channel)
|
||||
if early_resolve(connection, hostname: address["alias"])
|
||||
@connections.delete(connection)
|
||||
else
|
||||
resolve(channel, address["alias"])
|
||||
resolve(connection, address["alias"])
|
||||
@queries.delete(address["name"])
|
||||
return
|
||||
end
|
||||
else
|
||||
@channels.delete(channel)
|
||||
Resolver.cached_lookup_set(channel.uri.host, addresses)
|
||||
emit_addresses(channel, addresses.map { |addr| addr["data"] })
|
||||
@connections.delete(connection)
|
||||
Resolver.cached_lookup_set(connection.uri.host, addresses)
|
||||
emit_addresses(connection, addresses.map { |addr| addr["data"] })
|
||||
end
|
||||
end
|
||||
return emit(:close) if @channels.empty?
|
||||
return emit(:close) if @connections.empty?
|
||||
|
||||
resolve
|
||||
end
|
||||
|
||||
def resolve(channel = @channels.first, hostname = nil)
|
||||
raise Error, "no URI to resolve" unless channel
|
||||
def resolve(connection = @connections.first, hostname = nil)
|
||||
raise Error, "no URI to resolve" unless connection
|
||||
return unless @write_buffer.empty?
|
||||
|
||||
hostname = hostname || @queries.key(channel) || channel.uri.host
|
||||
@queries[hostname] = channel
|
||||
hostname = hostname || @queries.key(connection) || connection.uri.host
|
||||
@queries[hostname] = connection
|
||||
type = @_record_types[hostname].first
|
||||
log(label: "resolver: ") { "query #{type} for #{hostname}" }
|
||||
begin
|
||||
@write_buffer << Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type])
|
||||
rescue Resolv::DNS::EncodeError => e
|
||||
emit_resolve_error(channel, hostname, e)
|
||||
emit_resolve_error(connection, hostname, e)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -18,31 +18,31 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
def uncache(channel)
|
||||
hostname = hostname || @queries.key(channel) || channel.uri.host
|
||||
def uncache(connection)
|
||||
hostname = hostname || @queries.key(connection) || connection.uri.host
|
||||
Resolver.uncache(hostname)
|
||||
@_record_types[hostname].shift
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def emit_addresses(channel, addresses)
|
||||
def emit_addresses(connection, addresses)
|
||||
addresses.map! do |address|
|
||||
address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s)
|
||||
end
|
||||
log(label: "resolver: ") { "answer #{channel.uri.host}: #{addresses.inspect}" }
|
||||
channel.addresses = addresses
|
||||
emit(:resolve, channel, addresses)
|
||||
log(label: "resolver: ") { "answer #{connection.uri.host}: #{addresses.inspect}" }
|
||||
connection.addresses = addresses
|
||||
emit(:resolve, connection, addresses)
|
||||
end
|
||||
|
||||
def early_resolve(channel, hostname: channel.uri.host)
|
||||
addresses = channel.addresses ||
|
||||
def early_resolve(connection, hostname: connection.uri.host)
|
||||
addresses = connection.addresses ||
|
||||
ip_resolve(hostname) ||
|
||||
Resolver.cached_lookup(hostname) ||
|
||||
system_resolve(hostname)
|
||||
return unless addresses
|
||||
|
||||
emit_addresses(channel, addresses)
|
||||
emit_addresses(connection, addresses)
|
||||
end
|
||||
|
||||
def ip_resolve(hostname)
|
||||
@ -57,11 +57,11 @@ module HTTPX
|
||||
ips.map { |ip| IPAddr.new(ip) }
|
||||
end
|
||||
|
||||
def emit_resolve_error(channel, hostname, ex = nil)
|
||||
def emit_resolve_error(connection, hostname, ex = nil)
|
||||
message = ex ? ex.message : "Can't resolve #{hostname}"
|
||||
error = ResolveError.new(message)
|
||||
error.set_backtrace(ex ? ex.backtrace : caller)
|
||||
emit(:error, channel, error)
|
||||
emit(:error, connection, error)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -28,17 +28,17 @@ module HTTPX
|
||||
true
|
||||
end
|
||||
|
||||
def <<(channel)
|
||||
hostname = channel.uri.host
|
||||
addresses = channel.addresses ||
|
||||
def <<(connection)
|
||||
hostname = connection.uri.host
|
||||
addresses = connection.addresses ||
|
||||
ip_resolve(hostname) ||
|
||||
system_resolve(hostname) ||
|
||||
@resolver.getaddresses(hostname)
|
||||
return emit_resolve_error(channel, hostname) if addresses.empty?
|
||||
return emit_resolve_error(connection, hostname) if addresses.empty?
|
||||
|
||||
emit_addresses(channel, addresses)
|
||||
emit_addresses(connection, addresses)
|
||||
rescue Errno::EHOSTUNREACH, *RESOLV_ERRORS => e
|
||||
emit_resolve_error(channel, hostname, e)
|
||||
emit_resolve_error(connection, hostname, e)
|
||||
end
|
||||
|
||||
def uncache(*); end
|
||||
|
@ -25,20 +25,20 @@ class HTTPSResolverTest < Minitest::Test
|
||||
def test_parse_no_record
|
||||
@has_error = false
|
||||
resolver.on(:error) { @has_error = true }
|
||||
channel = build_channel("https://idontthinkthisexists.org/")
|
||||
resolver << channel
|
||||
resolver.queries["idontthinkthisexists.org"] = channel
|
||||
connection = build_connection("https://idontthinkthisexists.org/")
|
||||
resolver << connection
|
||||
resolver.queries["idontthinkthisexists.org"] = connection
|
||||
|
||||
# this is only here to drain
|
||||
write_buffer.clear
|
||||
resolver.parse(no_record)
|
||||
assert channel.addresses.nil?
|
||||
assert connection.addresses.nil?
|
||||
assert resolver.queries.key?("idontthinkthisexists.org")
|
||||
assert !@has_error, "resolver should still be able to resolve AAAA"
|
||||
# A type
|
||||
write_buffer.clear
|
||||
resolver.parse(no_record)
|
||||
assert channel.addresses.nil?
|
||||
assert connection.addresses.nil?
|
||||
assert resolver.queries.key?("idontthinkthisexists.org")
|
||||
assert @has_error, "resolver should have failed"
|
||||
end
|
||||
@ -49,26 +49,26 @@ class HTTPSResolverTest < Minitest::Test
|
||||
|
||||
private
|
||||
|
||||
def build_channel(*)
|
||||
channel = super
|
||||
connection.expect(:find_channel, channel, [URI::HTTP])
|
||||
channel
|
||||
def build_connection(*)
|
||||
connection = super
|
||||
pool.expect(:find_connection, connection, [URI::HTTP])
|
||||
connection
|
||||
end
|
||||
|
||||
def resolver(options = Options.new)
|
||||
@resolver ||= begin
|
||||
resolver = Resolver::HTTPS.new(connection, options)
|
||||
resolver = Resolver::HTTPS.new(pool, options)
|
||||
resolver.extend(ResolverHelpers::ResolverExtensions)
|
||||
resolver
|
||||
end
|
||||
end
|
||||
|
||||
def connection
|
||||
@connection ||= Minitest::Mock.new
|
||||
def pool
|
||||
@pool ||= Minitest::Mock.new
|
||||
end
|
||||
|
||||
def write_buffer
|
||||
resolver.instance_variable_get(:@resolver_channel)
|
||||
resolver.instance_variable_get(:@resolver_connection)
|
||||
.instance_variable_get(:@pending)
|
||||
end
|
||||
|
||||
|
@ -25,21 +25,21 @@ class NativeResolverTest < Minitest::Test
|
||||
def test_parse_no_record
|
||||
@has_error = false
|
||||
resolver.on(:error) { @has_error = true }
|
||||
channel = build_channel("https://idontthinkthisexists.org/")
|
||||
resolver << channel
|
||||
connection = build_connection("https://idontthinkthisexists.org/")
|
||||
resolver << connection
|
||||
resolver.resolve
|
||||
resolver.queries["idontthinkthisexists.org"] = channel
|
||||
resolver.queries["idontthinkthisexists.org"] = connection
|
||||
|
||||
# this is only here to drain
|
||||
write_buffer.clear
|
||||
resolver.parse(no_record)
|
||||
assert channel.addresses.nil?
|
||||
assert connection.addresses.nil?
|
||||
assert resolver.queries.key?("idontthinkthisexists.org")
|
||||
assert !@has_error, "resolver should still be able to resolve A"
|
||||
# A type
|
||||
write_buffer.clear
|
||||
resolver.parse(no_record)
|
||||
assert channel.addresses.nil?
|
||||
assert connection.addresses.nil?
|
||||
assert resolver.queries.key?("idontthinkthisexists.org")
|
||||
assert @has_error, "resolver should have failed"
|
||||
end
|
||||
|
@ -8,9 +8,9 @@ class SystemResolverTest < Minitest::Test
|
||||
include HTTPX
|
||||
|
||||
def test_append_external_name
|
||||
channel = build_channel("https://news.ycombinator.com")
|
||||
resolver << channel
|
||||
assert !channel.addresses.empty?, "name should have been resolved immediately"
|
||||
connection = build_connection("https://news.ycombinator.com")
|
||||
resolver << connection
|
||||
assert !connection.addresses.empty?, "name should have been resolved immediately"
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -9,23 +9,23 @@ module ResolverHelpers
|
||||
|
||||
def test_append_localhost
|
||||
ips = [IPAddr.new("127.0.0.1"), IPAddr.new("::1")]
|
||||
channel = build_channel("https://localhost")
|
||||
resolver << channel
|
||||
assert (channel.addresses - ips).empty?, "localhost interfaces should have been attributed"
|
||||
connection = build_connection("https://localhost")
|
||||
resolver << connection
|
||||
assert (connection.addresses - ips).empty?, "localhost interfaces should have been attributed"
|
||||
end
|
||||
|
||||
def test_append_ipv4
|
||||
ip = IPAddr.new("255.255.0.1")
|
||||
channel = build_channel("https://255.255.0.1")
|
||||
resolver << channel
|
||||
assert channel.addresses == [ip], "#{ip} should have been statically resolved"
|
||||
connection = build_connection("https://255.255.0.1")
|
||||
resolver << connection
|
||||
assert connection.addresses == [ip], "#{ip} should have been statically resolved"
|
||||
end
|
||||
|
||||
def test_append_ipv6
|
||||
ip = IPAddr.new("fe80::1")
|
||||
channel = build_channel("https://[fe80::1]")
|
||||
resolver << channel
|
||||
assert channel.addresses == [ip], "#{ip} should have been statically resolved"
|
||||
connection = build_connection("https://[fe80::1]")
|
||||
resolver << connection
|
||||
assert connection.addresses == [ip], "#{ip} should have been statically resolved"
|
||||
end
|
||||
|
||||
def __test_io_api
|
||||
@ -39,29 +39,28 @@ module ResolverHelpers
|
||||
def test_parse_a_record
|
||||
return unless resolver.respond_to?(:parse)
|
||||
|
||||
channel = build_channel("http://ipv4.tlund.se/")
|
||||
resolver.queries["ipv4.tlund.se"] = channel
|
||||
connection = build_connection("http://ipv4.tlund.se/")
|
||||
resolver.queries["ipv4.tlund.se"] = connection
|
||||
resolver.parse(a_record)
|
||||
assert channel.addresses.include?("193.15.228.195")
|
||||
assert connection.addresses.include?("193.15.228.195")
|
||||
end
|
||||
|
||||
def test_parse_aaaa_record
|
||||
return unless resolver.respond_to?(:parse)
|
||||
|
||||
channel = build_channel("http://ipv6.tlund.se/")
|
||||
resolver.queries["ipv6.tlund.se"] = channel
|
||||
connection = build_connection("http://ipv6.tlund.se/")
|
||||
resolver.queries["ipv6.tlund.se"] = connection
|
||||
resolver.parse(aaaa_record)
|
||||
assert channel.addresses.include?("2a00:801:f::195")
|
||||
assert connection.addresses.include?("2a00:801:f::195")
|
||||
end
|
||||
|
||||
def test_parse_cname_record
|
||||
return unless resolver.respond_to?(:parse)
|
||||
|
||||
channel = build_channel("http://ipv4c.tlund.se/")
|
||||
resolver.queries["ipv4c.tlund.se"] = channel
|
||||
# require "pry-byebug"; binding.pry
|
||||
connection = build_connection("http://ipv4c.tlund.se/")
|
||||
resolver.queries["ipv4c.tlund.se"] = connection
|
||||
resolver.parse(cname_record)
|
||||
assert channel.addresses.nil?
|
||||
assert connection.addresses.nil?
|
||||
assert !resolver.queries.key?("ipv4c.tlund.se")
|
||||
assert resolver.queries.key?("ipv4.tlund.se")
|
||||
end
|
||||
@ -69,9 +68,9 @@ module ResolverHelpers
|
||||
def test_append_hostname
|
||||
return unless resolver.respond_to?(:resolve)
|
||||
|
||||
channel = build_channel("https://news.ycombinator.com")
|
||||
resolver << channel
|
||||
assert channel.addresses.nil?, "there should be no direct IP"
|
||||
connection = build_connection("https://news.ycombinator.com")
|
||||
resolver << connection
|
||||
assert connection.addresses.nil?, "there should be no direct IP"
|
||||
resolver.resolve
|
||||
assert !write_buffer.empty?, "there should be a DNS query ready to be sent"
|
||||
end
|
||||
@ -83,10 +82,10 @@ module ResolverHelpers
|
||||
HTTPX::Resolver.purge_lookup_cache
|
||||
end
|
||||
|
||||
def build_channel(uri)
|
||||
channel = HTTPX::Channel.by(URI(uri), HTTPX::Options.new)
|
||||
channel.extend(ChannelExtensions)
|
||||
channel
|
||||
def build_connection(uri)
|
||||
connection = HTTPX::Connection.by(URI(uri), HTTPX::Options.new)
|
||||
connection.extend(ConnectionExtensions)
|
||||
connection
|
||||
end
|
||||
|
||||
def a_record
|
||||
@ -140,7 +139,7 @@ module ResolverHelpers
|
||||
end
|
||||
end
|
||||
|
||||
module ChannelExtensions
|
||||
module ConnectionExtensions
|
||||
attr_reader :addresses
|
||||
|
||||
def addresses=(addrs)
|
||||
|
Loading…
x
Reference in New Issue
Block a user