pool: allowing it to load more than one resolver per pool, depending on connection conditions; still reusing it whenever possible (right now, per type, which is still wrong, if we want to connect, let's say, to more than a different DNS server with the same type)

This commit is contained in:
HoneyryderChuck 2019-04-27 15:51:37 +00:00
parent e711c8f812
commit ce9f55c96e
26 changed files with 423 additions and 366 deletions

View File

@ -1,6 +1,6 @@
version: '3'
services:
httpx:
image: jruby:9.2.6-alpine
image: jruby:9.2.7-alpine
environment:
- JRUBY_OPTS=--debug

View File

@ -19,10 +19,6 @@ module HTTPX
protected
def inherit_callbacks(callbackable)
@callbacks = callbackable.callbacks
end
def callbacks(type = nil)
return @callbacks unless type

View File

@ -63,7 +63,9 @@ module HTTPX
def_delegator :@write_buffer, :empty?
attr_reader :uri, :state, :pending
attr_reader :uri, :state, :pending, :options
attr_reader :timeout
def initialize(type, uri, options)
@type = type
@ -158,6 +160,10 @@ module HTTPX
@state == :idle
end
def inflight?
@parser && !@parser.empty?
end
def interests
return :w if @state == :idle
@ -221,15 +227,22 @@ module HTTPX
end
def handle_timeout_error(e)
return emit(:error, e) unless @timeout
case e
when TotalTimeoutError
# return unless @options.timeout.no_time_left?
@timeout -= e.timeout
return unless @timeout <= 0
if connecting?
emit(:error, e.to_connection_error)
else
emit(:error, e)
when TimeoutError
return emit(:error, e) unless @timeout
@timeout -= e.timeout
return unless @timeout <= 0
if connecting?
emit(:error, e.to_connection_error)
else
emit(:error, e)
end
end
end
@ -290,11 +303,16 @@ module HTTPX
def build_parser(protocol = @io.protocol)
parser = registry(protocol).new(@write_buffer, @options)
parser.on(:response) do |*args|
AltSvc.emit(*args) do |alt_origin, origin, alt_params|
set_parser_callbacks(parser)
parser
end
def set_parser_callbacks(parser)
parser.on(:response) do |request, response|
AltSvc.emit(request, response) do |alt_origin, origin, alt_params|
emit(:altsvc, alt_origin, origin, alt_params)
end
emit(:response, *args)
emit(:response, request, response)
end
parser.on(:altsvc) do |alt_origin, origin, alt_params|
emit(:altsvc, alt_origin, origin, alt_params)
@ -327,7 +345,6 @@ module HTTPX
emit(:response, request, response)
end
end
parser
end
def transition(nextstate)

View File

@ -233,7 +233,9 @@ module HTTPX
if error
ex = Error.new(0, error)
ex.set_backtrace(caller)
emit(:error, request, ex)
@streams.each_key do |request|
emit(:error, request, ex)
end
end
return unless @connection.state == :closed && @connection.active_stream_count.zero?

View File

@ -20,6 +20,8 @@ module HTTPX
end
end
TotalTimeoutError = Class.new(TimeoutError)
ConnectTimeoutError = Class.new(TimeoutError)
ResolveError = Class.new(Error)

View File

@ -55,35 +55,26 @@ module HTTPX
end
module URIExtensions
URI_KLASSES = case RUBY_ENGINE
when "jruby"
[URI::Generic, URI::HTTP, URI::HTTPS]
else
[URI::Generic]
end
refine URI::Generic do
def authority
port_string = port == default_port ? nil : ":#{port}"
"#{host}#{port_string}"
end
URI_KLASSES.each do |klass|
refine klass do
def authority
port_string = port == default_port ? nil : ":#{port}"
"#{host}#{port_string}"
end
def origin
"#{scheme}://#{authority}"
end
def origin
"#{scheme}://#{authority}"
end
def altsvc_match?(uri)
uri = URI.parse(uri)
self == uri || begin
case scheme
when 'h2'
uri.scheme == "https" &&
host == uri.host &&
(port || default_port) == (uri.port || uri.default_port)
else
false
end
def altsvc_match?(uri)
uri = URI.parse(uri)
self == uri || begin
case scheme
when 'h2'
uri.scheme == "https" &&
host == uri.host &&
(port || default_port) == (uri.port || uri.default_port)
else
false
end
end
end

View File

@ -32,7 +32,9 @@ module HTTPX
protected :"#{name}="
define_method(:"with_#{name}") do |value|
dup { |opts| opts.send(:"#{name}=", instance_exec(value, &interpreter)) }
other = dup
other.send(:"#{name}=", other.instance_exec(value, &interpreter))
other
end
end
end
@ -126,17 +128,26 @@ module HTTPX
Hash[*hash_pairs]
end
def dup
dupped = super
dupped.headers = headers.dup
dupped.ssl = ssl.dup
dupped.request_class = request_class.dup
dupped.response_class = response_class.dup
dupped.headers_class = headers_class.dup
dupped.request_body_class = request_body_class.dup
dupped.response_body_class = response_body_class.dup
yield(dupped) if block_given?
dupped
def initialize_dup(other)
self.headers = other.headers.dup
self.ssl = other.ssl.dup
self.request_class = other.request_class.dup
self.response_class = other.response_class.dup
self.headers_class = other.headers_class.dup
self.request_body_class = other.request_body_class.dup
self.response_body_class = other.response_body_class.dup
end
def freeze
super
headers.freeze
ssl.freeze
request_class.freeze
response_class.freeze
headers_class.freeze
request_body_class.freeze
response_body_class.freeze
end
protected

View File

@ -9,10 +9,8 @@ module HTTPX
klass.plugin(:"compression/deflate")
end
module InstanceMethods
def initialize(opts = {})
super(opts.merge(headers: { "accept-encoding" => Compression.registry.keys }))
end
def self.extra_options(options)
options.merge(headers: { "accept-encoding" => Compression.registry.keys })
end
module RequestBodyMethods

View File

@ -5,6 +5,16 @@ module HTTPX
module Cookies
using URIExtensions
def self.extra_options(options)
Class.new(options.class) do
def_option(:cookies) do |cookies|
return cookies if cookies.is_a?(Store)
Store.new(cookies)
end
end.new(options)
end
class Store
def initialize(cookies = nil)
@store = Hash.new { |hash, origin| hash[origin] = HTTP::CookieJar.new }
@ -75,7 +85,7 @@ module HTTPX
super
end
def __build_req(*)
def __build_req(*, _)
request = super
request.headers.cookies(@cookies_store[request.uri], request)
request
@ -92,17 +102,6 @@ module HTTPX
add("cookie", cookie_value)
end
end
module OptionsMethods
def self.included(klass)
super
klass.def_option(:cookies) do |cookies|
return cookies if cookies.is_a?(Store)
Store.new(cookies)
end
end
end
end
register_plugin :cookies, Cookies
end

View File

@ -17,34 +17,36 @@ module HTTPX
end
alias_method :digest_auth, :digest_authentication
def request(*args, keep_open: @keep_open, **options)
def request(*args, **options)
return super unless @_digest
begin
requests = __build_reqs(*args, **options)
probe_request = requests.first
prev_response = __send_reqs(*probe_request).first
requests = __build_reqs(*args, options)
probe_request = requests.first
prev_response = wrap { __send_reqs(*probe_request, options).first }
unless prev_response.status == 401
raise Error, "request doesn't require authentication (status: #{prev_response})"
end
probe_request.transition(:idle)
responses = []
requests.each do |request|
token = @_digest.generate_header(request, prev_response)
request.headers["authorization"] = "Digest #{token}"
response = __send_reqs(*request).first
responses << response
prev_response = response
end
return responses.first if responses.size == 1
responses
ensure
close unless keep_open
unless prev_response.status == 401
raise Error, "request doesn't require authentication (status: #{prev_response})"
end
probe_request.transition(:idle)
responses = []
while (request = requests.shift)
token = @_digest.generate_header(request, prev_response)
request.headers["authorization"] = "Digest #{token}"
response = if requests.empty?
__send_reqs(*request, options).first
else
wrap { __send_reqs(*request, options).first }
end
responses << response
prev_response = response
end
return responses.first if responses.size == 1
responses
end
end

View File

@ -4,76 +4,65 @@ module HTTPX
InsecureRedirectError = Class.new(Error)
module Plugins
module FollowRedirects
module InstanceMethods
MAX_REDIRECTS = 3
REDIRECT_STATUS = (300..399).freeze
MAX_REDIRECTS = 3
REDIRECT_STATUS = (300..399).freeze
def self.extra_options(options)
Class.new(options.class) do
def_option(:max_redirects) do |num|
num = Integer(num)
raise Error, ":max_redirects must be positive" unless num.positive?
num
end
def_option(:follow_insecure_redirects)
end.new(options)
end
module InstanceMethods
def max_redirects(n)
branch(default_options.with_max_redirects(n.to_i))
end
def request(*args, **options)
# do not needlessly close connections
keep_open = @keep_open
@keep_open = true
max_redirects = @options.max_redirects || MAX_REDIRECTS
requests = __build_reqs(*args, **options)
responses = __send_reqs(*requests)
loop do
redirect_requests = []
indexes = responses.each_with_index.map do |response, index|
next unless REDIRECT_STATUS.include?(response.status)
request = requests[index]
retry_request = __build_redirect_req(request, response, options)
redirect_requests << retry_request
index
end.compact
break if redirect_requests.empty?
break if max_redirects <= 0
max_redirects -= 1
redirect_responses = __send_reqs(*redirect_requests)
indexes.each_with_index do |index, i2|
requests[index] = redirect_requests[i2]
responses[index] = redirect_responses[i2]
end
end
return responses.first if responses.size == 1
responses
ensure
@keep_open = keep_open
end
private
def fetch_response(request)
response = super
if response &&
REDIRECT_STATUS.include?(response.status) &&
!@options.follow_insecure_redirects
redirect_uri = __get_location_from_response(response)
if response.uri.scheme == "https" &&
redirect_uri.scheme == "http"
error = InsecureRedirectError.new(redirect_uri.to_s)
error.set_backtrace(caller)
response = ErrorResponse.new(error, @options)
end
def fetch_response(request, connections, options)
redirect_request = request.redirect_request
response = super(redirect_request, connections, options)
return unless response
max_redirects = redirect_request.max_redirects
return response unless REDIRECT_STATUS.include?(response.status)
return response unless max_redirects.positive?
retry_request = __build_redirect_req(redirect_request, response, options)
request.redirect_request = retry_request
if !options.follow_insecure_redirects &&
response.uri.scheme == "https" &&
retry_request.uri.scheme == "http"
error = InsecureRedirectError.new(retry_request.uri.to_s)
error.set_backtrace(caller)
return ErrorResponse.new(error, options)
end
response
connection = find_connection(retry_request, options)
connections << connection unless connections.include?(connection)
connection.send(retry_request)
nil
end
def __build_redirect_req(request, response, options)
redirect_uri = __get_location_from_response(response)
max_redirects = request.max_redirects
# redirects are **ALWAYS** GET
retry_options = options.merge(headers: request.headers,
body: request.body)
body: request.body,
max_redirects: max_redirects - 1)
__build_req(:get, redirect_uri, retry_options)
end
@ -84,17 +73,17 @@ module HTTPX
end
end
module OptionsMethods
module RequestMethods
def self.included(klass)
super
klass.def_option(:max_redirects) do |num|
num = Integer(num)
raise Error, ":max_redirects must be positive" unless num.positive?
klass.__send__(:attr_writer, :redirect_request)
end
num
end
def redirect_request
@redirect_request || self
end
klass.def_option(:follow_insecure_redirects)
def max_redirects
@options.max_redirects || MAX_REDIRECTS
end
end
end

