Non-Blocking DNS

This commit is contained in:
HoneyryderChuck 2018-08-29 10:23:08 +00:00
parent c9abd0abf7
commit dcd2709e41
33 changed files with 1269 additions and 62 deletions

View File

@ -4,6 +4,8 @@ AllCops:
TargetRubyVersion: 2.3
DisplayCopNames: true
Include:
- lib/**/*.rb
- test/**/*.rb
- Rakefile
- palanca.gemspec
Exclude:
@ -16,6 +18,8 @@ AllCops:
- 'lib/httpx/extensions.rb'
Metrics/LineLength:
Exclude:
- 'test/resolver/native_test.rb'
Max: 120
Metrics/MethodLength:

View File

@ -7,7 +7,7 @@ gemspec
gem "hanna-nouveau", require: false
gem "rake", "~> 12.3"
gem "rubocop", "~> 0.55.0", require: false
gem "rubocop", "~> 0.57.0", require: false
gem "simplecov", require: false
platform :mri do

View File

@ -90,6 +90,10 @@ It means that it loads the bare minimum to perform requests, and the user has to
It also means that it ships with the minimum amount of dependencies.
### DNS-over-HTTPS
`HTTPX` ships with custom DNS resolver implementations, including a DNS-over-HTTPS resolver.
## Easy to test
The test suite runs against [httpbin proxied over nghttp2](https://nghttp2.org/httpbin/), so there are no mocking/stubbing false positives. The test suite uses [minitest](https://github.com/seattlerb/minitest), but its matchers usage is (almost) limited to `#assert` (`assert` is all you need).

View File

@ -22,6 +22,8 @@ module HTTPX
def_delegator :@buffer, :replace
attr_reader :limit
def initialize(limit)
@buffer = "".b
@limit = limit

View File

@ -50,8 +50,7 @@ module HTTPX
raise Error, "#{uri}: #{uri.scheme}: unrecognized channel"
end
end
io = IO.registry(type).new(uri, options)
new(io, options)
new(type, uri, options)
end
end
@ -59,8 +58,12 @@ module HTTPX
def_delegator :@write_buffer, :empty?
def initialize(io, options)
@io = io
attr_reader :uri
def initialize(type, uri, options)
@type = type
@uri = uri
@hostnames = [@uri.host]
@options = Options.new(options)
@window_size = @options.window_size
@read_buffer = Buffer.new(BUFFER_SIZE)
@ -70,17 +73,28 @@ module HTTPX
on(:error) { |ex| on_error(ex) }
end
def addresses=(addrs)
@io = IO.registry(@type).new(@uri, addrs, @options)
end
def mergeable?(channel, addresses)
return false if @state == :closing || !@io
!(@io.addresses & addresses).empty? &&
@uri.port == channel.uri.port &&
@uri.scheme == channel.uri.scheme
end
def merge(channel)
@hostnames += channel.instance_variable_get(:@hostnames)
@pending += channel.instance_variable_get(:@pending)
end
def match?(uri)
return false if @state == :closing
ips = begin
Resolv.getaddresses(uri.host)
rescue StandardError
[uri.host]
end
ips.include?(@io.ip) &&
uri.port == @io.port &&
uri.scheme == @io.scheme
@hostnames.include?(uri.host) &&
uri.port == @uri.port &&
uri.scheme == @uri.scheme
end
def interests
@ -114,7 +128,9 @@ module HTTPX
end
def send(request, **args)
if @parser && !@write_buffer.full?
if @error_response
emit(:response, request, @error_response)
elsif @parser && !@write_buffer.full?
parser.send(request, **args)
else
@pending << [request, args]
@ -220,7 +236,8 @@ module HTTPX
def transition(nextstate)
case nextstate
# when :idle
when :idle
@error_response = nil
when :open
return if @state == :closed
@io.connect
@ -235,9 +252,14 @@ module HTTPX
@read_buffer.clear
end
@state = nextstate
rescue Errno::EHOSTUNREACH
# at this point, all addresses from the IO object have failed
reset
emit(:unreachable)
throw(:jump_tick)
rescue Errno::ECONNREFUSED,
Errno::ENETUNREACH,
Errno::EADDRNOTAVAIL,
Errno::EHOSTUNREACH,
OpenSSL::SSL::SSLError => e
# connect errors, exit gracefully
handle_error(e)
@ -251,10 +273,10 @@ module HTTPX
end
def handle_error(e)
parser.handle_error(e) if parser.respond_to?(:handle_error)
response = ErrorResponse.new(e, @options)
parser.handle_error(e) if @parser && parser.respond_to?(:handle_error)
@error_response = ErrorResponse.new(e, @options)
@pending.each do |request, _|
emit(:response, request, response)
emit(:response, request, @error_response)
end
end
end

View File

@ -2,14 +2,21 @@
require "httpx/selector"
require "httpx/channel"
require "httpx/resolver"
module HTTPX
class Connection
def initialize(options)
@options = Options.new(options)
@timeout = options.timeout
resolver_type = @options.resolver_class
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
@selector = Selector.new
@channels = []
@resolver = resolver_type.new(self, @options)
@resolver.on(:resolve, &method(:on_resolver_channel))
@resolver.on(:error, &method(:on_resolver_error))
@resolver.on(:close, &method(:on_resolver_close))
end
def running?
@ -17,12 +24,13 @@ module HTTPX
end
def next_tick
timeout = @timeout.timeout
@selector.select(timeout) do |monitor|
if (channel = monitor.value)
channel.call
catch(:jump_tick) do
@selector.select(next_timeout) do |monitor|
if (channel = monitor.value)
channel.call
end
monitor.interests = channel.interests
end
monitor.interests = channel.interests
end
rescue TimeoutError,
Errno::ECONNRESET,
@ -40,7 +48,11 @@ module HTTPX
def build_channel(uri, **options)
channel = Channel.by(uri, @options.merge(options))
register_channel(channel)
resolve_channel(channel)
channel.once(:unreachable) do
@resolver.uncache(channel)
resolve_channel(channel)
end
channel
end
@ -56,14 +68,58 @@ module HTTPX
private
def resolve_channel(channel)
@channels << channel unless @channels.include?(channel)
@resolver << channel
return if @resolver.empty?
@_resolver_monitor ||= begin # rubocop:disable Naming/MemoizedInstanceVariableName
monitor = @selector.register(@resolver, :w)
monitor.value = @resolver
monitor
end
end
def on_resolver_channel(channel, addresses)
found_channel = @channels.find do |ch|
next if ch == channel
ch.mergeable?(channel, addresses)
end
if found_channel
found_channel.merge(channel)
else
register_channel(channel)
end
end
def on_resolver_error(ch, error)
ch.emit(:error, error)
# must remove channel by hand, hasn't been started yet
unregister_channel(ch)
end
def on_resolver_close
@selector.deregister(@resolver)
@_resolver_monitor = nil
@resolver.close unless @resolver.closed?
end
def register_channel(channel)
monitor = @selector.register(channel, :w)
monitor.value = channel
channel.on(:close) do
@channels.delete(channel)
@selector.deregister(channel)
unregister_channel(channel)
end
@channels << channel
end
def unregister_channel(channel)
@channels.delete(channel)
@selector.deregister(channel)
end
def next_timeout
timeout = @timeout.timeout # force log time
return (@resolver.timeout || timeout) unless @resolver.closed?
timeout
end
end
end

View File

@ -5,6 +5,8 @@ module HTTPX
TimeoutError = Class.new(Error)
ResolveError = Class.new(Error)
HTTPError = Class.new(Error) do
attr_reader :response

View File

@ -4,12 +4,14 @@ require "socket"
require "httpx/io/tcp"
require "httpx/io/ssl"
require "httpx/io/unix"
require "httpx/io/udp"
module HTTPX
module IO
extend Registry
register "tcp", TCP
register "ssl", SSL
register "udp", UDP
register "unix", HTTPX::UNIX
end
end

View File

@ -10,7 +10,7 @@ module HTTPX
{}
end
def initialize(_, options)
def initialize(_, _, options)
@ctx = OpenSSL::SSL::SSLContext.new
ctx_options = TLS_OPTIONS.merge(options.ssl)
@ctx.set_params(ctx_options) unless ctx_options.empty?
@ -103,5 +103,17 @@ module HTTPX
end
do_transition(nextstate)
end
def log_transition_state(nextstate)
return super unless nextstate == :negotiated
server_cert = @io.peer_cert
"SSL connection using #{@io.ssl_version} / #{@io.cipher.first}\n" \
"ALPN, server accepted to use #{protocol}\n" \
"Server certificate:\n" \
" subject: #{server_cert.subject}\n" \
" start date: #{server_cert.not_before}\n" \
" start date: #{server_cert.not_after}\n" \
" issuer: #{server_cert.issuer}"
end
end
end

View File

@ -9,18 +9,22 @@ module HTTPX
attr_reader :ip, :port
attr_reader :addresses
alias_method :host, :ip
def initialize(uri, options)
def initialize(uri, addresses, options)
@state = :idle
@hostname = uri.host
@addresses = addresses
@ip_index = @addresses.size - 1
@options = Options.new(options)
@fallback_protocol = @options.fallback_protocol
@port = uri.port
if @options.io
@io = case @options.io
when Hash
@ip = Resolv.getaddress(@hostname)
@ip = @addresses[@ip_index]
@options.io[@ip] || @options.io["#{@ip}:#{@port}"]
else
@ip = @hostname
@ -31,7 +35,7 @@ module HTTPX
@state = :connected
end
else
@ip = Resolv.getaddress(@hostname)
@ip = @addresses[@ip_index]
end
@io ||= build_socket
end
@ -55,10 +59,14 @@ module HTTPX
transition(:idle)
@io = build_socket
end
@io.connect_nonblock(Socket.sockaddr_in(@port, @ip))
@io.connect_nonblock(Socket.sockaddr_in(@port, @ip.to_s))
rescue Errno::EISCONN
end
transition(:connected)
rescue Errno::EHOSTUNREACH => e
raise e if @ip_index <= 0
@ip_index -= 1
retry
rescue Errno::EINPROGRESS,
Errno::EALREADY,
::IO::WaitReadable
@ -125,8 +133,7 @@ module HTTPX
private
def build_socket
addr = IPAddr.new(@ip)
Socket.new(addr.family, :STREAM, 0)
Socket.new(@ip.family, :STREAM, 0)
end
def transition(nextstate)
@ -141,8 +148,19 @@ module HTTPX
end
def do_transition(nextstate)
log(level: 1, label: "#{inspect}: ") { nextstate.to_s }
log(level: 1) do
log_transition_state(nextstate)
end
@state = nextstate
end
def log_transition_state(nextstate)
case nextstate
when :connected
"Connected to #{@hostname} (#{@ip}) port #{@port} (##{@io.fileno})"
else
"#{@ip}:#{@port} #{@state} -> #{nextstate}"
end
end
end
end

56
lib/httpx/io/udp.rb Normal file
View File

@ -0,0 +1,56 @@
# frozen_string_literal: true
require "socket"
require "ipaddr"
module HTTPX
class UDP
include Loggable
def initialize(uri, _, _)
ip = IPAddr.new(uri.host)
@host = ip.to_s
@port = uri.port
@io = UDPSocket.new(ip.family)
end
def to_io
@io.to_io
end
def connect; end
def connected?
true
end
def close
@io.close
end
def write(buffer)
siz = @io.send(buffer, 0, @host, @port)
buffer.slice!(0, siz)
siz
end
if RUBY_VERSION < "2.3"
def read(size, buffer)
data, _ = @io.recvfrom_nonblock(size)
buffer.replace(data)
buffer.bytesize
rescue ::IO::WaitReadable
0
rescue IOError
end
else
def read(size, buffer)
ret = @io.recvfrom_nonblock(size, 0, buffer, exception: false)
return 0 if ret == :wait_readable
return if ret.nil?
buffer.bytesize
rescue IOError
end
end
end
end

View File

@ -1,3 +1,5 @@
# frozen_string_literal: true
require "forwardable"
module HTTPX
@ -6,8 +8,9 @@ module HTTPX
def_delegator :@uri, :port, :scheme
def initialize(uri, options)
def initialize(uri, addresses, options)
@uri = uri
@addresses = addresses
@state = :idle
@options = Options.new(options)
@path = @options.transport_options[:path]

View File

@ -55,6 +55,7 @@ module HTTPX
:response_body_class => Class.new(Response::Body),
:transport => nil,
:transport_options => nil,
:resolver_class => (ENV["HTTPX_RESOLVER"] || :native).to_sym,
}
defaults.merge!(options)
@ -86,7 +87,7 @@ module HTTPX
def_option(:transport) do |tr|
transport = tr.to_s
raise Error, "#{transport} is an unsupported transport type" unless IO.registry.keys.include?(transport)
raise Error, "#{transport} is an unsupported transport type" unless IO.registry.key?(transport)
self.transport = transport
end
@ -94,7 +95,7 @@ module HTTPX
params form json body
follow ssl http2_settings
request_class response_class headers_class request_body_class response_body_class
io fallback_protocol debug debug_level transport_options
io fallback_protocol debug debug_level transport_options resolver_class resolver_options
].each do |method_name|
def_option(method_name)
end

