fix: keep connection alive if HTTP/1.1 and there is connection-related

header;

this was wrongly closing connections for HTTP/1.1 connections which
didn't send a "Connection" header; according to spec, the default is
"Keep-Alive", contrary to HTTP/1.0
This commit is contained in:
HoneyryderChuck 2019-05-15 14:45:06 +00:00
parent ab73a6fef5
commit 2da56ce9af
32 changed files with 315 additions and 193 deletions

View File

@ -24,6 +24,7 @@ module Faraday
class Session < ::HTTPX::Session
plugin(:compression)
plugin(:persistent)
module ReasonPlugin
if RUBY_VERSION < "2.5"
@ -38,11 +39,11 @@ module Faraday
module ResponseMethods
if RUBY_VERSION < "2.5"
def reason
WEBrick::HTTPStatus::StatusMessage[@status]
WEBrick::HTTPStatus::StatusMessage.fetch(@status)
end
else
def reason
Net::HTTP::STATUS_CODES[@status]
Net::HTTP::STATUS_CODES.fetch(@status)
end
end
end

View File

@ -44,25 +44,25 @@ module HTTPX
def_delegator :@write_buffer, :empty?
attr_reader :uri, :state, :pending, :options
attr_reader :origin, :state, :pending, :options
attr_reader :timeout
def initialize(type, uri, options)
@type = type
@uri = uri
@origins = [@uri.origin]
@origins = [uri.origin]
@origin = URI(uri.origin)
@options = Options.new(options)
@window_size = @options.window_size
@read_buffer = Buffer.new(BUFFER_SIZE)
@write_buffer = Buffer.new(BUFFER_SIZE)
@pending = []
on(:error) { |ex| on_error(ex) }
on(:error, &method(:on_error))
if @options.io
# if there's an already open IO, get its
# peer address, and force-initiate the parser
transition(:already_open)
@io = IO.registry(@type).new(@uri, nil, @options)
@io = IO.registry(@type).new(@origin, nil, @options)
parser
else
transition(:idle)
@ -72,26 +72,32 @@ module HTTPX
# this is a semi-private method, to be used by the resolver
# to initiate the io object.
def addresses=(addrs)
@io ||= IO.registry(@type).new(@uri, addrs, @options) # rubocop:disable Naming/MemoizedInstanceVariableName
@io ||= IO.registry(@type).new(@origin, addrs, @options) # rubocop:disable Naming/MemoizedInstanceVariableName
end
def addresses
@io && @io.addresses
end
def mergeable?(addresses)
return false if @state == :closing || !@io
def match?(uri, options)
return false if @state == :closing || @state == :closed
!(@io.addresses & addresses).empty?
(@origins.include?(uri.origin) || match_altsvcs?(uri)) && connection_options_match?(options)
end
def mergeable?(connection)
return false if @state == :closing || @state == :closed || !@io
!(@io.addresses & connection.addresses).empty? && connection_options_match?(connection.options)
end
# coalescable connections need to be mergeable!
# but internally, #mergeable? is called before #coalescable?
def coalescable?(connection)
if @io.protocol == "h2" && @uri.scheme == "https"
@io.verify_hostname(connection.uri.host)
if @io.protocol == "h2" && @origin.scheme == "https"
@io.verify_hostname(connection.origin.host)
else
@uri.origin == connection.uri.origin
@origin == connection.origin
end
end
@ -105,10 +111,10 @@ module HTTPX
def unmerge(connection)
@origins -= connection.instance_variable_get(:@origins)
purge_pending do |request, args|
request.uri == connection.uri && begin
purge_pending do |request|
request.uri.origin == connection.origin && begin
request.transition(:idle)
connection.send(request, *args)
connection.send(request)
true
end
end
@ -122,16 +128,10 @@ module HTTPX
end
end
def match?(uri, _options)
return false if @state == :closing
@origins.include?(uri.origin) || match_altsvcs?(uri)
end
# checks if this is connection is an alternative service of
# +uri+
def match_altsvcs?(uri)
AltSvc.cached_altsvc(@uri.origin).any? do |altsvc|
AltSvc.cached_altsvc(@origin).any? do |altsvc|
origin = altsvc["origin"]
origin.altsvc_match?(uri.origin)
end
@ -176,14 +176,14 @@ module HTTPX
emit(:close)
end
def send(request, **args)
def send(request)
if @error_response
emit(:response, request, @error_response)
elsif @parser && !@write_buffer.full?
request.headers["alt-used"] = @uri.authority if match_altsvcs?(request.uri)
parser.send(request, **args)
request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri)
parser.send(request)
else
@pending << [request, args]
@pending << request
end
end
@ -202,26 +202,6 @@ module HTTPX
nil
end
def handle_timeout_error(e)
case e
when TotalTimeoutError
# return unless @options.timeout.no_time_left?
emit(:error, e)
when TimeoutError
return emit(:error, e) unless @timeout
@timeout -= e.timeout
return unless @timeout <= 0
if connecting?
emit(:error, e.to_connection_error)
else
emit(:error, e)
end
end
end
private
def consume
@ -268,8 +248,8 @@ module HTTPX
def send_pending
while !@write_buffer.full? && (req_args = @pending.shift)
request, args = req_args
parser.send(request, **args)
request = req_args
parser.send(request)
end
end
@ -288,14 +268,14 @@ module HTTPX
AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
emit(:altsvc, alt_origin, origin, alt_params)
end
emit(:response, request, response)
request.emit(:response, response)
end
parser.on(:altsvc) do |alt_origin, origin, alt_params|
emit(:altsvc, alt_origin, origin, alt_params)
end
parser.on(:promise) do |*args|
emit(:promise, *args)
parser.on(:promise) do |request, stream|
request.emit(:promise, parser, stream)
end
parser.on(:close) do
transition(:closing)
@ -318,7 +298,7 @@ module HTTPX
emit(:uncoalesce, request.uri)
else
response = ErrorResponse.new(ex, @options)
emit(:response, request, response)
request.emit(:response, response)
end
end
end
@ -375,11 +355,24 @@ module HTTPX
end
def handle_error(e)
if e.instance_of?(TimeoutError) && @timeout
@timeout -= e.timeout
return unless @timeout <= 0
e = e.to_connection_error if connecting?
end
parser.handle_error(e) if @parser && parser.respond_to?(:handle_error)
@error_response = ErrorResponse.new(e, @options)
@pending.each do |request, _|
emit(:response, request, @error_response)
request.emit(:response, @error_response)
end
end
def connection_options_match?(options)
options.transport == @options.transport &&
options.transport_options == @options.transport_options &&
options.ssl == @options.ssl
end
end
end