View File

@ -8,11 +8,11 @@ module HTTPX
end
module InstanceMethods
def request(*args, keep_open: @keep_open, **options)
def request(*args, **options)
return super if @_h2c_probed
begin
requests = __build_reqs(*args, **options)
requests = __build_reqs(*args, options)
upgrade_request = requests.first
return super unless valid_h2c_upgrade_request?(upgrade_request)
@ -20,32 +20,37 @@ module HTTPX
upgrade_request.headers["upgrade"] = "h2c"
upgrade_request.headers.add("connection", "upgrade")
upgrade_request.headers.add("connection", "http2-settings")
upgrade_request.headers["http2-settings"] = HTTP2::Client.settings_header(@options.http2_settings)
upgrade_response = __send_reqs(*upgrade_request, **options).first
upgrade_request.headers["http2-settings"] = HTTP2::Client.settings_header(upgrade_request.http2_settings)
upgrade_response = wrap { __send_reqs(*upgrade_request, options).first }
if upgrade_response.status == 101
connection = find_connection(upgrade_request)
# if 101, assume that connection exists and was kept open
connection = find_connection(upgrade_request, options)
parser = connection.upgrade_parser("h2")
parser.extend(UpgradeExtensions)
parser.upgrade(upgrade_request, upgrade_response, **options)
parser.upgrade(upgrade_request, upgrade_response, **upgrade_request.options)
# clean up data left behind in the buffer, if the server started
# sending frames
data = upgrade_response.to_s
parser << data
response = upgrade_request.response
if response.status == 200
requests.delete(upgrade_request)
return response if requests.empty?
end
responses = __send_reqs(*requests)
responses = __send_reqs(*requests, options)
else
# proceed as usual
responses = [upgrade_response] + __send_reqs(*requests[1..-1])
responses = [upgrade_response] + __send_reqs(*requests[1..-1], options)
end
return responses.first if responses.size == 1
responses
ensure
@_h2c_probed = true
close unless keep_open
end
end
@ -60,6 +65,13 @@ module HTTPX
end
end
module RequestMethods
def self.included(klass)
klass.__send__(:attr_reader, :options)
klass.def_delegator :@options, :http2_settings
end
end
module UpgradeExtensions
def upgrade(request, _response, **)
@connection.send_connection_preface