View File

@ -1,5 +1,7 @@
# frozen_string_literal: true
require "resolv"
require "ipaddr"
require "forwardable"
module HTTPX
@ -60,10 +62,9 @@ module HTTPX
parameters = Parameters.new(**proxy)
uri = parameters.uri
log { "proxy: #{uri}" }
io = TCP.new(uri, @options)
proxy_type = Parameters.registry(parameters.uri.scheme)
channel = proxy_type.new(io, parameters, @options.merge(options), &method(:on_response))
@connection.__send__(:register_channel, channel)
channel = proxy_type.new("tcp", uri, parameters, @options.merge(options), &method(:on_response))
@connection.__send__(:resolve_channel, channel)
channel
end
@ -102,8 +103,8 @@ module HTTPX
end
class ProxyChannel < Channel
def initialize(io, parameters, options, &blk)
super(io, options, &blk)
def initialize(type, uri, parameters, options, &blk)
super(type, uri, options, &blk)
@parameters = parameters
end
@ -144,7 +145,7 @@ module HTTPX
class ProxySSL < SSL
def initialize(tcp, request_uri, options)
@io = tcp.to_io
super(tcp, options)
super(request_uri, tcp.addresses, options)
@hostname = request_uri.host
@state = :connected
end

100
lib/httpx/resolver.rb Normal file
View File