View File

@ -39,7 +39,7 @@ module HTTPX
@parser << data
end
def send(request, **)
def send(request)
if @max_requests.positive? &&
@requests.size >= @max_concurrent_requests
@pending << request
@ -143,9 +143,7 @@ module HTTPX
def handle_error(ex)
if @pipelining
disable_pipelining
emit(:reset)
throw(:called)
disable
else
@requests.each do |request|
emit(:error, request, ex)
@ -170,14 +168,24 @@ module HTTPX
keep_alive_timeout = parameters["timeout"].to_i
emit(:timeout, keep_alive_timeout)
end
when /close/i, nil
disable_pipelining
when /close/i
@max_requests = Float::INFINITY
emit(:reset)
throw(:called)
disable
when nil
# In HTTP/1.1, it's keep alive by default
return if response.version == "1.1"
@max_requests = Float::INFINITY
disable
end
end
def disable
disable_pipelining
emit(:reset)
throw(:called)
end
def disable_pipelining
return if @requests.empty?

View File

@ -275,7 +275,7 @@ module HTTPX
end
def on_promise(stream)
emit(:promise, self, stream)
emit(:promise, @streams.key(stream.parent), stream)
end
def respond_to_missing?(meth, *args)

View File

@ -13,17 +13,17 @@ module HTTPX
alias_method :host, :ip
def initialize(uri, addresses, options)
def initialize(origin, addresses, options)
@state = :idle
@hostname = uri.host
@hostname = origin.host
@addresses = addresses
@options = Options.new(options)
@fallback_protocol = @options.fallback_protocol
@port = uri.port
@port = origin.port
if @options.io
@io = case @options.io
when Hash
@options.io[uri.authority]
@options.io[origin.authority]
else
@options.io
end

View File

@ -59,6 +59,7 @@ module HTTPX
:connection_class => Class.new(Connection),
:transport => nil,
:transport_options => nil,
:persistent => false,
:resolver_class => (ENV["HTTPX_RESOLVER"] || :native).to_sym,
}
@ -102,6 +103,7 @@ module HTTPX
follow ssl http2_settings
request_class response_class headers_class request_body_class response_body_class connection_class
io fallback_protocol debug debug_level transport_options resolver_class resolver_options
persistent
].each do |method_name|
def_option(method_name)
end

View File