View File

@ -8,6 +8,21 @@ module HTTPX
module Plugins
module Proxy
Error = Class.new(Error)
def self.configure(klass, *)
klass.plugin(:"proxy/http")
klass.plugin(:"proxy/socks4")
klass.plugin(:"proxy/socks5")
end
def self.extra_options(options)
Class.new(options.class) do
def_option(:proxy) do |pr|
Hash[pr]
end
end.new(options)
end
class Parameters
extend Registry
@ -35,45 +50,45 @@ module HTTPX
private
def proxy_params(uri)
def proxy_params(uri, options)
@_proxy_uris ||= begin
uris = @options.proxy ? Array(@options.proxy[:uri]) : []
uris = options.proxy ? Array(options.proxy[:uri]) : []
if uris.empty?
uri = URI(uri).find_proxy
uris << uri if uri
end
uris
end
@options.proxy.merge(uri: @_proxy_uris.shift) unless @_proxy_uris.empty?
options.proxy.merge(uri: @_proxy_uris.shift) unless @_proxy_uris.empty?
end
def find_connection(request, **options)
def find_connection(request, options)
uri = URI(request.uri)
proxy = proxy_params(uri)
raise Error, "Failed to connect to proxy" unless proxy
proxy_params = proxy_params(uri, options)
raise Error, "Failed to connect to proxy" unless proxy_params
@pool.find_connection(proxy) || build_connection(proxy, options)
@pool.find_connection(URI(proxy_params[:uri])) || build_connection(proxy_params, options)
end
def build_connection(proxy, options)
return super if proxy.is_a?(URI::Generic)
connection = build_proxy_connection(proxy, **options)
connection = build_proxy_connection(proxy, options)
set_connection_callbacks(connection, options)
connection
end
def build_proxy_connection(proxy, **options)
parameters = Parameters.new(**proxy)
def build_proxy_connection(params, options)
parameters = Parameters.new(**params)
uri = parameters.uri
log { "proxy: #{uri}" }
proxy_type = Parameters.registry(parameters.uri.scheme)
connection = proxy_type.new("tcp", uri, parameters, @options.merge(options), &method(:on_response))
connection = proxy_type.new("tcp", uri, parameters, options, &method(:on_response))
@pool.__send__(:resolve_connection, connection)
connection
end
def fetch_response(request)
def fetch_response(request, connections, options)
response = super
if response.is_a?(ErrorResponse) &&
# either it was a timeout error connecting, or it was a proxy error
@ -81,28 +96,14 @@ module HTTPX
response.error.is_a?(Error)) &&
!@_proxy_uris.empty?
log { "failed connecting to proxy, trying next..." }
connection = find_connection(request)
connection = find_connection(request, options)
connections << connection unless connections.include?(connection)
connection.send(request)
return
end
response
end
end
module OptionsMethods
def self.included(klass)
super
klass.def_option(:proxy) do |pr|
Hash[pr]
end
end
end
def self.configure(klass, *)
klass.plugin(:"proxy/http")
klass.plugin(:"proxy/socks4")
klass.plugin(:"proxy/socks5")
end
end
register_plugin :proxy, Proxy
end