@ -0,0 +1,100 @@
# frozen_string_literal: true
module HTTPX
module Resolver
autoload :ResolverMixin, "httpx/resolver/resolver_mixin"
autoload :System, "httpx/resolver/system"
autoload :Native, "httpx/resolver/native"
autoload :HTTPS, "httpx/resolver/https"
extend Registry
register :system, :System
register :native, :Native
register :https, :HTTPS
@lookup_mutex = Mutex.new
@lookups = {}
@identifier_mutex = Mutex.new
@identifier = 1
module_function
def cached_lookup(hostname)
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
@lookup_mutex.synchronize do
lookup(hostname, now)
end
end
def cached_lookup_set(hostname, entries)
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
entries.each do |entry|
entry["TTL"] += now
end
@lookup_mutex.synchronize do
@lookups[hostname] = entries
end
end
def uncache(hostname)
@lookup_mutex.synchronize do
@lookups.delete(hostname)
end
end
# do not use directly!
def lookup(hostname, ttl)
return unless @lookups.key?(hostname)
@lookups[hostname] = @lookups[hostname].select do |address|
address["TTL"] > ttl
end
ips = @lookups[hostname].flat_map do |address|
if address.key?("alias")
lookup(address["alias"], ttl)
else
address["data"]
end
end
ips unless ips.empty?
end
def generate_id
@identifier_mutex.synchronize { @identifier = (@identifier + 1) & 0xFFFF }
end
def encode_dns_query(hostname, type: Resolv::DNS::Resource::IN::A)
Resolv::DNS::Message.new.tap do |query|
query.id = generate_id
query.rd = 1
query.add_question(hostname, type)
end.encode
end
def decode_dns_answer(payload)
message = Resolv::DNS::Message.decode(payload)
addresses = []
message.each_answer do |question, _, value|
case value
when Resolv::DNS::Resource::IN::CNAME
addresses << {
"name" => question.to_s,
"TTL" => value.ttl,
"alias" => value.name.to_s,
}
when Resolv::DNS::Resource::IN::A,
Resolv::DNS::Resource::IN::AAAA
addresses << {
"name" => question.to_s,
"TTL" => value.ttl,
"data" => value.address.to_s,
}
end
end
addresses
end
end
end
require "httpx/resolver/options"

181
lib/httpx/resolver/https.rb Normal file
View File