@ -2,6 +2,10 @@
module HTTPX
module Plugins
#
# This plugin adds a shim #authentication method to the session, which will fill
# the HTTP Authorization header.
#
module Authentication
module InstanceMethods
def authentication(token)

View File

@ -2,6 +2,10 @@
module HTTPX
module Plugins
#
# This plugin adds helper methods to implement HTTP Basic Auth
# https://tools.ietf.org/html/rfc7617
#
module BasicAuthentication
def self.load_dependencies(klass, *)
require "base64"

View File

@ -2,6 +2,14 @@
module HTTPX
module Plugins
#
# This plugin adds compression support. Namely it:
#
# * Compresses the request body when passed a supported "Content-Encoding" mime-type;
# * Decompresses the response body from a supported "Content-Encoding" mime-type;
#
# It supports both *gzip* and *deflate*.
#
module Compression
extend Registry
def self.configure(klass, *)

View File

@ -4,6 +4,11 @@ require "forwardable"
module HTTPX
module Plugins
#
# This plugin implements a persistent cookie jar for the duration of a session.
#
# It also adds a *#cookies* helper, so that you can pre-fill the cookies of a session.
#
module Cookies
using URIExtensions
@ -83,11 +88,12 @@ module HTTPX
private
def on_response(request, response)
@options.cookies.set(request.origin, response.headers["set-cookie"])
@options.cookies.set(request.origin, response.headers["set-cookie"]) if response.respond_to?(:headers)
super
end
def __build_req(*, _)
def build_request(*, _)
request = super
request.headers.set_cookie(@options.cookies[request.uri])
request

View File

@ -2,6 +2,10 @@
module HTTPX
module Plugins
#
# This plugin adds helper methods to implement HTTP Digest Auth
# https://tools.ietf.org/html/rfc7616
#
module DigestAuthentication
DigestError = Class.new(Error)
@ -28,13 +32,13 @@ module HTTPX
alias_method :digest_auth, :digest_authentication
def request(*args, **options)
requests = __build_reqs(*args, options)
requests = build_requests(*args, options)
probe_request = requests.first
digest = probe_request.options.digest
return super unless digest
prev_response = wrap { __send_reqs(*probe_request, options).first }
prev_response = wrap { send_requests(*probe_request, options).first }
raise Error, "request doesn't require authentication (status: #{prev_response.status})" unless prev_response.status == 401
@ -46,9 +50,9 @@ module HTTPX
token = digest.generate_header(request, prev_response)
request.headers["authorization"] = "Digest #{token}"
response = if requests.empty?
__send_reqs(*request, options).first
send_requests(*request, options).first
else
wrap { __send_reqs(*request, options).first }
wrap { send_requests(*request, options).first }
end
responses << response
prev_response = response

View File

@ -3,6 +3,14 @@
module HTTPX
InsecureRedirectError = Class.new(Error)
module Plugins
#
# This plugin adds support for following redirect (status 30X) responses.
#
# It has an upper bound of followed redirects (see *MAX_REDIRECTS*), after which it
# will return the last redirect response. It will **not** raise an exception.
#
# It also doesn't follow insecure redirects (https -> http) by default (see *follow_insecure_redirects*).
#
module FollowRedirects
MAX_REDIRECTS = 3
REDIRECT_STATUS = (300..399).freeze
@ -37,7 +45,7 @@ module HTTPX
return response unless REDIRECT_STATUS.include?(response.status)
return response unless max_redirects.positive?
retry_request = __build_redirect_req(redirect_request, response, options)
retry_request = build_redirect_request(redirect_request, response, options)
request.redirect_request = retry_request
@ -49,13 +57,12 @@ module HTTPX
return ErrorResponse.new(error, options)
end
connection = find_connection(retry_request, options)
connections << connection unless connections.include?(connection)
connection = find_connection(retry_request, connections, options)
connection.send(retry_request)
nil
end
def __build_redirect_req(request, response, options)
def build_redirect_request(request, response, options)
redirect_uri = __get_location_from_response(response)
max_redirects = request.max_redirects
@ -63,7 +70,7 @@ module HTTPX
retry_options = options.merge(headers: request.headers,
body: request.body,
max_redirects: max_redirects - 1)
__build_req(:get, redirect_uri, retry_options)
build_request(:get, redirect_uri, retry_options)
end
def __get_location_from_response(response)

View File