View File

@ -48,7 +48,7 @@ module HTTPX
@parser = nil
when :idle
@parser = ProxyParser.new(@write_buffer, @options)
@parser.inherit_callbacks(self)
set_parser_callbacks(@parser)
@parser.on(:close) { transition(:closing) }
end
end

View File

@ -11,6 +11,14 @@ module HTTPX
require "net/ssh/gateway"
end
def self.extra_options(options)
Class.new(options.class) do
def_option(:proxy) do |pr|
Hash[pr]
end
end.new(options)
end
module InstanceMethods
def with_proxy(*args)
branch(default_options.with_proxy(*args))
@ -18,32 +26,34 @@ module HTTPX
private
def __send_reqs(*requests, **options)
ssh_options = @options.proxy
def __send_reqs(*requests, options)
request_options = @options.merge(options)
ssh_options = request_options.proxy
ssh_uris = ssh_options.delete(:uri)
ssh_username = ssh_options.delete(:username)
ssh_uri = URI.parse(ssh_uris.shift)
ssh_options[:port] ||= ssh_uri.port || 22
if @options.debug
ssh_options[:verbose] = @options.debug_level == 2 ? :debug : :info
if request_options.debug
ssh_options[:verbose] = request_options.debug_level == 2 ? :debug : :info
end
request_uri = URI(requests.first.uri)
@_gateway = Net::SSH::Gateway.new(ssh_uri.host, ssh_username, ssh_options)
begin
@_gateway.open(request_uri.host, request_uri.port) do |local_port|
io = build_gateway_socket(local_port, request_uri)
super(*requests, **options.merge(io: io))
io = build_gateway_socket(local_port, request_uri, request_options)
super(*requests, options.merge(io: io))
end
ensure
@_gateway.shutdown!
end
end
def build_gateway_socket(port, request_uri)
def build_gateway_socket(port, request_uri, options)
case request_uri.scheme
when "https"
ctx = OpenSSL::SSL::SSLContext.new
ctx_options = SSL::TLS_OPTIONS.merge(@options.ssl)
ctx_options = SSL::TLS_OPTIONS.merge(options.ssl)
ctx.set_params(ctx_options) unless ctx_options.empty?
sock = TCPSocket.open("localhost", port)
io = OpenSSL::SSL::SSLSocket.new(sock, ctx)
@ -59,15 +69,6 @@ module HTTPX
end
end
end
module OptionsMethods
def self.included(klass)
super
klass.def_option(:proxy) do |pr|
Hash[pr]
end
end
end
end
end
register_plugin :"proxy/ssh", Proxy::SSH

