mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-10-04 00:00:37 -04:00
implement per-origin connection threshold per pool
defaulting to unbounded, in order to preserve current behaviour; this will cap the number of connections initiated for a given origin for a pool, which if not shared, will be per-origin; this will include connections from separate option profiles a pool timeout is defined to checkout a connection when limit is reeached
This commit is contained in:
parent
d77e97d31d
commit
1f9dcfb353
@ -29,6 +29,18 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
# Raise when it can't acquire a connection for a given origin.
|
||||
class PoolTimeoutError < TimeoutError
|
||||
attr_reader :origin
|
||||
|
||||
# initializes the +origin+ it refers to, and the
|
||||
# +timeout+ causing the error.
|
||||
def initialize(origin, timeout)
|
||||
@origin = origin
|
||||
super(timeout, "Timed out after #{timeout} seconds while waiting for a connection to #{origin}")
|
||||
end
|
||||
end
|
||||
|
||||
# Error raised when there was a timeout establishing the connection to a server.
|
||||
# This may be raised due to timeouts during TCP and TLS (when applicable) connection
|
||||
# establishment.
|
||||
|
@ -7,18 +7,32 @@ require "httpx/resolver"
|
||||
module HTTPX
|
||||
class Pool
|
||||
using ArrayExtensions::FilterMap
|
||||
using URIExtensions
|
||||
|
||||
POOL_TIMEOUT = 5
|
||||
|
||||
# Sets up the connection pool with the given +options+, which can be the following:
|
||||
#
|
||||
# :max_connections_per_origin :: the maximum number of connections held in the pool pointing to a given origin.
|
||||
# :pool_timeout :: the number of seconds to wait for a connection to a given origin (before raising HTTPX::PoolTimeoutError)
|
||||
#
|
||||
def initialize(options)
|
||||
@options = options
|
||||
@pool_options = options.pool_options
|
||||
@max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY)
|
||||
@pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT)
|
||||
@resolvers = Hash.new { |hs, resolver_type| hs[resolver_type] = [] }
|
||||
@resolver_mtx = Thread::Mutex.new
|
||||
@connections = []
|
||||
@connection_mtx = Thread::Mutex.new
|
||||
@origin_counters = Hash.new(0)
|
||||
@origin_conds = Hash.new { |hs, orig| hs[orig] = ConditionVariable.new }
|
||||
end
|
||||
|
||||
def pop_connection
|
||||
@connection_mtx.synchronize { @connections.shift }
|
||||
@connection_mtx.synchronize do
|
||||
conn = @connections.shift
|
||||
@origin_conds.delete(conn.origin) if conn && ((@origin_counters[conn.origin.to_s] -= 1) == 0)
|
||||
conn
|
||||
end
|
||||
end
|
||||
|
||||
# opens a connection to the IP reachable through +uri+.
|
||||
@ -29,19 +43,29 @@ module HTTPX
|
||||
return checkout_new_connection(uri, options) if options.io
|
||||
|
||||
@connection_mtx.synchronize do
|
||||
conn = @connections.find do |connection|
|
||||
connection.match?(uri, options)
|
||||
end
|
||||
@connections.delete(conn) if conn
|
||||
acquire_connection(uri, options) || begin
|
||||
if @origin_counters[uri.origin] == @max_connections_per_origin
|
||||
|
||||
conn
|
||||
end || checkout_new_connection(uri, options)
|
||||
@origin_conds[uri.origin].wait(@connection_mtx, @pool_timeout)
|
||||
|
||||
return acquire_connection(uri, options) || raise(PoolTimeoutError.new(uri.origin, @pool_timeout))
|
||||
end
|
||||
|
||||
@origin_counters[uri.origin] += 1
|
||||
|
||||
checkout_new_connection(uri, options)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def checkin_connection(connection, delete = false)
|
||||
def checkin_connection(connection)
|
||||
return if connection.options.io
|
||||
|
||||
@connection_mtx.synchronize { @connections << connection } unless delete
|
||||
@connection_mtx.synchronize do
|
||||
@connections << connection
|
||||
|
||||
@origin_conds[connection.origin.to_s].signal
|
||||
end
|
||||
end
|
||||
|
||||
def checkout_mergeable_connection(connection)
|
||||
@ -89,6 +113,16 @@ module HTTPX
|
||||
|
||||
private
|
||||
|
||||
def acquire_connection(uri, options)
|
||||
conn = @connections.find do |connection|
|
||||
connection.match?(uri, options)
|
||||
end
|
||||
|
||||
@connections.delete(conn) if conn
|
||||
|
||||
conn
|
||||
end
|
||||
|
||||
def checkout_new_connection(uri, options)
|
||||
options.connection_class.new(uri, options)
|
||||
end
|
||||
|
@ -17,7 +17,7 @@ module HTTPX
|
||||
@options = self.class.default_options.merge(options)
|
||||
@responses = {}
|
||||
@persistent = @options.persistent
|
||||
@pool = @options.pool_class.new(@options)
|
||||
@pool = @options.pool_class.new(@options.pool_options)
|
||||
@wrapped = false
|
||||
@closing = false
|
||||
wrap(&blk) if blk
|
||||
@ -242,9 +242,13 @@ module HTTPX
|
||||
|
||||
# sends the +request+ to the corresponding HTTPX::Connection
|
||||
def send_request(request, selector, options = request.options)
|
||||
error = catch(:resolve_error) do
|
||||
connection = find_connection(request.uri, selector, options)
|
||||
connection.send(request)
|
||||
error = begin
|
||||
catch(:resolve_error) do
|
||||
connection = find_connection(request.uri, selector, options)
|
||||
connection.send(request)
|
||||
end
|
||||
rescue StandardError => e
|
||||
e
|
||||
end
|
||||
return unless error.is_a?(Error)
|
||||
|
||||
|
@ -17,6 +17,12 @@ module HTTPX
|
||||
def initialize: (Numeric timeout, String message) -> untyped
|
||||
end
|
||||
|
||||
class PoolTimeoutError < TimeoutError
|
||||
attr_reader origin: String
|
||||
|
||||
def initialize: (String origin, Numeric timeout) -> void
|
||||
end
|
||||
|
||||
class ConnectTimeoutError < TimeoutError
|
||||
end
|
||||
|
||||
|
@ -113,7 +113,7 @@ module HTTPX
|
||||
attr_reader resolver_options: Hash[Symbol, untyped]
|
||||
|
||||
# resolver_options
|
||||
attr_reader pool_options: Hash[Symbol, untyped]
|
||||
attr_reader pool_options: pool_options
|
||||
|
||||
# ip_families
|
||||
attr_reader ip_families: Array[ip_family]
|
||||
|
17
sig/pool.rbs
17
sig/pool.rbs
@ -1,18 +1,27 @@
|
||||
module HTTPX
|
||||
type pool_options = {
|
||||
max_connections_per_origin: Integer?,
|
||||
pool_timeout: Numeric?
|
||||
}
|
||||
|
||||
class Pool
|
||||
type resolver_manager = Resolver::Multi | Resolver::System
|
||||
|
||||
@max_connections_per_origin: Integer
|
||||
@pool_timeout: Numeric
|
||||
@options: Options
|
||||
@resolvers: Hash[Class, Array[resolver_manager]]
|
||||
@resolver_mtx: Thread::Mutex
|
||||
@connections: Array[Connection]
|
||||
@connections: Hash[String, Array[Connection]]
|
||||
@connection_mtx: Thread::Mutex
|
||||
@origin_counters: Hash[String, Integer]
|
||||
@origin_conds: Hash[String, ConditionVariable]
|
||||
|
||||
def pop_connection: () -> Connection?
|
||||
|
||||
def checkout_connection: (http_uri uri, Options options) -> Connection
|
||||
|
||||
def checkin_connection: (Connection connection, ?boolish delete) -> void
|
||||
def checkin_connection: (Connection connection) -> void
|
||||
|
||||
def checkout_mergeable_connection: (Connection connection) -> Connection?
|
||||
|
||||
@ -24,7 +33,9 @@ module HTTPX
|
||||
|
||||
private
|
||||
|
||||
def initialize: (Options options) -> void
|
||||
def initialize: (pool_options options) -> void
|
||||
|
||||
def acquire_connection: (http_uri, Options options) -> Connection?
|
||||
|
||||
def checkout_new_connection: (http_uri uri, Options options) -> Connection
|
||||
|
||||
|
@ -4,8 +4,88 @@ require_relative "test_helper"
|
||||
|
||||
class PoolTest < Minitest::Test
|
||||
include HTTPHelpers
|
||||
include HTTPX
|
||||
|
||||
# TODO: add connection pool tests
|
||||
using URIExtensions
|
||||
|
||||
def test_pool_max_connections_per_origin
|
||||
uri = URI(build_uri("/"))
|
||||
responses = []
|
||||
q = Queue.new
|
||||
mtx = Thread::Mutex.new
|
||||
|
||||
pool = Pool.new(max_connections_per_origin: 2)
|
||||
def pool.connections
|
||||
@connections
|
||||
end
|
||||
|
||||
def pool.origin_counters
|
||||
@origin_counters
|
||||
end
|
||||
ths = 3.times.map do |_i|
|
||||
Thread.start do
|
||||
HTTPX.with(pool_options: { max_connections_per_origin: 2, pool_timeout: 30 }) do |http|
|
||||
http.instance_variable_set(:@pool, pool)
|
||||
response = http.get(uri)
|
||||
mtx.synchronize { responses << response }
|
||||
q.pop
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
not_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 3
|
||||
until (now = Process.clock_gettime(Process::CLOCK_MONOTONIC)) > not_after || q.num_waiting == 2
|
||||
ths.first(&:alive?).join(not_after - now)
|
||||
end
|
||||
|
||||
assert pool.connections.empty?, "thread sessions should still be holding to the connections"
|
||||
assert pool.origin_counters[uri.origin] <= 2
|
||||
|
||||
3.times { q << :done }
|
||||
ths.each(&:join)
|
||||
|
||||
assert responses.size == 3
|
||||
responses.each do |res|
|
||||
verify_status(res, 200)
|
||||
end
|
||||
end
|
||||
|
||||
def test_pool_pool_timeout
|
||||
uri = URI(build_uri("/"))
|
||||
q = Queue.new
|
||||
Thread::Mutex.new
|
||||
|
||||
pool = Pool.new(max_connections_per_origin: 2, pool_timeout: 1)
|
||||
|
||||
ths = 3.times.map do |_i|
|
||||
Thread.start do
|
||||
res = nil
|
||||
HTTPX.with(pool_options: { max_connections_per_origin: 2, pool_timeout: 1 }) do |http|
|
||||
begin
|
||||
http.instance_variable_set(:@pool, pool)
|
||||
res = http.get(uri).tap { q.pop }
|
||||
rescue StandardError => e
|
||||
res = e
|
||||
end
|
||||
end
|
||||
res
|
||||
end
|
||||
end
|
||||
|
||||
not_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 3
|
||||
until (now = Process.clock_gettime(Process::CLOCK_MONOTONIC)) > not_after || q.num_waiting == 2
|
||||
ths.first(&:alive?).join(not_after - now)
|
||||
end
|
||||
sleep 1
|
||||
3.times { q << :done }
|
||||
ths.each(&:join)
|
||||
|
||||
results = ths.map(&:value)
|
||||
|
||||
assert(results.one?(ErrorResponse))
|
||||
err_res = results.find { |r| r.is_a?(ErrorResponse) }
|
||||
verify_error_response(err_res, PoolTimeoutError)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
|
@ -4,6 +4,7 @@ require_relative "test_helper"
|
||||
require "httpx/plugins/proxy"
|
||||
|
||||
class ProxyTest < Minitest::Test
|
||||
include HTTPHelpers
|
||||
include HTTPX
|
||||
|
||||
def test_parameters_equality
|
||||
@ -29,10 +30,9 @@ class ProxyTest < Minitest::Test
|
||||
end
|
||||
|
||||
def test_proxy_unsupported_scheme
|
||||
ex = assert_raises(HTTPX::HTTPProxyError) do
|
||||
HTTPX.plugin(:proxy).with_proxy(uri: "https://proxy:123").get("http://smth.com")
|
||||
end
|
||||
assert ex.message == "https: unsupported proxy protocol"
|
||||
res = HTTPX.plugin(:proxy).with_proxy(uri: "https://proxy:123").get("http://smth.com")
|
||||
verify_error_response(res, HTTPX::HTTPProxyError)
|
||||
verify_error_response(res, "https: unsupported proxy protocol")
|
||||
end
|
||||
|
||||
private
|
||||
|
@ -13,7 +13,8 @@ module Requests
|
||||
def test_plugin_no_proxy_defined
|
||||
http = HTTPX.plugin(:proxy)
|
||||
uri = build_uri("/get")
|
||||
assert_raises(HTTPX::HTTPProxyError) { http.with_proxy(uri: []).get(uri) }
|
||||
res = http.with_proxy(uri: []).get(uri)
|
||||
verify_error_response(res, HTTPX::HTTPProxyError)
|
||||
end
|
||||
|
||||
def test_plugin_http_http_proxy
|
||||
|
@ -163,7 +163,7 @@ module Requests
|
||||
response = session.get(uri, resolver_class: resolver_type, resolver_options: options.merge(resolver_opts))
|
||||
verify_status(response, 200)
|
||||
|
||||
resolver = session.pool.resolver
|
||||
resolver = session.resolver
|
||||
assert resolver.instance_variable_get(:@ns_index) == 1
|
||||
end
|
||||
end
|
||||
|
@ -2,20 +2,7 @@
|
||||
|
||||
module SessionWithPool
|
||||
module PoolMethods
|
||||
include HTTPX
|
||||
|
||||
attr_reader :resolvers
|
||||
|
||||
def resolver
|
||||
resolver_type = @options.resolver_class
|
||||
resolver_type = Resolver.resolver_for(resolver_type)
|
||||
|
||||
resolver = @resolvers[resolver_type].first
|
||||
|
||||
resolver = resolver.resolvers[0] if resolver.is_a?(Resolver::Multi)
|
||||
|
||||
resolver
|
||||
end
|
||||
end
|
||||
|
||||
module InstanceMethods
|
||||
@ -29,6 +16,16 @@ module SessionWithPool
|
||||
super
|
||||
end
|
||||
|
||||
def resolver
|
||||
resolver_type = HTTPX::Resolver.resolver_for(@options.resolver_class)
|
||||
|
||||
resolver = @pool.resolvers[resolver_type].first
|
||||
|
||||
resolver = resolver.resolvers[0] if resolver.is_a?(HTTPX::Resolver::Multi)
|
||||
|
||||
resolver
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def do_init_connection(connection, *)
|
||||
|
Loading…
x
Reference in New Issue
Block a user