@ -2,6 +2,11 @@
module HTTPX
module Plugins
#
# This plugin adds support for upgrading a plaintext HTTP/1.1 connection to HTTP/2.
#
# https://tools.ietf.org/html/rfc7540#section-3.2
#
module H2C
def self.load_dependencies(*)
require "base64"
@ -11,32 +16,18 @@ module HTTPX
def request(*args, **options)
h2c_options = options.merge(fallback_protocol: "h2c")
requests = __build_reqs(*args, h2c_options)
requests = build_requests(*args, h2c_options)
upgrade_request = requests.first
return super unless valid_h2c_upgrade_request?(upgrade_request)
upgrade_request.headers["upgrade"] = "h2c"
upgrade_request.headers.add("connection", "upgrade")
upgrade_request.headers.add("connection", "http2-settings")
upgrade_request.headers["upgrade"] = "h2c"
upgrade_request.headers["http2-settings"] = HTTP2::Client.settings_header(upgrade_request.options.http2_settings)
upgrade_response = wrap { __send_reqs(*upgrade_request, h2c_options).first }
wrap { send_requests(*upgrade_request, h2c_options).first }
if upgrade_response.status == 101
# if 101, assume that connection exists and was kept open
connection = find_connection(upgrade_request, upgrade_request.options)
connection.upgrade(upgrade_request, upgrade_response)
response = upgrade_request.response
if response.status == 200
requests.delete(upgrade_request)
return response if requests.empty?
end
responses = __send_reqs(*requests, h2c_options)
else
# proceed as usual
responses = [upgrade_response] + __send_reqs(*requests[1..-1], h2c_options)
end
responses = send_requests(*requests, h2c_options)
return responses.first if responses.size == 1
@ -45,6 +36,17 @@ module HTTPX
private
def fetch_response(request, connections, options)
response = super
if response && valid_h2c_upgrade?(request, response, options)
log { "upgrading to h2c..." }
connection = find_connection(request, connections, options)
connections << connection unless connections.include?(connection)
connection.upgrade(request, response)
end
response
end
VALID_H2C_METHODS = %i[get options head].freeze
private_constant :VALID_H2C_METHODS
@ -52,6 +54,13 @@ module HTTPX
VALID_H2C_METHODS.include?(request.verb) &&
request.scheme == "http"
end
def valid_h2c_upgrade?(request, response, options)
options.fallback_protocol == "h2c" &&
request.headers.get("connection").include?("upgrade") &&
request.headers.get("upgrade").include?("h2c") &&
response.status == 101
end
end
class H2CParser < Connection::HTTP2
@ -80,9 +89,9 @@ module HTTPX
end
def coalescable?(connection)
return super unless @options.fallback_protocol == "h2c" && @uri.scheme == "http"
return super unless @options.fallback_protocol == "h2c" && @origin.scheme == "http"
@uri.origin == connection.uri.origin && connection.options.fallback_protocol == "h2c"
@origin == connection.origin && connection.options.fallback_protocol == "h2c"
end
def upgrade(request, response)
@ -93,7 +102,7 @@ module HTTPX
end
def build_parser(*)
return super unless @uri.scheme == "http"
return super unless @origin.scheme == "http"
super("http/1.1")
end

View File

@ -2,6 +2,11 @@
module HTTPX
module Plugins
#
# This plugin adds support for passing `http-form_data` objects (like file objects) as "multipart/form-data";
#
# HTTPX.post(URL, form: form: { image: HTTP::FormData::File.new("path/to/file")})
#
module Multipart
module FormTranscoder
module_function

View File

@ -0,0 +1,29 @@
# frozen_string_literal: true
module HTTPX
module Plugins
# This plugin implements a session that persists connections over the duration of the process.
#
# This will improve connection reuse in a long-running process.
#
# One important caveat to note is, although this session might not close connections,
# other sessions from the same process that don't have this plugin turned on might.
#
# This session will still be able to work with it, as if, when expecting a connection
# terminated by a different session, it will just retry on a new one and keep it open.
#
# This plugin is also not recommendable when connecting to >9000 (like, a lot) different origins.
# So when you use this, make sure that you don't fall into this trap.
#
module Persistent
def self.load_dependencies(klass, *)
klass.plugin(:retries) # TODO: pass default max_retries -> 1 as soon as this is a parameter
end
def self.extra_options(options)
options.merge(persistent: true)
end
end
register_plugin :persistent, Persistent
end
end

View File