View File

@ -6,6 +6,10 @@ module HTTPX
PUSH_OPTIONS = { http2_settings: { settings_enable_push: 1 },
max_concurrent_requests: 1 }.freeze
def self.extra_options(options)
options.merge(PUSH_OPTIONS)
end
module ResponseMethods
def pushed?
@__pushed
@ -17,13 +21,12 @@ module HTTPX
end
module InstanceMethods
def initialize(opts = {})
super(PUSH_OPTIONS.merge(opts))
@promise_headers = {}
end
private
def promise_headers
@promise_headers ||= {}
end
def on_promise(parser, stream)
stream.on(:promise_headers) do |h|
__on_promise_request(parser, stream, h)
@ -43,7 +46,7 @@ module HTTPX
request = parser.pending.find { |r| r.authority == authority && r.path == path }
if request
request.merge_headers(headers)
@promise_headers[stream] = request
promise_headers[stream] = request
parser.pending.delete(request)
else
stream.refuse
@ -51,7 +54,7 @@ module HTTPX
end
def __on_promise_response(parser, stream, h)
request = @promise_headers.delete(stream)
request = promise_headers.delete(stream)
return unless request
parser.__send__(:on_stream_headers, stream, request, h)

View File

@ -6,6 +6,17 @@ module HTTPX
MAX_RETRIES = 3
IDEMPOTENT_METHODS = %i[get options head put delete].freeze
def self.extra_options(options)
Class.new(options.class) do
def_option(:max_retries) do |num|
num = Integer(num)
raise Error, ":max_retries must be positive" unless num.positive?
num
end
end.new(options)
end
module InstanceMethods
def max_retries(n)
branch(default_options.with_max_retries(n.to_i))
@ -13,13 +24,14 @@ module HTTPX
private
def fetch_response(request)
def fetch_response(request, connections, options)
response = super
if response.is_a?(ErrorResponse) &&
request.retries.positive? &&
IDEMPOTENT_METHODS.include?(request.verb)
request.retries -= 1
connection = find_connection(request)
connection = find_connection(request, options)
connections << connection unless connections.include?(connection)
connection.send(request)
return
end
@ -35,18 +47,6 @@ module HTTPX
@retries = @options.max_retries || MAX_RETRIES
end
end
module OptionsMethods
def self.included(klass)
super
klass.def_option(:max_retries) do |num|
num = Integer(num)
raise Error, ":max_retries must be positive" unless num.positive?
num
end
end
end
end
register_plugin :retries, Retries
end

View File