@ -0,0 +1,181 @@
# frozen_string_literal: true
require "uri"
require "cgi"
require "forwardable"
module HTTPX
class Resolver::HTTPS
extend Forwardable
include Resolver::ResolverMixin
NAMESERVER = "https://1.1.1.1/dns-query"
RECORD_TYPES = {
"A" => Resolv::DNS::Resource::IN::A,
"AAAA" => Resolv::DNS::Resource::IN::AAAA,
}.freeze
DEFAULTS = {
uri: NAMESERVER,
use_get: false,
}.freeze
def_delegator :@channels, :empty?
def_delegators :@resolver_channel, :to_io, :call, :interests, :close
def initialize(connection, options)
@connection = connection
@options = Options.new(options)
@resolver_options = Resolver::Options.new(DEFAULTS.merge(@options.resolver_options || {}))
@_record_types = Hash.new { |types, host| types[host] = RECORD_TYPES.keys.dup }
@queries = {}
@requests = {}
@channels = []
@uri = URI(@resolver_options.uri)
@uri_addresses = nil
end
def <<(channel)
@uri_addresses ||= Resolv.getaddresses(@uri.host)
if @uri_addresses.empty?
ex = ResolveError.new("Can't resolve #{channel.uri.host}")
ex.set_backtrace(caller)
emit(:error, channel, ex)
else
early_resolve(channel) || resolve(channel)
end
end
def timeout
timeout = @options.timeout
timeout.timeout
end
def closed?
return true unless @resolver_channel
resolver_channel.closed?
end
private
def resolver_channel
@resolver_channel ||= find_channel(@uri, @options)
end
def resolve(channel = @channels.first, hostname = nil)
return if @building_channel
hostname = hostname || @queries.key(channel) || channel.uri.host
type = @_record_types[hostname].shift
log(label: "resolver: ") { "query #{type} for #{hostname}" }
request = build_request(hostname, type)
@requests[request] = channel
resolver_channel.send(request)
@queries[hostname] = channel
@channels << channel
end
def find_channel(_request, **options)
@connection.find_channel(@uri) || begin
@building_channel = true
channel = @connection.build_channel(@uri, **options)
emit_addresses(channel, @uri_addresses)
set_channel_callbacks(channel)
@building_channel = false
channel
end
end
def set_channel_callbacks(channel)
channel.on(:response, &method(:on_response))
channel.on(:promise, &method(:on_response))
end
def on_response(request, response)
response.raise_for_status
rescue Error => ex
channel = @requests[request]
hostname = @queries.key(channel)
error = ResolveError.new("Can't resolve #{hostname}: #{ex.message}")
error.set_backtrace(ex.backtrace)
emit(:error, channel, error)
else
parse(response)
ensure
@requests.delete(request)
end
def parse(response)
answers = decode_response_body(response)
if answers.empty?
host, channel = @queries.first
if @_record_types[host].empty?
emit_resolve_error(channel, host)
return
end
else
answers = answers.group_by { |answer| answer["name"] }
answers.each do |hostname, addresses|
addresses = addresses.flat_map do |address|
if address.key?("alias")
alias_address = answers[address["alias"]]
if alias_address.nil?
channel = @queries[hostname]
@queries.delete(address["name"])
resolve(channel, address["alias"])
return # rubocop:disable Lint/NonLocalExitFromIterator
else
alias_address
end
else
address
end
end.compact
next if addresses.empty?
hostname = hostname[0..-2] if hostname.end_with?(".")
channel = @queries.delete(hostname)
next unless channel # probably a retried query for which there's an answer
@channels.delete(channel)
Resolver.cached_lookup_set(hostname, addresses)
emit_addresses(channel, addresses.map { |addr| addr["data"] })
end
end
return emit(:close) if @channels.empty?
resolve
end
def build_request(hostname, type)
uri = @uri.dup
rklass = @options.request_class
if @resolver_options.use_get
params = URI.decode_www_form(uri.query.to_s)
params << ["type", type]
params << ["name", CGI.escape(hostname)]
uri.query = URI.encode_www_form(params)
request = rklass.new("GET", uri, @options)
else
payload = Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type])
request = rklass.new("POST", uri, @options.merge(body: [payload]))
request.headers["content-type"] = "application/dns-message"
request.headers["accept"] = "application/dns-message"
end
request
end
def decode_response_body(response)
case response.headers["content-type"]
when "application/dns-json",
"application/json",
%r{^application\/x\-javascript} # because google...
payload = JSON.parse(response.to_s)
payload["Answer"]
when "application/dns-udpwireformat",
"application/dns-message"
Resolver.decode_dns_answer(response.to_s)
# TODO: what about the rest?
end
end
end
end

View File