@ -6,8 +6,17 @@ require "forwardable"
module HTTPX
module Plugins
#
# This plugin adds support for proxies. It ships with support for:
#
# * HTTP proxies
# * HTTPS proxies
# * Socks4/4a proxies
# * Socks5 proxies
#
module Proxy
Error = Class.new(Error)
PROXY_ERRORS = [TimeoutError, IOError, SystemCallError, Error].freeze
class Parameters
attr_reader :uri, :username, :password
@ -72,7 +81,7 @@ module HTTPX
options.proxy.merge(uri: @_proxy_uris.first) unless @_proxy_uris.empty?
end
def find_connection(request, options)
def find_connection(request, connections, options)
return super unless options.respond_to?(:proxy)
uri = URI(request.uri)
@ -80,26 +89,32 @@ module HTTPX
raise Error, "Failed to connect to proxy" unless next_proxy
proxy_options = options.merge(proxy: Parameters.new(**next_proxy))
pool.find_connection(uri, proxy_options) || build_connection(uri, proxy_options)
connection = pool.find_connection(uri, proxy_options) || build_connection(uri, proxy_options)
unless connections.nil? || connections.include?(connection)
connections << connection
set_connection_callbacks(connection, options)
end
connection
end
def __build_connection(uri, options)
def build_connection(uri, options)
proxy = options.proxy
return super unless proxy
options.connection_class.new("tcp", uri, options)
connection = options.connection_class.new("tcp", uri, options)
pool.init_connection(connection, options)
connection
end
def fetch_response(request, connections, options)
response = super
if response.is_a?(ErrorResponse) &&
# either it was a timeout error connecting, or it was a proxy error
(((response.error.is_a?(TimeoutError) || response.error.is_a?(IOError)) && request.state == :idle) ||
response.error.is_a?(Error)) &&
!@_proxy_uris.empty?
PROXY_ERRORS.any? { |ex| response.error.is_a?(ex) } && !@_proxy_uris.empty?
@_proxy_uris.shift
log { "failed connecting to proxy, trying next..." }
connection = find_connection(request, options)
request.transition(:idle)
connection = find_connection(request, connections, options)
connections << connection unless connections.include?(connection)
connection.send(request)
return
@ -115,9 +130,9 @@ module HTTPX
super
return unless @options.proxy
# redefining the connection uri as the proxy's URI,
# redefining the connection origin as the proxy's URI,
# as this will be used as the tcp peer ip.
@uri = @options.proxy.uri
@origin = URI(@options.proxy.uri.origin)
end
def match?(uri, options)
@ -133,11 +148,11 @@ module HTTPX
false
end
def send(request, **args)
def send(request)
return super unless @options.proxy
return super unless connecting?
@pending << [request, args]
@pending << request
end
def connecting?

View File

@ -26,7 +26,7 @@ module HTTPX
private
def __send_reqs(*requests, options)
def send_requests(*requests, options)
request_options = @options.merge(options)
return super unless request_options.proxy

View File

@ -2,12 +2,16 @@
module HTTPX
module Plugins
#
# This plugin adds support for HTTP/2 Push responses.
#
# In order to benefit from this, requests are sent one at a time, so that
# no push responses are received after corresponding request has been sent.
#
module PushPromise
PUSH_OPTIONS = { http2_settings: { settings_enable_push: 1 },
max_concurrent_requests: 1 }.freeze
def self.extra_options(options)
options.merge(PUSH_OPTIONS)
options.merge(http2_settings: { settings_enable_push: 1 },
max_concurrent_requests: 1)
end
module ResponseMethods
@ -43,6 +47,7 @@ module HTTPX
headers = @options.headers_class.new(h)
path = headers[":path"]
authority = headers[":authority"]
request = parser.pending.find { |r| r.authority == authority && r.path == path }
if request
request.merge_headers(headers)

View File

@ -2,9 +2,24 @@
module HTTPX
module Plugins
#
# This plugin adds support for retrying requests when certain errors happen.
#
module Retries
MAX_RETRIES = 3
# TODO: pass max_retries in a configure/load block
IDEMPOTENT_METHODS = %i[get options head put delete].freeze
RETRYABLE_ERRORS = [IOError,
EOFError,
Errno::ECONNRESET,
Errno::ECONNABORTED,
Errno::EPIPE,
(OpenSSL::SSL::SSLError if defined?(OpenSSL)),
TimeoutError,
Parser::Error,
Errno::EINVAL,
Errno::ETIMEDOUT].freeze
def self.extra_options(options)
Class.new(options.class) do
@ -14,6 +29,8 @@ module HTTPX
num
end
def_option(:retry_change_requests)
end.new(options)
end
@ -28,15 +45,25 @@ module HTTPX
response = super
if response.is_a?(ErrorResponse) &&
request.retries.positive? &&
IDEMPOTENT_METHODS.include?(request.verb)
__repeatable_request?(request, options) &&
__retryable_error?(response.error)
request.retries -= 1
connection = find_connection(request, options)
connections << connection unless connections.include?(connection)
log { "failed to get response, #{request.retries} tries to go..." }
request.transition(:idle)
connection = find_connection(request, connections, options)
connection.send(request)
return
end
response
end
def __repeatable_request?(request, options)
IDEMPOTENT_METHODS.include?(request.verb) || options.retry_change_requests
end
def __retryable_error?(ex)
RETRYABLE_ERRORS.any? { |klass| ex.is_a?(klass) }
end
end
module RequestMethods