@ -6,27 +6,23 @@ require "httpx/resolver"
module HTTPX
class Pool
def initialize(options)
@options = Options.new(options)
@timeout = options.timeout
resolver_type = @options.resolver_class
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
def initialize
@resolvers = {}
@_resolver_monitors = {}
@selector = Selector.new
@connections = []
@connected_connections = 0
@resolver = resolver_type.new(self, @options)
@resolver.on(:resolve, &method(:on_resolver_connection))
@resolver.on(:error, &method(:on_resolver_error))
@resolver.on(:close, &method(:on_resolver_close))
end
def running?
!@connections.empty?
def empty?
@connections.empty?
end
def next_tick
def next_tick(timeout = nil)
catch(:jump_tick) do
@selector.select(next_timeout) do |monitor|
tout = timeout.total_timeout if timeout
@selector.select(next_timeout || tout) do |monitor|
if (connection = monitor.value)
connection.call
end
@ -34,35 +30,35 @@ module HTTPX
end
end
rescue TimeoutError => timeout_error
@connections.each do |ch|
ch.handle_timeout_error(timeout_error)
@connections.each do |connection|
connection.handle_timeout_error(timeout_error)
end
rescue Errno::ECONNRESET,
Errno::ECONNABORTED,
Errno::EPIPE => ex
@connections.each do |ch|
ch.emit(:error, ex)
@connections.each do |connection|
connection.emit(:error, ex)
end
end
def close
@resolver.close unless @resolver.closed?
@connections.each(&:close)
next_tick until @connections.empty?
def close(connections = @connections)
connections = connections.reject(&:inflight?)
connections.each(&:close)
next_tick until connections.none? { |c| @connections.include?(c) }
@resolvers.each_value do |resolver|
resolver.close unless resolver.closed?
end if @connections.empty?
end
def build_connection(uri, **options)
connection = Connection.by(uri, @options.merge(options))
def build_connection(uri, options)
connection = Connection.by(uri, options)
resolve_connection(connection)
connection.on(:open) do
@connected_connections += 1
@timeout.transition(:open) if @connections.size == @connected_connections
end
connection.on(:reset) do
@timeout.transition(:idle)
end
connection.on(:unreachable) do
@resolver.uncache(connection)
resolver = find_resolver_for(connection)
resolver.uncache(connection) if resolver
resolve_connection(connection)
end
connection
@ -82,12 +78,13 @@ module HTTPX
def resolve_connection(connection)
@connections << connection unless @connections.include?(connection)
@resolver << connection
return if @resolver.empty?
resolver = find_resolver_for(connection)
resolver << connection
return if resolver.empty?
@_resolver_monitor ||= begin # rubocop:disable Naming/MemoizedInstanceVariableName
monitor = @selector.register(@resolver, :w)
monitor.value = @resolver
@_resolver_monitors[resolver] ||= begin
monitor = @selector.register(resolver, :w)
monitor.value = resolver
monitor
end
end
@ -113,10 +110,16 @@ module HTTPX
unregister_connection(ch)
end
def on_resolver_close
@selector.deregister(@resolver)
@_resolver_monitor = nil
@resolver.close unless @resolver.closed?
def on_resolver_close(resolver)
resolver_type = resolver.class
return unless @resolvers[resolver_type] == resolver
@resolvers.delete(resolver_type)
@selector.deregister(resolver)
monitor = @_resolver_monitors.delete(resolver)
monitor.close if monitor
resolver.close unless resolver.closed?
end
def register_connection(connection)
@ -133,8 +136,6 @@ module HTTPX
unregister_connection(connection)
end
return if connection.state == :open
@timeout.transition(:idle)
end
def unregister_connection(connection)
@ -153,10 +154,21 @@ module HTTPX
end
def next_timeout
timeout = @timeout.timeout
return (@resolver.timeout || timeout) unless @resolver.closed?
@resolvers.values.reject(&:closed?).map(&:timeout).min || @connections.map(&:timeout).min
end
timeout
def find_resolver_for(connection)
connection_options = connection.options
resolver_type = connection_options.resolver_class
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
@resolvers[resolver_type] ||= begin
resolver = resolver_type.new(self, connection_options)
resolver.on(:resolve, &method(:on_resolver_connection))
resolver.on(:error, &method(:on_resolver_error))
resolver.on(:close) { on_resolver_close(resolver) }
resolver
end
end
end
end

View File

@ -106,6 +106,7 @@ module HTTPX
emit(:error, connection, ex)
else
@connections << connection
resolve
end
end

View File