@ -0,0 +1,247 @@
# frozen_string_literal: true
require "forwardable"
require "resolv"
module HTTPX
class Resolver::Native
extend Forwardable
include Resolver::ResolverMixin
RESOLVE_TIMEOUT = 5
RECORD_TYPES = {
"A" => Resolv::DNS::Resource::IN::A,
"AAAA" => Resolv::DNS::Resource::IN::AAAA,
}.freeze
DEFAULTS = if RUBY_VERSION < "2.2"
{
**Resolv::DNS::Config.default_config_hash,
packet_size: 512,
timeouts: RESOLVE_TIMEOUT,
record_types: RECORD_TYPES.keys,
}.freeze
else
{
nameserver: nil,
**Resolv::DNS::Config.default_config_hash,
packet_size: 512,
timeouts: RESOLVE_TIMEOUT,
record_types: RECORD_TYPES.keys,
}.freeze
end
DNS_PORT = 53
def_delegator :@channels, :empty?
def initialize(_, options)
@options = Options.new(options)
@ns_index = 0
@resolver_options = Resolver::Options.new(DEFAULTS.merge(@options.resolver_options || {}))
@nameserver = @resolver_options.nameserver
@_timeouts = Array(@resolver_options.timeouts)
@timeouts = Hash.new { |timeouts, host| timeouts[host] = @_timeouts.dup }
@_record_types = Hash.new { |types, host| types[host] = @resolver_options.record_types.dup }
@channels = []
@queries = {}
@read_buffer = Buffer.new(@resolver_options.packet_size)
@write_buffer = Buffer.new(@resolver_options.packet_size)
@state = :idle
end
def close
transition(:closed)
end
def closed?
@state == :closed
end
def to_io
case @state
when :idle
transition(:open)
when :closed
transition(:idle)
transition(:open)
end
resolve if @queries.empty?
@io.to_io
end
def call
case @state
when :open
consume
end
nil
rescue Errno::EHOSTUNREACH => e
@ns_index += 1
if @ns_index < @nameserver.size
transition(:idle)
else
ex = ResolvError.new(e.message)
ex.set_backtrace(e.backtrace)
raise ex
end
end
def interests
readable = !@read_buffer.full?
writable = !@write_buffer.empty?
if readable
writable ? :rw : :r
else
writable ? :w : :r
end
end
def <<(channel)
return if early_resolve(channel)
if @nameserver.nil?
ex = ResolveError.new("Can't resolve #{channel.uri.host}")
ex.set_backtrace(caller)
emit(:error, channel, ex)
else
@channels << channel
end
end
def timeout
@start_timeout = Process.clock_gettime(Process::CLOCK_MONOTONIC)
hosts = @queries.keys
@timeouts.values_at(*hosts).reject(&:empty?).map(&:first).min
end
private
def consume
dread
do_retry
dwrite
end
def do_retry
return if @queries.empty?
loop_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @start_timeout
channels = []
queries = {}
while (query = @queries.shift)
h, channel = query
host = channel.uri.host
timeout = (@timeouts[host][0] -= loop_time)
unless timeout.negative?
queries[h] = channel
next
end
@timeouts[host].shift
if @timeouts[host].empty?
@timeouts.delete(host)
emit_resolve_error(channel, host)
return
else
channels << channel
log(label: "resolver: ") do
"timeout after #{prev_timeout}s, retry(#{timeouts.first}) #{host}..."
end
end
end
@queries = queries
channels.each { |ch| resolve(ch) }
end
def dread(wsize = @read_buffer.limit)
loop do
siz = @io.read(wsize, @read_buffer)
unless siz
emit(:close)
return
end
return if siz.zero?
log(label: "resolver: ") { "READ: #{siz} bytes..." }
parse(@read_buffer.to_s)
end
end
def dwrite
loop do
return if @write_buffer.empty?
siz = @io.write(@write_buffer)
unless siz
emit(:close)
return
end
log(label: "resolver: ") { "WRITE: #{siz} bytes..." }
return if siz.zero?
end
end
def parse(buffer)
addresses = Resolver.decode_dns_answer(buffer)
if addresses.empty?
hostname, channel = @queries.first
if @_record_types[hostname].empty?
emit_resolve_error(channel, hostname)
return
end
else
address = addresses.first
channel = @queries.delete(address["name"])
return unless channel # probably a retried query for which there's an answer
if address.key?("alias") # CNAME
resolve(channel, address["alias"])
@queries.delete(address["name"])
return
else
@channels.delete(channel)
Resolver.cached_lookup_set(channel.uri.host, addresses)
emit_addresses(channel, addresses.map { |addr| addr["data"] })
end
end
return emit(:close) if @channels.empty?
resolve
end
def resolve(channel = @channels.first, hostname = nil)
raise Error, "no URI to resolve" unless channel
return unless @write_buffer.empty?
hostname = hostname || @queries.key(channel) || channel.uri.host
@queries[hostname] = channel
type = @_record_types[hostname].shift
log(label: "resolver: ") { "query #{type} for #{hostname}" }
@write_buffer << Resolver.encode_dns_query(hostname, type: RECORD_TYPES[type])
end
def build_socket
return if @io
ip, port = @nameserver[@ns_index]
port ||= DNS_PORT
uri = URI::Generic.build(scheme: "udp", port: port)
uri.hostname = ip
type = IO.registry(uri.scheme)
log(label: "resolver: ") { "server: #{uri}..." }
@io = type.new(uri, [IPAddr.new(ip)], @options)
end
def transition(nextstate)
case nextstate
when :idle
if @io
@io.close
@io = nil
end
@timeouts.clear
when :open
return unless @state == :idle
build_socket
@io.connect
return unless @io.connected?
when :closed
return unless @state == :open
@io.close if @io
end
@state = nextstate
end
end
end

View File

@ -0,0 +1,25 @@
# frozen_string_literal: true
module HTTPX
class Resolver::Options
def initialize(options = {})
@options = options
end
def method_missing(m, *args, &block)
if @options.key?(m)
@options[m]
else
super
end
end
def respond_to_missing?(m)
@options.key?(m) || super
end
def to_h
@options
end
end
end

View File

@ -0,0 +1,62 @@
# frozen_string_literal: true
require "resolv"
require "ipaddr"
module HTTPX
module Resolver
module ResolverMixin
include Callbacks
include Loggable
CHECK_IF_IP = proc do |name|
begin
IPAddr.new(name)
true
rescue ArgumentError
false
end
end
def uncache(channel)
hostname = hostname || @queries.key(channel) || channel.uri.host
Resolver.uncache(hostname)
end
private
def emit_addresses(channel, addresses)
addresses.map! do |address|
address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s)
end
log(label: "resolver: ") { "answer #{channel.uri.host}: #{addresses.inspect}" }
channel.addresses = addresses
emit(:resolve, channel, addresses)
end
def early_resolve(channel)
hostname = channel.uri.host
addresses = ip_resolve(hostname) || Resolver.cached_lookup(hostname) || system_resolve(hostname)
return unless addresses
emit_addresses(channel, addresses)
end
def ip_resolve(hostname)
[hostname] if CHECK_IF_IP[hostname]
end
def system_resolve(hostname)
@system_resolver ||= Resolv::Hosts.new
ips = @system_resolver.getaddresses(hostname)
return if ips.empty?
ips.map { |ip| IPAddr.new(ip) }
end
def emit_resolve_error(channel, hostname)
error = ResolveError.new("Can't resolve #{hostname}")
error.set_backtrace(caller)
emit(:error, channel, error)
end
end
end
end

View File