View File

@ -23,19 +23,11 @@ module HTTPX
tout = timeout.total_timeout if timeout
@selector.select(next_timeout || tout) do |monitor|
if (connection = monitor.value)
connection.call
end
monitor.interests = connection.interests
monitor.io.call
monitor.interests = monitor.io.interests
end
end
rescue TimeoutError => timeout_error
@connections.each do |connection|
connection.handle_timeout_error(timeout_error)
end
rescue Errno::ECONNRESET,
Errno::ECONNABORTED,
Errno::EPIPE => ex
rescue StandardError => ex
@connections.each do |connection|
connection.emit(:error, ex)
end
@ -80,16 +72,12 @@ module HTTPX
resolver << connection
return if resolver.empty?
@_resolver_monitors[resolver] ||= begin
monitor = @selector.register(resolver, :w)
monitor.value = resolver
monitor
end
@_resolver_monitors[resolver] ||= @selector.register(resolver, :w)
end
def on_resolver_connection(connection, addresses)
def on_resolver_connection(connection)
found_connection = @connections.find do |ch|
ch != connection && ch.mergeable?(addresses)
ch != connection && ch.mergeable?(connection)
end
return register_connection(connection) unless found_connection
@ -121,7 +109,7 @@ module HTTPX
end
def register_connection(connection)
monitor = if connection.state == :open
if connection.state == :open
# if open, an IO was passed upstream, therefore
# consider it connected already.
@connected_connections += 1
@ -129,7 +117,6 @@ module HTTPX
else
@selector.register(connection, :w)
end
monitor.value = connection
connection.on(:close) do
unregister_connection(connection)
end
@ -152,7 +139,7 @@ module HTTPX
end
def next_timeout
@resolvers.values.reject(&:closed?).map(&:timeout).min || @connections.map(&:timeout).min
@resolvers.values.reject(&:closed?).map(&:timeout).compact.min || @connections.map(&:timeout).compact.min
end
def find_resolver_for(connection)

View File