@ -7,7 +7,7 @@ module HTTPX
def initialize(options = {}, &blk)
@options = self.class.default_options.merge(options)
@pool = Pool.new(@options)
@pool = Pool.new
@responses = {}
@keep_open = false
wrap(&blk) if block_given?
@ -22,22 +22,19 @@ module HTTPX
yield self
ensure
@keep_open = prev_keep_open
close
end
end
def close
@pool.close
def close(*args)
@pool.close(*args)
end
def request(*args, keep_open: @keep_open, **options)
requests = __build_reqs(*args, **options)
responses = __send_reqs(*requests, **options)
def request(*args, **options)
requests = __build_reqs(*args, options)
responses = __send_reqs(*requests, options)
return responses.first if responses.size == 1
responses
ensure
close unless keep_open
end
private
@ -51,11 +48,11 @@ module HTTPX
stream.refuse
end
def fetch_response(request)
def fetch_response(request, _, _)
@responses.delete(request)
end
def find_connection(request, **options)
def find_connection(request, options)
uri = URI(request.uri)
@pool.find_connection(uri) || build_connection(uri, options)
end
@ -73,7 +70,7 @@ module HTTPX
end
def build_connection(uri, options)
connection = @pool.build_connection(uri, **options)
connection = @pool.build_connection(uri, options)
set_connection_callbacks(connection, options)
connection
end
@ -107,21 +104,23 @@ module HTTPX
altsvc["noop"] = true
end
def __build_reqs(*args, **options)
def __build_reqs(*args, options)
request_options = @options.merge(options)
requests = case args.size
when 1
reqs = args.first
reqs.map do |verb, uri|
__build_req(verb, uri, options)
__build_req(verb, uri, request_options)
end
when 2, 3
verb, uris = args
if uris.respond_to?(:each)
uris.map do |uri, **opts|
__build_req(verb, uri, options.merge(opts))
__build_req(verb, uri, request_options.merge(opts))
end
else
[__build_req(verb, uris, options)]
[__build_req(verb, uris, request_options)]
end
else
raise ArgumentError, "unsupported number of arguments"
@ -131,34 +130,45 @@ module HTTPX
requests
end
def __send_reqs(*requests, **options)
def __send_reqs(*requests, options)
connections = []
request_options = @options.merge(options)
timeout = request_options.timeout
requests.each do |request|
connection = find_connection(request, **options)
connection = find_connection(request, request_options)
connections << connection unless connections.include?(connection)
connection.send(request)
end
responses = []
# guarantee ordered responses
loop do
begin
request = requests.first
@pool.next_tick until (response = fetch_response(request))
begin
# guarantee ordered responses
loop do
begin
request = requests.first
@pool.next_tick(timeout) until (response = fetch_response(request, connections, request_options))
responses << response
requests.shift
responses << response
requests.shift
break if requests.empty? || !@pool.running?
break if requests.empty? || @pool.empty?
end
end
responses
ensure
close(connections) unless @keep_open
end
responses
end
def __build_req(verb, uri, options = {})
def __build_req(verb, uri, options)
rklass = @options.request_class
rklass.new(verb, uri, @options.merge(options))
end
@default_options = Options.new
@default_options.freeze
@plugins = []
class << self
@ -166,7 +176,7 @@ module HTTPX
def inherited(klass)
super
klass.instance_variable_set(:@default_options, @default_options.dup)
klass.instance_variable_set(:@default_options, @default_options)
klass.instance_variable_set(:@plugins, @plugins.dup)
end
@ -176,15 +186,13 @@ module HTTPX
unless @plugins.include?(pl)
@plugins << pl
pl.load_dependencies(self, *args, &block) if pl.respond_to?(:load_dependencies)
@default_options = @default_options.dup
@default_options = pl.extra_options(@default_options) if pl.respond_to?(:extra_options)
include(pl::InstanceMethods) if defined?(pl::InstanceMethods)
extend(pl::ClassMethods) if defined?(pl::ClassMethods)
if defined?(pl::OptionsMethods) || defined?(pl::OptionsClassMethods)
options_klass = Class.new(@default_options.class)
options_klass.extend(pl::OptionsClassMethods) if defined?(pl::OptionsClassMethods)
options_klass.__send__(:include, pl::OptionsMethods) if defined?(pl::OptionsMethods)
@default_options = options_klass.new
end
opts = default_options
opts = @default_options
opts.request_class.__send__(:include, pl::RequestMethods) if defined?(pl::RequestMethods)
opts.request_class.extend(pl::RequestClassMethods) if defined?(pl::RequestClassMethods)
opts.response_class.__send__(:include, pl::ResponseMethods) if defined?(pl::ResponseMethods)
@ -196,6 +204,8 @@ module HTTPX
opts.response_body_class.__send__(:include, pl::ResponseBodyMethods) if defined?(pl::ResponseBodyMethods)
opts.response_body_class.extend(pl::ResponseBodyClassMethods) if defined?(pl::ResponseBodyClassMethods)
pl.configure(self, *args, &block) if pl.respond_to?(:configure)
@default_options.freeze
end
self
end