@ -0,0 +1,34 @@
# frozen_string_literal: true
require "forwardable"
require "resolv"
module HTTPX
class Resolver::System
include Resolver::ResolverMixin
def initialize(_, options)
@options = Options.new(options)
roptions = @options.resolver_options
@state = :idle
@resolver = Resolv::DNS.new(roptions.nil? ? nil : roptions)
@resolver.timeouts = roptions[:timeouts] if roptions
end
def closed?
true
end
def empty?
true
end
def <<(channel)
hostname = channel.uri.host
addresses = ip_resolve(hostname) || system_resolve(hostname) || @resolver.getaddresses(hostname)
addresses.empty? ? emit_resolve_error(channel, hostname) : emit_addresses(channel, addresses)
end
def uncache(*); end
end
end

View File

@ -108,7 +108,14 @@ module HTTPX
def to_s
rewind
return @buffer.read.force_encoding(@encoding) if @buffer
if @buffer
content = @buffer.read
begin
return content.force_encoding(@encoding)
rescue ArgumentError # ex: unknown encoding name - utf
return content
end
end
""
ensure
close

View File

@ -1,6 +1,12 @@
# frozen_string_literal: true
class HTTPX::Selector
READABLE = %i[rw r].freeze
WRITABLE = %i[rw w].freeze
private_constant :READABLE
private_constant :WRITABLE
#
# I/O monitor
#
@ -15,11 +21,11 @@ class HTTPX::Selector
end
def readable?
@interests == :rw || @interests == :r
READABLE.include?(@interests)
end
def writable?
@interests == :rw || @interests == :w
WRITABLE.include?(@interests)
end
# closes +@io+, deregisters from reactor (unless +deregister+ is false)
@ -60,8 +66,8 @@ class HTTPX::Selector
# register +io+ for +interests+ events.
def register(io, interests)
readable = interests == :r || interests == :rw
writable = interests == :w || interests == :rw
readable = READABLE.include?(interests)
writable = WRITABLE.include?(interests)
@lock.synchronize do
if readable
monitor = @readers[io]

View File

@ -11,7 +11,7 @@ module HTTPX
super
end
def initialize(loop_timeout: 5, total_timeout: nil)
def initialize(loop_timeout: LOOP_TIMEOUT, total_timeout: nil)
@loop_timeout = loop_timeout
@total_timeout = total_timeout
reset_counter

View File

@ -19,12 +19,12 @@ class UnixTest < Minitest::Test
private
RESPONSE_HEADER = <<-HTTP.lines.map.map(&:chomp).join("\r\n") << ("\r\n" * 2)
HTTP/1.1 200 OK
Date: Mon, 27 Jul 2009 12:28:53 GMT
Content-Length: 4
Content-Type: text/plain
Connection: close
RESPONSE_HEADER = <<-HTTP.lines.map(&:strip).map(&:chomp).join("\r\n") << ("\r\n" * 2)
HTTP/1.1 200 OK
Date: Mon, 27 Jul 2009 12:28:53 GMT
Content-Length: 4
Content-Type: text/plain
Connection: close
HTTP
def on_unix_server

View File

@ -77,6 +77,8 @@ class OptionsTest < Minitest::Test
:response_body_class => bar.response_body_class,
:transport => nil,
:transport_options => nil,
:resolver_class => bar.resolver_class,
:resolver_options => bar.resolver_options,
}, "options haven't merged correctly"
end

View File

@ -30,6 +30,14 @@ class RequestTest < Minitest::Test
assert r2.authority == "google.com", "unexpected authority (#{r2.authority})"
r3 = Request.new(:get, "http://app.dev:8080/path")
assert r3.authority == "app.dev:8080", "unexpected authority (#{r3.authority})"
r4 = Request.new(:get, "http://127.0.0.1:80/path")
assert r4.authority == "127.0.0.1", "unexpected authority (#{r4.authority})"
r5 = Request.new(:get, "https://[::1]:443/path")
assert r5.authority == "[::1]", "unexpected authority (#{r5.authority})"
r6 = Request.new(:get, "http://127.0.0.1:81/path")
assert r6.authority == "127.0.0.1:81", "unexpected authority (#{r6.authority})"
r7 = Request.new(:get, "https://[::1]:444/path")
assert r7.authority == "[::1]:444", "unexpected authority (#{r7.authority})"
end
def test_request_path

View File

@ -0,0 +1,96 @@
# frozen_string_literal: true
require "ostruct"
require_relative "../test_helper"
class HTTPSResolverTest < Minitest::Test
include ResolverHelpers
include HTTPX
def test_append_ipv4
super
assert resolver.empty?
end
def test_append_ipv6
super
assert resolver.empty?
end
def test_append_localhost
super
assert resolver.empty?
end
def test_parse_no_record
@has_error = false
resolver.on(:error) { @has_error = true }
channel = build_channel("https://idontthinkthisexists.org/")
resolver << channel
resolver.queries["idontthinkthisexists.org"] = channel
# this is only here to drain
write_buffer.clear
resolver.parse(no_record)
assert channel.addresses.nil?
assert resolver.queries.key?("idontthinkthisexists.org")
assert !@has_error, "resolver should still be able to resolve AAAA"
# A type
write_buffer.clear
resolver.parse(no_record)
assert channel.addresses.nil?
assert resolver.queries.key?("idontthinkthisexists.org")
assert @has_error, "resolver should have failed"
end
def test_io_api
__test_io_api
end
private
def build_channel(*)
channel = super
connection.expect(:find_channel, channel, [URI::HTTP])
channel
end
def resolver(options = Options.new)
@resolver ||= begin
resolver = Resolver::HTTPS.new(connection, options)
resolver.extend(ResolverHelpers::ResolverExtensions)
resolver
end
end
def connection
@connection ||= Minitest::Mock.new
end
def write_buffer
resolver.instance_variable_get(:@resolver_channel)
.instance_variable_get(:@pending)
end
MockResponse = Struct.new(:headers, :body) do
def to_s
body
end
end
def a_record
MockResponse.new({ "content-type" => "application/dns-udpwireformat" }, super)
end
def aaaa_record
MockResponse.new({ "content-type" => "application/dns-udpwireformat" }, super)
end
def cname_record
MockResponse.new({ "content-type" => "application/dns-udpwireformat" }, super)
end
def no_record
MockResponse.new({ "content-type" => "application/dns-udpwireformat" }, super)
end
end