@ -5,6 +5,7 @@ require "forwardable"
module HTTPX
class Request
extend Forwardable
include Callbacks
using URIExtensions
METHODS = [

View File

@ -40,7 +40,7 @@ module HTTPX
def <<(connection)
@uri_addresses ||= Resolv.getaddresses(@uri.host)
if @uri_addresses.empty?
ex = ResolveError.new("Can't resolve #{connection.uri.host}")
ex = ResolveError.new("Can't resolve #{connection.origin.host}")
ex.set_backtrace(caller)
emit(:error, connection, ex)
else
@ -70,7 +70,6 @@ module HTTPX
connection = @options.connection_class.new("ssl", @uri, @options.merge(ssl: { alpn_protocols: %w[h2] }))
pool.init_connection(connection, @options)
emit_addresses(connection, @uri_addresses)
set_connection_callbacks(connection)
@building_connection = false
connection
end
@ -79,7 +78,7 @@ module HTTPX
def resolve(connection = @connections.first, hostname = nil)
return if @building_connection
hostname = hostname || @queries.key(connection) || connection.uri.host
hostname = hostname || @queries.key(connection) || connection.origin.host
type = @_record_types[hostname].first
log(label: "resolver: ") { "query #{type} for #{hostname}" }
begin
@ -93,11 +92,6 @@ module HTTPX
end
end
def set_connection_callbacks(connection)
connection.on(:response, &method(:on_response))
connection.on(:promise, &method(:on_response))
end
def on_response(request, response)
response.raise_for_status
rescue Error => ex
@ -112,6 +106,11 @@ module HTTPX
@requests.delete(request)
end
def on_promise(_, stream)
log(level: 2, label: "#{stream.id}: ") { "refusing stream!" }
stream.refuse
end
def parse(response)
begin
answers = decode_response_body(response)
@ -179,6 +178,8 @@ module HTTPX
request.headers["content-type"] = "application/dns-message"
request.headers["accept"] = "application/dns-message"
end
request.on(:response, &method(:on_response).curry[request])
request.on(:promise, &method(:on_promise))
request
end

View File

@ -101,7 +101,7 @@ module HTTPX
return if early_resolve(connection)
if @nameserver.nil?
ex = ResolveError.new("Can't resolve #{connection.uri.host}: no nameserver")
ex = ResolveError.new("Can't resolve #{connection.origin.host}: no nameserver")
ex.set_backtrace(caller)
emit(:error, connection, ex)
else
@ -132,7 +132,7 @@ module HTTPX
queries = {}
while (query = @queries.shift)
h, connection = query
host = connection.uri.host
host = connection.origin.host
timeout = (@timeouts[host][0] -= loop_time)
unless timeout.negative?
queries[h] = connection
@ -216,7 +216,7 @@ module HTTPX
end
else
@connections.delete(connection)
Resolver.cached_lookup_set(connection.uri.host, addresses)
Resolver.cached_lookup_set(connection.origin.host, addresses)
emit_addresses(connection, addresses.map { |addr| addr["data"] })
end
end
@ -229,7 +229,7 @@ module HTTPX
raise Error, "no URI to resolve" unless connection
return unless @write_buffer.empty?
hostname = hostname || @queries.key(connection) || connection.uri.host
hostname = hostname || @queries.key(connection) || connection.origin.host
@queries[hostname] = connection
type = @_record_types[hostname].first
log(label: "resolver: ") { "query #{type} for #{hostname}" }

View File

@ -19,7 +19,7 @@ module HTTPX
end
def uncache(connection)
hostname = hostname || @queries.key(connection) || connection.uri.host
hostname = hostname || @queries.key(connection) || connection.origin.host
Resolver.uncache(hostname)
@_record_types[hostname].shift
end
@ -30,12 +30,12 @@ module HTTPX
addresses.map! do |address|
address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s)
end
log(label: "resolver: ") { "answer #{connection.uri.host}: #{addresses.inspect}" }
log(label: "resolver: ") { "answer #{connection.origin.host}: #{addresses.inspect}" }
connection.addresses = addresses
emit(:resolve, connection, addresses)
emit(:resolve, connection)
end
def early_resolve(connection, hostname: connection.uri.host)
def early_resolve(connection, hostname: connection.origin.host)
addresses = connection.addresses ||
ip_resolve(hostname) ||
Resolver.cached_lookup(hostname) ||

View File

@ -29,7 +29,7 @@ module HTTPX
end
def <<(connection)
hostname = connection.uri.host
hostname = connection.origin.host
addresses = connection.addresses ||
ip_resolve(hostname) ||
system_resolve(hostname) ||

View File

@ -11,7 +11,7 @@ class HTTPX::Selector
# I/O monitor
#
class Monitor
attr_accessor :value, :interests, :readiness
attr_accessor :io, :interests, :readiness
def initialize(io, interests, reactor)
@io = io

View File

