mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-12-04 00:01:13 -05:00
native resolver falls back to tcp dns when receiving truncated packet
yet another compliance fix for the DNS protocol; while udp is the preferred transport, in case a truncated response is received, the resolver will switch to tcp, and performm the DNS query again. This introduces a new resolver option, `:socket_type`, which is `:udp` by default.
This commit is contained in:
parent
5bba381bb8
commit
6aacc9b0eb
@ -31,6 +31,10 @@ module HTTPX
|
||||
@buffer.bytesize >= @limit
|
||||
end
|
||||
|
||||
def capacity
|
||||
@limit - @buffer.bytesize
|
||||
end
|
||||
|
||||
def shift!(fin)
|
||||
@buffer = @buffer.byteslice(fin..-1) || "".b
|
||||
end
|
||||
|
||||
@ -98,25 +98,30 @@ module HTTPX
|
||||
@identifier_mutex.synchronize { @identifier = (@identifier + 1) & 0xFFFF }
|
||||
end
|
||||
|
||||
def encode_dns_query(hostname, type: Resolv::DNS::Resource::IN::A)
|
||||
def encode_dns_query(hostname, type: Resolv::DNS::Resource::IN::A, message_id: generate_id)
|
||||
Resolv::DNS::Message.new.tap do |query|
|
||||
query.id = generate_id
|
||||
query.id = message_id
|
||||
query.rd = 1
|
||||
query.add_question(hostname, type)
|
||||
end.encode
|
||||
end
|
||||
|
||||
def decode_dns_answer(payload)
|
||||
message = Resolv::DNS::Message.decode(payload)
|
||||
begin
|
||||
message = Resolv::DNS::Message.decode(payload)
|
||||
rescue Resolv::DNS::DecodeError => e
|
||||
return :decode_error, e
|
||||
end
|
||||
|
||||
# no domain was found
|
||||
return if message.rcode == Resolv::DNS::RCode::NXDomain
|
||||
return :no_domain_found if message.rcode == Resolv::DNS::RCode::NXDomain
|
||||
|
||||
return :message_truncated if message.tc == 1
|
||||
|
||||
return :dns_error, message.rcode if message.rcode != Resolv::DNS::RCode::NoError
|
||||
|
||||
addresses = []
|
||||
|
||||
# TODO: raise an "other dns OtherResolvError" type of error
|
||||
return addresses if message.rcode != Resolv::DNS::RCode::NoError
|
||||
|
||||
message.each_answer do |question, _, value|
|
||||
case value
|
||||
when Resolv::DNS::Resource::IN::CNAME
|
||||
@ -134,7 +139,8 @@ module HTTPX
|
||||
}
|
||||
end
|
||||
end
|
||||
addresses
|
||||
|
||||
[:ok, addresses]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -128,23 +128,32 @@ module HTTPX
|
||||
end
|
||||
|
||||
def parse(request, response)
|
||||
begin
|
||||
answers = decode_response_body(response)
|
||||
rescue Resolv::DNS::DecodeError => e
|
||||
host, connection = @queries.first
|
||||
@queries.delete(host)
|
||||
emit_resolve_error(connection, connection.origin.host, e)
|
||||
return
|
||||
end
|
||||
code, result = decode_response_body(response)
|
||||
|
||||
if answers.nil?
|
||||
case code
|
||||
when :ok
|
||||
parse_addresses(result)
|
||||
when :no_domain_found
|
||||
# Indicates no such domain was found.
|
||||
|
||||
host = @requests.delete(request)
|
||||
connection = @queries.delete(host)
|
||||
|
||||
emit_resolve_error(connection) unless @queries.value?(connection)
|
||||
elsif answers.empty?
|
||||
when :dns_error
|
||||
host = @requests.delete(request)
|
||||
connection = @queries.delete(host)
|
||||
|
||||
emit_resolve_error(connection)
|
||||
when :decode_error
|
||||
host, connection = @queries.first
|
||||
@queries.delete(host)
|
||||
emit_resolve_error(connection, connection.origin.host, result)
|
||||
end
|
||||
end
|
||||
|
||||
def parse_addresses(answers)
|
||||
if answers.empty?
|
||||
# no address found, eliminate candidates
|
||||
host = @requests.delete(request)
|
||||
connection = @queries.delete(host)
|
||||
|
||||
@ -33,6 +33,7 @@ module HTTPX
|
||||
super
|
||||
@ns_index = 0
|
||||
@resolver_options = DEFAULTS.merge(@options.resolver_options)
|
||||
@socket_type = @resolver_options.fetch(:socket_type, :udp)
|
||||
@nameserver = Array(@resolver_options[:nameserver]) if @resolver_options[:nameserver]
|
||||
@ndots = @resolver_options[:ndots]
|
||||
@search = Array(@resolver_options[:search]).map { |srch| srch.scan(/[^.]+/) }
|
||||
@ -169,10 +170,49 @@ module HTTPX
|
||||
|
||||
def dread(wsize = @resolver_options[:packet_size])
|
||||
loop do
|
||||
siz = @io.read(wsize, @read_buffer)
|
||||
return unless siz && siz.positive?
|
||||
wsize = @large_packet.capacity if @large_packet
|
||||
|
||||
siz = @io.read(wsize, @read_buffer)
|
||||
|
||||
unless siz
|
||||
ex = EOFError.new("descriptor closed")
|
||||
ex.set_backtrace(caller)
|
||||
raise ex
|
||||
end
|
||||
|
||||
return unless siz.positive?
|
||||
|
||||
if @socket_type == :tcp
|
||||
# packet may be incomplete, need to keep draining from the socket
|
||||
if @large_packet
|
||||
# large packet buffer already exists, continue pumping
|
||||
@large_packet << @read_buffer
|
||||
|
||||
next unless @large_packet.full?
|
||||
|
||||
parse(@large_packet.to_s)
|
||||
|
||||
@socket_type = @resolver_options.fetch(:socket_type, :udp)
|
||||
@large_packet = nil
|
||||
transition(:idle)
|
||||
return
|
||||
else
|
||||
size = @read_buffer[0, 2].unpack1("n")
|
||||
|
||||
if size > @read_buffer.bytesize
|
||||
# only do buffer logic if it's worth it, and the whole packet isn't here already
|
||||
@large_packet = Buffer.new(size)
|
||||
@large_packet << @read_buffer.byteslice(2..-1)
|
||||
|
||||
next
|
||||
else
|
||||
parse(@read_buffer)
|
||||
end
|
||||
end
|
||||
else # udp
|
||||
parse(@read_buffer)
|
||||
end
|
||||
|
||||
parse(@read_buffer)
|
||||
return if @state == :closed
|
||||
end
|
||||
end
|
||||
@ -182,26 +222,26 @@ module HTTPX
|
||||
return if @write_buffer.empty?
|
||||
|
||||
siz = @io.write(@write_buffer)
|
||||
return unless siz && siz.positive?
|
||||
|
||||
unless siz
|
||||
ex = EOFError.new("descriptor closed")
|
||||
ex.set_backtrace(caller)
|
||||
raise ex
|
||||
end
|
||||
|
||||
return unless siz.positive?
|
||||
|
||||
return if @state == :closed
|
||||
end
|
||||
end
|
||||
|
||||
def parse(buffer)
|
||||
begin
|
||||
addresses = Resolver.decode_dns_answer(buffer)
|
||||
rescue Resolv::DNS::DecodeError => e
|
||||
hostname, connection = @queries.first
|
||||
@queries.delete(hostname)
|
||||
@timeouts.delete(hostname)
|
||||
@connections.delete(connection)
|
||||
ex = NativeResolveError.new(connection, connection.origin.host, e.message)
|
||||
ex.set_backtrace(e.backtrace)
|
||||
raise ex
|
||||
end
|
||||
code, result = Resolver.decode_dns_answer(buffer)
|
||||
|
||||
if addresses.nil?
|
||||
case code
|
||||
when :ok
|
||||
parse_addresses(result)
|
||||
when :no_domain_found
|
||||
# Indicates no such domain was found.
|
||||
hostname, connection = @queries.first
|
||||
@queries.delete(hostname)
|
||||
@ -211,7 +251,38 @@ module HTTPX
|
||||
@connections.delete(connection)
|
||||
raise NativeResolveError.new(connection, connection.origin.host)
|
||||
end
|
||||
elsif addresses.empty?
|
||||
when :message_truncated
|
||||
# TODO: what to do if it's already tcp??
|
||||
return if @socket_type == :tcp
|
||||
|
||||
@socket_type = :tcp
|
||||
|
||||
hostname, _ = @queries.first
|
||||
@write_buffer.clear
|
||||
@queries.delete(hostname)
|
||||
@timeouts.delete(hostname)
|
||||
transition(:closed)
|
||||
when :dns_error
|
||||
hostname, connection = @queries.first
|
||||
@queries.delete(hostname)
|
||||
@timeouts.delete(hostname)
|
||||
@connections.delete(connection)
|
||||
ex = NativeResolveError.new(connection, connection.origin.host, "unknown DNS error (error code #{result})")
|
||||
ex.set_backtrace(e.backtrace)
|
||||
raise ex
|
||||
when :decode_error
|
||||
hostname, connection = @queries.first
|
||||
@queries.delete(hostname)
|
||||
@timeouts.delete(hostname)
|
||||
@connections.delete(connection)
|
||||
ex = NativeResolveError.new(connection, connection.origin.host, result.message)
|
||||
ex.set_backtrace(result.backtrace)
|
||||
raise ex
|
||||
end
|
||||
end
|
||||
|
||||
def parse_addresses(addresses)
|
||||
if addresses.empty?
|
||||
# no address found, eliminate candidates
|
||||
_, connection = @queries.first
|
||||
candidates = @queries.select { |_, conn| connection == conn }.keys
|
||||
@ -281,12 +352,19 @@ module HTTPX
|
||||
end
|
||||
log { "resolver: query #{@record_type.name.split("::").last} for #{hostname}" }
|
||||
begin
|
||||
@write_buffer << Resolver.encode_dns_query(hostname, type: @record_type)
|
||||
@write_buffer << encode_dns_query(hostname)
|
||||
rescue Resolv::DNS::EncodeError => e
|
||||
emit_resolve_error(connection, hostname, e)
|
||||
end
|
||||
end
|
||||
|
||||
def encode_dns_query(hostname)
|
||||
message_id = Resolver.generate_id
|
||||
msg = Resolver.encode_dns_query(hostname, type: @record_type, message_id: message_id)
|
||||
msg[0, 2] = [msg.size, message_id].pack("nn") if @socket_type == :tcp
|
||||
msg
|
||||
end
|
||||
|
||||
def generate_candidates(name)
|
||||
return [name] if name.end_with?(".")
|
||||
|
||||
@ -300,12 +378,18 @@ module HTTPX
|
||||
end
|
||||
|
||||
def build_socket
|
||||
return if @io
|
||||
|
||||
ip, port = @nameserver[@ns_index]
|
||||
port ||= DNS_PORT
|
||||
log { "resolver: server: #{ip}:#{port}..." }
|
||||
@io = UDP.new(ip, port, @options)
|
||||
|
||||
case @socket_type
|
||||
when :udp
|
||||
log { "resolver: server: udp://#{ip}:#{port}..." }
|
||||
UDP.new(ip, port, @options)
|
||||
when :tcp
|
||||
log { "resolver: server: tcp://#{ip}:#{port}..." }
|
||||
origin = URI("tcp://#{ip}:#{port}")
|
||||
TCP.new(origin, [ip], @options)
|
||||
end
|
||||
end
|
||||
|
||||
def transition(nextstate)
|
||||
@ -319,7 +403,7 @@ module HTTPX
|
||||
when :open
|
||||
return unless @state == :idle
|
||||
|
||||
build_socket
|
||||
@io ||= build_socket
|
||||
|
||||
@io.connect
|
||||
return unless @io.connected?
|
||||
|
||||
@ -11,6 +11,7 @@ module HTTPX
|
||||
|
||||
def full?: () -> bool
|
||||
def shift!: (Integer) -> void
|
||||
def capacity: () -> Integer
|
||||
|
||||
# delegated
|
||||
def <<: (string data) -> String
|
||||
|
||||
@ -13,6 +13,7 @@ module HTTPX
|
||||
type dns_result = { "name" => String, "TTL" => Numeric, "alias" => String }
|
||||
| { "name" => String, "TTL" => Numeric, "data" => String }
|
||||
|
||||
type dns_decoding_response = [:ok, Array[dns_result]] | [:decode_error, Resolv::DNS::DecodeError] | [:dns_error, Integer] | Symbol
|
||||
|
||||
def nolookup_resolve: (String hostname) -> Array[IPAddr]
|
||||
|
||||
@ -28,8 +29,8 @@ module HTTPX
|
||||
|
||||
def self?.generate_id: () -> Integer
|
||||
|
||||
def self?.encode_dns_query: (String hostname, ?type: dns_resource) -> String
|
||||
def self?.encode_dns_query: (String hostname, ?type: dns_resource, ?message_id: Integer) -> String
|
||||
|
||||
def self?.decode_dns_answer: (String) -> Array[dns_result]?
|
||||
def self?.decode_dns_answer: (String) -> dns_decoding_response
|
||||
end
|
||||
end
|
||||
@ -34,7 +34,7 @@ module HTTPX
|
||||
|
||||
def build_request: (String hostname) -> Request
|
||||
|
||||
def decode_response_body: (Response) -> Array[dns_result]?
|
||||
def decode_response_body: (Response) -> dns_decoding_response
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -20,6 +20,7 @@ module HTTPX
|
||||
@connections: Array[Connection]
|
||||
@read_buffer: String
|
||||
@write_buffer: Buffer
|
||||
@large_packet: Buffer?
|
||||
|
||||
attr_reader state: Symbol
|
||||
|
||||
|
||||
@ -70,7 +70,7 @@ module Requests
|
||||
uri = URI(build_uri("/get"))
|
||||
resolver_class = Class.new(HTTPX::Resolver::HTTPS) do
|
||||
def decode_response_body(_response)
|
||||
raise Resolv::DNS::DecodeError
|
||||
[:decode_error, Resolv::DNS::DecodeError.new("smth")]
|
||||
end
|
||||
end
|
||||
response = session.head(uri, resolver_class: resolver_class, resolver_options: options.merge(record_types: %w[]))
|
||||
@ -170,30 +170,23 @@ module Requests
|
||||
session = HTTPX.plugin(SessionWithPool)
|
||||
|
||||
resolver_class = Class.new(HTTPX::Resolver::Native) do
|
||||
attr_reader :io
|
||||
|
||||
@instances = []
|
||||
@ios = []
|
||||
|
||||
class << self
|
||||
attr_reader :instances
|
||||
attr_reader :ios
|
||||
end
|
||||
|
||||
def new(*)
|
||||
resolver = super
|
||||
|
||||
@instances << resolver
|
||||
|
||||
resolver
|
||||
end
|
||||
def build_socket
|
||||
io = super
|
||||
self.class.ios << io
|
||||
io
|
||||
end
|
||||
end
|
||||
|
||||
response = session.head(uri, resolver_class: resolver_class, resolver_options: options.merge(nameserver: %w[8.8.8.8]))
|
||||
response = session.head(uri, resolver_class: resolver_class, resolver_options: options)
|
||||
verify_status(response, 200)
|
||||
|
||||
resolver = resolver_class.instances.find { |res| res.family == Socket::AF_INET }
|
||||
|
||||
assert !resolver.nil?, "resolver instance not collected"
|
||||
assert resolver.io.is_a?(HTTPX::TCP), "resolver did not fallback to tcp (#{resolver.io} instead)"
|
||||
assert resolver_class.ios.any?(HTTPX::TCP), "resolver did not upgrade to tcp"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user