View File

@ -0,0 +1,68 @@
# frozen_string_literal: true
require "ostruct"
require_relative "../test_helper"
class NativeResolverTest < Minitest::Test
include ResolverHelpers
include HTTPX
def test_append_ipv4
super
assert resolver.empty?
end
def test_append_ipv6
super
assert resolver.empty?
end
def test_append_localhost
super
assert resolver.empty?
end
def test_parse_no_record
@has_error = false
resolver.on(:error) { @has_error = true }
channel = build_channel("https://idontthinkthisexists.org/")
resolver << channel
resolver.resolve
resolver.queries["idontthinkthisexists.org"] = channel
# this is only here to drain
write_buffer.clear
resolver.parse(no_record)
assert channel.addresses.nil?
assert resolver.queries.key?("idontthinkthisexists.org")
assert !@has_error, "resolver should still be able to resolve A"
# A type
write_buffer.clear
resolver.parse(no_record)
assert channel.addresses.nil?
assert resolver.queries.key?("idontthinkthisexists.org")
assert @has_error, "resolver should have failed"
end
def test_io_api
__test_io_api
end
private
def resolver(options = Options.new)
@resolver ||= begin
resolver = Resolver::Native.new(connection, options)
resolver.extend(ResolverHelpers::ResolverExtensions)
resolver
end
end
def connection
@connection ||= Minitest::Mock.new
end
def write_buffer
resolver.instance_variable_get(:@write_buffer)
end
end

View File

@ -0,0 +1,24 @@
# frozen_string_literal: true
require "ostruct"
require_relative "../test_helper"
class SystemResolverTest < Minitest::Test
include ResolverHelpers
include HTTPX
def test_append_external_name
channel = build_channel("https://news.ycombinator.com")
resolver << channel
assert !channel.addresses.empty?, "name should have been resolved immediately"
end
private
def resolver(options = Options.new)
@resolver ||= begin
connection = Minitest::Mock.new
Resolver::System.new(connection, options)
end
end
end

24
test/resolver_test.rb Normal file
View File

@ -0,0 +1,24 @@
# frozen_string_literal: true
require_relative "test_helper"
class ResolverTest < Minitest::Test
include HTTPX
def test_cached_lookup
ips = Resolver.cached_lookup("test.com")
assert ips.nil?
dns_entry = { "data" => "IP", "TTL" => 2, "name" => "test.com" }
Resolver.cached_lookup_set("test.com", [dns_entry])
ips = Resolver.cached_lookup("test.com")
assert ips == ["IP"]
sleep 2
ips = Resolver.cached_lookup("test.com")
assert ips.nil?
alias_entry = { "alias" => "test.com", "TTL" => 2, "name" => "foo.com" }
Resolver.cached_lookup_set("test.com", [dns_entry])
Resolver.cached_lookup_set("foo.com", [alias_entry])
ips = Resolver.cached_lookup("foo.com")
assert ips == ["IP"]
end
end

View File

@ -1,6 +1,6 @@
module Requests
module Errors
def test_connection_refused
def test_errors_connection_refused
skip if RUBY_ENGINE == "jruby"
unavailable_host = URI(origin("localhost"))
unavailable_host.port = next_available_port

View File

@ -2,7 +2,7 @@
module Requests
module ResponseBody
def test_http_copy_to_file
def test_http_response_copy_to_file
file = Tempfile.new(%w[cat .jpeg])
uri = build_uri("/image")
response = HTTPX.get(uri, headers: { "accept" => "image/jpeg" })
@ -19,7 +19,7 @@ module Requests
end
end
def test_http_copy_to_io
def test_http_response_copy_to_io
io = StringIO.new
uri = build_uri("/image")
response = HTTPX.get(uri, headers: { "accept" => "image/jpeg" })
@ -32,8 +32,8 @@ module Requests
io.close if io
end
def test_http_buffer_to_custom
uri = build_uri("/get")
def test_http_response_buffer_to_custom
uri = build_uri("/")
custom_body = Class.new do
attr_reader :file

View File