View File

@ -27,12 +27,10 @@ module HTTPX
@operation_timeout = loop_timeout
end
reset_counter
@state = :idle # this is here not to trigger the log
transition(:idle)
end
def timeout
@timeout || @total_timeout
def total_timeout
@total_timeout
ensure
log_time
end
@ -64,17 +62,8 @@ module HTTPX
end
end
def transition(nextstate)
return if @state == nextstate
case nextstate
# when :idle
when :idle
@timeout = @connect_timeout
when :open
@timeout = @operation_timeout
end
@state = nextstate
def no_time_left?
@time_left <= 0
end
private
@ -92,7 +81,7 @@ module HTTPX
return reset_timer unless @started
@time_left -= (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started)
raise TimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds") if @time_left <= 0
raise TotalTimeoutError.new(@total_timeout, "Timed out after #{@total_timeout} seconds") if no_time_left?
reset_timer
end

View File

@ -65,16 +65,6 @@ class SessionTest < Minitest::Test
@options.response_class.new(*args, @options)
end
end
self::OptionsClassMethods = Module.new do
def foo
"options-foo"
end
end
self::OptionsMethods = Module.new do
def foo
self.class.foo
end
end
self::RequestClassMethods = Module.new do
def foo
"request-foo"
@ -134,6 +124,12 @@ class SessionTest < Minitest::Test
end)
end
def self.extra_options(options)
Class.new(options.class) do
def_option(:foo)
end.new(options).merge(foo: "options-foo")
end
def self.configure(mod)
mod.__send__(:include, Module.new do
def bar

View File

@ -0,0 +1,49 @@
# frozen_string_literal: true
# testing proxies is a drag...
module TestTimeoutDefaults
def new(*)
timeout = super
timeout.instance_variable_set(:@connect_timeout, 5)
timeout
end
end
HTTPX::Timeout.extend(TestTimeoutDefaults)
module MinitestExtensions
module TimeoutForTest
# our own subclass so we never confused different timeouts
TestTimeout = Class.new(Timeout::Error)
def run(*)
::Timeout.timeout(RUBY_ENGINE == "jruby" ? 60 : 30, TestTimeout) { super }
end
end
module FirstFailedTestInThread
def run(*)
(Thread.current[:passed_tests] ||= []) << "#{self.class.name}##{name}"
super
ensure
if !Thread.current[:tests_already_failed] && !failures.empty?
Thread.current[:tests_already_failed] = true
puts "first test failed: #{Thread.current[:passed_tests].pop}\n"
puts "this thread also executed: #{Thread.current[:passed_tests].join(", ")}"
end
end
end
module TestName
def run(*)
print "#{self.class.name}##{name}: "
super
ensure
puts " "
end
end
end
Minitest::Test.prepend(MinitestExtensions::TimeoutForTest) unless ENV.key?("HTTPX_DEBUG")
# Minitest::Test.prepend(MinitestExtensions::FirstFailedTestInThread)
# Minitest::Test.prepend(MinitestExtensions::TestName)

View File

@ -15,7 +15,7 @@ module Requests
session = HTTPX.timeout(operation_timeout: 1, total_timeout: 2)
response = session.get(uri)
assert response.is_a?(HTTPX::ErrorResponse), "response should have failed"
assert response.status =~ /timed out after 2 seconds/i, "response should have timed out"
assert response.status =~ /timed out after \d+ seconds/i, "response should have timed out"
end
# def test_http_timeout_connect_timeout

View File

@ -335,7 +335,7 @@
"strict": true
},
{
"name": "HTTP/1.1 with chunked endocing and a 200 response",
"name": "HTTP/1.1 with chunked encoding and a 200 response",
"type": "HTTP_RESPONSE",
"raw": "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n0\r\n\r\n",
"should_keep_alive": true,

View File

@ -1,24 +0,0 @@
# frozen_string_literal: true
# testing proxies is a drag...
module TestTimeoutDefaults
def new(*)
timeout = super
timeout.instance_variable_set(:@connect_timeout, 5)
timeout
end
end
HTTPX::Timeout.extend(TestTimeoutDefaults)
module TimeoutForTest
# our own subclass so we never confused different timeouts
class TestTimeout < Timeout::Error
end
def run(*)
::Timeout.timeout(RUBY_ENGINE == "jruby" ? 60 : 30, TestTimeout) { super }
end
end
Minitest::Test.prepend(TimeoutForTest) unless ENV.key?("HTTPX_DEBUG")