@ -8,7 +8,7 @@ module HTTPX
def initialize(options = {}, &blk)
@options = self.class.default_options.merge(options)
@responses = {}
@keep_open = false
@persistent = @options.persistent
wrap(&blk) if block_given?
end
@ -16,11 +16,11 @@ module HTTPX
return unless block_given?
begin
prev_keep_open = @keep_open
@keep_open = true
prev_persistent = @persistent
@persistent = true
yield self
ensure
@keep_open = prev_keep_open
@persistent = prev_persistent
end
end
@ -29,8 +29,8 @@ module HTTPX
end
def request(*args, **options)
requests = __build_reqs(*args, options)
responses = __send_reqs(*requests, options)
requests = build_requests(*args, options)
responses = send_requests(*requests, options)
return responses.first if responses.size == 1
responses
@ -55,14 +55,17 @@ module HTTPX
@responses.delete(request)
end
def find_connection(request, options)
def find_connection(request, connections, options)
uri = URI(request.uri)
pool.find_connection(uri, options) || build_connection(uri, options)
connection = pool.find_connection(uri, options) || build_connection(uri, options)
unless connections.nil? || connections.include?(connection)
connections << connection
set_connection_callbacks(connection, options)
end
connection
end
def set_connection_callbacks(connection, options)
connection.on(:response, &method(:on_response))
connection.on(:promise, &method(:on_promise))
connection.on(:uncoalesce) do |uncoalesced_uri|
other_connection = build_connection(uncoalesced_uri, options)
connection.unmerge(other_connection)
@ -72,13 +75,6 @@ module HTTPX
end
end
def build_connection(uri, options)
connection = __build_connection(uri, options)
pool.init_connection(connection, options)
set_connection_callbacks(connection, options)
connection
end
def build_altsvc_connection(existing_connection, alt_origin, origin, alt_params, options)
altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin))
@ -89,18 +85,20 @@ module HTTPX
# advertised altsvc is the same origin being used, ignore
return if connection == existing_connection
set_connection_callbacks(connection, options)
log(level: 1) { "#{origin} alt-svc: #{alt_origin}" }
# get uninitialized requests
# incidentally, all requests will be re-routed to the first
# advertised alt-svc, which incidentally follows the spec.
existing_connection.purge_pending do |request, args|
existing_connection.purge_pending do |request|
is_idle = request.origin == origin &&
request.state == :idle &&
!request.headers.key?("alt-used")
if is_idle
log(level: 1) { "#{origin} alt-svc: sending #{request.uri} to #{alt_origin}" }
connection.send(request, args)
connection.send(request)
end
is_idle
end
@ -108,23 +106,23 @@ module HTTPX
altsvc["noop"] = true
end
def __build_reqs(*args, options)
def build_requests(*args, options)
request_options = @options.merge(options)
requests = case args.size
when 1
reqs = args.first
reqs.map do |verb, uri|
__build_req(verb, uri, request_options)
build_request(verb, uri, request_options)
end
when 2, 3
verb, uris = args
if uris.respond_to?(:each)
uris.map do |uri, **opts|
__build_req(verb, uri, request_options.merge(opts))
build_request(verb, uri, request_options.merge(opts))
end
else
[__build_req(verb, uris, request_options)]
[build_request(verb, uris, request_options)]
end
else
raise ArgumentError, "unsupported number of arguments"
@ -134,7 +132,7 @@ module HTTPX
requests
end
def __build_connection(uri, options)
def build_connection(uri, options)
type = options.transport || begin
case uri.scheme
when "http"
@ -148,17 +146,18 @@ module HTTPX
raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
end
end
options.connection_class.new(type, uri, options)
connection = options.connection_class.new(type, uri, options)
pool.init_connection(connection, options)
connection
end
def __send_reqs(*requests, options)
def send_requests(*requests, options)
connections = []
request_options = @options.merge(options)
timeout = request_options.timeout
requests.each do |request|
connection = find_connection(request, request_options)
connections << connection unless connections.include?(connection)
connection = find_connection(request, connections, request_options)
connection.send(request)
end
@ -179,13 +178,16 @@ module HTTPX
end
responses
ensure
close(connections) unless @keep_open
close(connections) unless @persistent
end
end
def __build_req(verb, uri, options)
def build_request(verb, uri, options)
rklass = @options.request_class
rklass.new(verb, uri, @options.merge(options))
request = rklass.new(verb, uri, @options.merge(options))
request.on(:response, &method(:on_response).curry[request])
request.on(:promise, &method(:on_promise))
request
end
@default_options = Options.new

View File

@ -16,7 +16,7 @@ class HTTPTest < Minitest::Test
include Timeouts
include Errors
include Plugins::Proxy
include Plugins::Proxy unless ENV.key?("HTTPX_NO_PROXY")
include Plugins::Authentication
include Plugins::FollowRedirects
include Plugins::Cookies

View File

@ -14,7 +14,7 @@ class HTTPSTest < Minitest::Test
include Timeouts
include Errors
include Plugins::Proxy
include Plugins::Proxy unless ENV.key?("HTTPX_NO_PROXY")
include Plugins::Authentication
include Plugins::FollowRedirects
include Plugins::Cookies

View File

@ -78,6 +78,7 @@ class OptionsTest < Minitest::Test
:connection_class => bar.connection_class,
:transport => nil,
:transport_options => nil,
:persistent => false,
:resolver_class => bar.resolver_class,
:resolver_options => bar.resolver_options,
}, "options haven't merged correctly"

View File

@ -13,6 +13,8 @@ module HTTPHelpers
end
def json_body(response)
raise response.status if response.is_a?(HTTPX::ErrorResponse)
JSON.parse(response.body.to_s)
end

View File

@ -47,7 +47,8 @@ module Requests
.max_redirects(1)
.with(follow_insecure_redirects: true)
insecure_response = insecure_session.get(insecure_redirect_uri)
assert insecure_response.is_a?(HTTPX::Response), "request should follow insecure URLs"
assert insecure_response.is_a?(HTTPX::Response),
"request should follow insecure URLs (instead: #{insecure_response.status})"
end
private