@ -0,0 +1,140 @@
# frozen_string_literal: true
module ResolverHelpers
def test_resolver_api
assert resolver.respond_to?(:<<)
assert resolver.respond_to?(:closed?)
assert resolver.respond_to?(:empty?)
end
def test_append_localhost
ips = [IPAddr.new("127.0.0.1"), IPAddr.new("::1")]
channel = build_channel("https://localhost")
resolver << channel
assert (channel.addresses - ips).empty?, "localhost interfaces should have been attributed"
end
def test_append_ipv4
ip = IPAddr.new("255.255.0.1")
channel = build_channel("https://255.255.0.1")
resolver << channel
assert channel.addresses == [ip], "#{ip} should have been statically resolved"
end
def test_append_ipv6
ip = IPAddr.new("fe80::1")
channel = build_channel("https://[fe80::1]")
resolver << channel
assert channel.addresses == [ip], "#{ip} should have been statically resolved"
end
def __test_io_api
assert resolver.respond_to?(:interests)
assert resolver.respond_to?(:to_io)
assert resolver.respond_to?(:call)
assert resolver.respond_to?(:timeout)
assert resolver.respond_to?(:close)
end
def test_parse_a_record
return unless resolver.respond_to?(:parse)
channel = build_channel("http://ipv4.tlund.se/")
resolver.queries["ipv4.tlund.se"] = channel
resolver.parse(a_record)
assert channel.addresses.include?("193.15.228.195")
end
def test_parse_aaaa_record
return unless resolver.respond_to?(:parse)
channel = build_channel("http://ipv6.tlund.se/")
resolver.queries["ipv6.tlund.se"] = channel
resolver.parse(aaaa_record)
assert channel.addresses.include?("2a00:801:f::195")
end
def test_parse_cname_record
return unless resolver.respond_to?(:parse)
channel = build_channel("http://ipv4c.tlund.se/")
resolver.queries["ipv4c.tlund.se"] = channel
resolver.parse(cname_record)
assert channel.addresses.nil?
assert !resolver.queries.key?("ipv4c.tlund.se")
assert resolver.queries.key?("ipv4.tlund.se")
end
def test_append_hostname
return unless resolver.respond_to?(:resolve)
channel = build_channel("https://news.ycombinator.com")
resolver << channel
assert channel.addresses.nil?, "there should be no direct IP"
resolver.resolve
assert !write_buffer.empty?, "there should be a DNS query ready to be sent"
end
private
def build_channel(uri)
channel = HTTPX::Channel.by(URI(uri), HTTPX::Options.new)
channel.extend(ChannelExtensions)
channel
end
def a_record
"\x00\x03\x81\x80\x00\x01\x00\x01\x00\x03\x00\x06\x04ipv4\x05tlund\x02se\x00\x00\x01\x00\x01\xC0\f\x00" \
"\x01\x00\x01\x00\x00\v\xC3\x00\x04\xC1\x0F\xE4\xC3\xC0\x11\x00\x02\x00\x01\x00\x00\v\xC3\x00\x0F\x02ns" \
"\x06agartz\x03net\x00\xC0\x11\x00\x02\x00\x01\x00\x00\v\xC3\x00\t\x02ns\x03nxs\xC0\x17\xC0\x11\x00\x02" \
"\x00\x01\x00\x00\v\xC3\x00\b\x05slave\xC0Y\xC0V\x00\x01\x00\x01\x00\x01M\xCF\x00\x04\xC1\x0F\xE4\xC2\xC0;" \
"\x00\x01\x00\x01\x00\x01M\xCF\x00\x04\xC1\x0F\xE4\xC2\xC0k\x00\x01\x00\x01\x00\x01M\xCF\x00\x04\xC1\x0F" \
"\xE4\xC2\xC0V\x00\x1C\x00\x01\x00\x01M\xCF\x00\x10 \x01\x06|\x18\x98\x02\x01\x00\x00\x00\x00\x00\x00\x00S" \
"\xC0;\x00\x1C\x00\x01\x00\x01M\xCF\x00\x10*\x00\b\x01\x00\x0F\x00\x00\x00\x00\x00\x00\x00\x00\x00S\xC0k\x00" \
"\x1C\x00\x01\x00\x01M\xCF\x00\x10*\x00\b\x01\x00\x0F\x00\x00\x00\x00\x00\x00\x00\x00\x00S".b
end
def aaaa_record
"\x00\x02\x81\x80\x00\x01\x00\x01\x00\x03\x00\x06\x04" \
"ipv6\x05tlund\x02se\x00\x00\x1C\x00\x01\xC0\f\x00\x1C" \
"\x00\x01\x00\x00\x0E\x10\x00\x10*\x00\b\x01\x00\x0F\x00" \
"\x00\x00\x00\x00\x00\x00\x00\x01\x95\xC0\x11\x00\x02\x00\x01" \
"\x00\x00\r\xBE\x00\t\x02ns\x03nxs\xC0\x17\xC0\x11\x00\x02\x00" \
"\x01\x00\x00\r\xBE\x00\x0F\x02ns\x06agartz\x03net\x00\xC0\x11" \
"\x00\x02\x00\x01\x00\x00\r\xBE\x00\b\x05slave\xC0J\xC0G\x00\x01" \
"\x00\x01\x00\x01O\xCA\x00\x04\xC1\x0F\xE4\xC2\xC0\\\x00\x01\x00" \
"\x01\x00\x01O\xCA\x00\x04\xC1\x0F\xE4\xC2\xC0w\x00\x01\x00\x01\x00" \
"\x01O\xCA\x00\x04\xC1\x0F\xE4\xC2\xC0G\x00\x1C\x00\x01\x00\x01O\xCA" \
"\x00\x10 \x01\x06|\x18\x98\x02\x01\x00\x00\x00\x00\x00\x00\x00S\xC0\\" \
"\x00\x1C\x00\x01\x00\x01O\xCA\x00\x10*\x00\b\x01\x00\x0F\x00\x00\x00" \
"\x00\x00\x00\x00\x00\x00S\xC0w\x00\x1C\x00\x01\x00\x01O\xCA\x00\x10*" \
"\x00\b\x01\x00\x0F\x00\x00\x00\x00\x00\x00\x00\x00\x00S".b
end
def cname_record
"\x00\x02\x81\x80\x00\x01\x00\x01\x00\x01\x00\x00\x05ipv4c\x05tlund\x02se" \
"\x00\x00\x1C\x00\x01\xC0\f\x00\x05\x00\x01\x00\x00\a\x1F\x00\a\x04ipv4\xC0" \
"\x12\xC0\x12\x00\x06\x00\x01\x00\x00\x01,\x00%\x02ns\x03nxs\xC0\x18\x05tlund" \
"\xC0BxIv\x91\x00\x008@\x00\x00\x0E\x10\x00\e\xAF\x80\x00\x00\x01,".b
end
def no_record
"\x00\x02\x81\x83\x00\x01\x00\x00\x00\x01\x00\x00\x14idontthinkthisexists\x03org\x00" \
"\x00\x1C\x00\x01\xC0!\x00\x06\x00\x01\x00\x00\x03\x84\x003\x02a0\x03org\vafilias-nst" \
"\x04info\x00\x03noc\xC0=w\xFD|\xD2\x00\x00\a\b\x00\x00\x03\x84\x00\t:\x80\x00\x01Q\x80".b
end
module ResolverExtensions
def self.extended(obj)
obj.singleton_class.class_eval do
attr_reader :queries
public :parse # rubocop:disable Style/AccessModifierDeclarations
public :resolve # rubocop:disable Style/AccessModifierDeclarations
end
end
end
module ChannelExtensions
attr_reader :addresses
def addresses=(addrs)
@addresses = addrs
end
end
end