diff --git a/lib/httpx/errors.rb b/lib/httpx/errors.rb index a13fe049..ec9a7d16 100644 --- a/lib/httpx/errors.rb +++ b/lib/httpx/errors.rb @@ -29,6 +29,18 @@ module HTTPX end end + # Raise when it can't acquire a connection for a given origin. + class PoolTimeoutError < TimeoutError + attr_reader :origin + + # initializes the +origin+ it refers to, and the + # +timeout+ causing the error. + def initialize(origin, timeout) + @origin = origin + super(timeout, "Timed out after #{timeout} seconds while waiting for a connection to #{origin}") + end + end + # Error raised when there was a timeout establishing the connection to a server. # This may be raised due to timeouts during TCP and TLS (when applicable) connection # establishment. diff --git a/lib/httpx/pool.rb b/lib/httpx/pool.rb index f2da914d..6a051e57 100644 --- a/lib/httpx/pool.rb +++ b/lib/httpx/pool.rb @@ -7,18 +7,32 @@ require "httpx/resolver" module HTTPX class Pool using ArrayExtensions::FilterMap + using URIExtensions + POOL_TIMEOUT = 5 + + # Sets up the connection pool with the given +options+, which can be the following: + # + # :max_connections_per_origin :: the maximum number of connections held in the pool pointing to a given origin. + # :pool_timeout :: the number of seconds to wait for a connection to a given origin (before raising HTTPX::PoolTimeoutError) + # def initialize(options) - @options = options - @pool_options = options.pool_options + @max_connections_per_origin = options.fetch(:max_connections_per_origin, Float::INFINITY) + @pool_timeout = options.fetch(:pool_timeout, POOL_TIMEOUT) @resolvers = Hash.new { |hs, resolver_type| hs[resolver_type] = [] } @resolver_mtx = Thread::Mutex.new @connections = [] @connection_mtx = Thread::Mutex.new + @origin_counters = Hash.new(0) + @origin_conds = Hash.new { |hs, orig| hs[orig] = ConditionVariable.new } end def pop_connection - @connection_mtx.synchronize { @connections.shift } + @connection_mtx.synchronize do + conn = @connections.shift + @origin_conds.delete(conn.origin) if conn && ((@origin_counters[conn.origin.to_s] -= 1) == 0) + conn + end end # opens a connection to the IP reachable through +uri+. @@ -29,19 +43,29 @@ module HTTPX return checkout_new_connection(uri, options) if options.io @connection_mtx.synchronize do - conn = @connections.find do |connection| - connection.match?(uri, options) - end - @connections.delete(conn) if conn + acquire_connection(uri, options) || begin + if @origin_counters[uri.origin] == @max_connections_per_origin - conn - end || checkout_new_connection(uri, options) + @origin_conds[uri.origin].wait(@connection_mtx, @pool_timeout) + + return acquire_connection(uri, options) || raise(PoolTimeoutError.new(uri.origin, @pool_timeout)) + end + + @origin_counters[uri.origin] += 1 + + checkout_new_connection(uri, options) + end + end end - def checkin_connection(connection, delete = false) + def checkin_connection(connection) return if connection.options.io - @connection_mtx.synchronize { @connections << connection } unless delete + @connection_mtx.synchronize do + @connections << connection + + @origin_conds[connection.origin.to_s].signal + end end def checkout_mergeable_connection(connection) @@ -89,6 +113,16 @@ module HTTPX private + def acquire_connection(uri, options) + conn = @connections.find do |connection| + connection.match?(uri, options) + end + + @connections.delete(conn) if conn + + conn + end + def checkout_new_connection(uri, options) options.connection_class.new(uri, options) end diff --git a/lib/httpx/session.rb b/lib/httpx/session.rb index 291cc137..13cc021a 100644 --- a/lib/httpx/session.rb +++ b/lib/httpx/session.rb @@ -17,7 +17,7 @@ module HTTPX @options = self.class.default_options.merge(options) @responses = {} @persistent = @options.persistent - @pool = @options.pool_class.new(@options) + @pool = @options.pool_class.new(@options.pool_options) @wrapped = false @closing = false wrap(&blk) if blk @@ -242,9 +242,13 @@ module HTTPX # sends the +request+ to the corresponding HTTPX::Connection def send_request(request, selector, options = request.options) - error = catch(:resolve_error) do - connection = find_connection(request.uri, selector, options) - connection.send(request) + error = begin + catch(:resolve_error) do + connection = find_connection(request.uri, selector, options) + connection.send(request) + end + rescue StandardError => e + e end return unless error.is_a?(Error) diff --git a/sig/errors.rbs b/sig/errors.rbs index 90d31618..ac8b62ae 100644 --- a/sig/errors.rbs +++ b/sig/errors.rbs @@ -17,6 +17,12 @@ module HTTPX def initialize: (Numeric timeout, String message) -> untyped end + class PoolTimeoutError < TimeoutError + attr_reader origin: String + + def initialize: (String origin, Numeric timeout) -> void + end + class ConnectTimeoutError < TimeoutError end diff --git a/sig/options.rbs b/sig/options.rbs index 30a46fdb..ab727d41 100644 --- a/sig/options.rbs +++ b/sig/options.rbs @@ -113,7 +113,7 @@ module HTTPX attr_reader resolver_options: Hash[Symbol, untyped] # resolver_options - attr_reader pool_options: Hash[Symbol, untyped] + attr_reader pool_options: pool_options # ip_families attr_reader ip_families: Array[ip_family] diff --git a/sig/pool.rbs b/sig/pool.rbs index 61976553..02a540d6 100644 --- a/sig/pool.rbs +++ b/sig/pool.rbs @@ -1,18 +1,27 @@ module HTTPX + type pool_options = { + max_connections_per_origin: Integer?, + pool_timeout: Numeric? + } + class Pool type resolver_manager = Resolver::Multi | Resolver::System + @max_connections_per_origin: Integer + @pool_timeout: Numeric @options: Options @resolvers: Hash[Class, Array[resolver_manager]] @resolver_mtx: Thread::Mutex - @connections: Array[Connection] + @connections: Hash[String, Array[Connection]] @connection_mtx: Thread::Mutex + @origin_counters: Hash[String, Integer] + @origin_conds: Hash[String, ConditionVariable] def pop_connection: () -> Connection? def checkout_connection: (http_uri uri, Options options) -> Connection - def checkin_connection: (Connection connection, ?boolish delete) -> void + def checkin_connection: (Connection connection) -> void def checkout_mergeable_connection: (Connection connection) -> Connection? @@ -24,7 +33,9 @@ module HTTPX private - def initialize: (Options options) -> void + def initialize: (pool_options options) -> void + + def acquire_connection: (http_uri, Options options) -> Connection? def checkout_new_connection: (http_uri uri, Options options) -> Connection diff --git a/test/pool_test.rb b/test/pool_test.rb index ec6d22fd..967fae1d 100644 --- a/test/pool_test.rb +++ b/test/pool_test.rb @@ -4,8 +4,88 @@ require_relative "test_helper" class PoolTest < Minitest::Test include HTTPHelpers + include HTTPX - # TODO: add connection pool tests + using URIExtensions + + def test_pool_max_connections_per_origin + uri = URI(build_uri("/")) + responses = [] + q = Queue.new + mtx = Thread::Mutex.new + + pool = Pool.new(max_connections_per_origin: 2) + def pool.connections + @connections + end + + def pool.origin_counters + @origin_counters + end + ths = 3.times.map do |_i| + Thread.start do + HTTPX.with(pool_options: { max_connections_per_origin: 2, pool_timeout: 30 }) do |http| + http.instance_variable_set(:@pool, pool) + response = http.get(uri) + mtx.synchronize { responses << response } + q.pop + end + end + end + + not_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 3 + until (now = Process.clock_gettime(Process::CLOCK_MONOTONIC)) > not_after || q.num_waiting == 2 + ths.first(&:alive?).join(not_after - now) + end + + assert pool.connections.empty?, "thread sessions should still be holding to the connections" + assert pool.origin_counters[uri.origin] <= 2 + + 3.times { q << :done } + ths.each(&:join) + + assert responses.size == 3 + responses.each do |res| + verify_status(res, 200) + end + end + + def test_pool_pool_timeout + uri = URI(build_uri("/")) + q = Queue.new + Thread::Mutex.new + + pool = Pool.new(max_connections_per_origin: 2, pool_timeout: 1) + + ths = 3.times.map do |_i| + Thread.start do + res = nil + HTTPX.with(pool_options: { max_connections_per_origin: 2, pool_timeout: 1 }) do |http| + begin + http.instance_variable_set(:@pool, pool) + res = http.get(uri).tap { q.pop } + rescue StandardError => e + res = e + end + end + res + end + end + + not_after = Process.clock_gettime(Process::CLOCK_MONOTONIC) + 3 + until (now = Process.clock_gettime(Process::CLOCK_MONOTONIC)) > not_after || q.num_waiting == 2 + ths.first(&:alive?).join(not_after - now) + end + sleep 1 + 3.times { q << :done } + ths.each(&:join) + + results = ths.map(&:value) + + assert(results.one?(ErrorResponse)) + err_res = results.find { |r| r.is_a?(ErrorResponse) } + verify_error_response(err_res, PoolTimeoutError) + end private diff --git a/test/proxy_test.rb b/test/proxy_test.rb index 6e9c3b4b..d1f41f72 100644 --- a/test/proxy_test.rb +++ b/test/proxy_test.rb @@ -4,6 +4,7 @@ require_relative "test_helper" require "httpx/plugins/proxy" class ProxyTest < Minitest::Test + include HTTPHelpers include HTTPX def test_parameters_equality @@ -29,10 +30,9 @@ class ProxyTest < Minitest::Test end def test_proxy_unsupported_scheme - ex = assert_raises(HTTPX::HTTPProxyError) do - HTTPX.plugin(:proxy).with_proxy(uri: "https://proxy:123").get("http://smth.com") - end - assert ex.message == "https: unsupported proxy protocol" + res = HTTPX.plugin(:proxy).with_proxy(uri: "https://proxy:123").get("http://smth.com") + verify_error_response(res, HTTPX::HTTPProxyError) + verify_error_response(res, "https: unsupported proxy protocol") end private diff --git a/test/support/requests/plugins/proxy.rb b/test/support/requests/plugins/proxy.rb index 59c8ccb7..1f0bb467 100644 --- a/test/support/requests/plugins/proxy.rb +++ b/test/support/requests/plugins/proxy.rb @@ -13,7 +13,8 @@ module Requests def test_plugin_no_proxy_defined http = HTTPX.plugin(:proxy) uri = build_uri("/get") - assert_raises(HTTPX::HTTPProxyError) { http.with_proxy(uri: []).get(uri) } + res = http.with_proxy(uri: []).get(uri) + verify_error_response(res, HTTPX::HTTPProxyError) end def test_plugin_http_http_proxy diff --git a/test/support/requests/resolvers.rb b/test/support/requests/resolvers.rb index 717dfb36..00ac7c48 100644 --- a/test/support/requests/resolvers.rb +++ b/test/support/requests/resolvers.rb @@ -163,7 +163,7 @@ module Requests response = session.get(uri, resolver_class: resolver_type, resolver_options: options.merge(resolver_opts)) verify_status(response, 200) - resolver = session.pool.resolver + resolver = session.resolver assert resolver.instance_variable_get(:@ns_index) == 1 end end diff --git a/test/support/session_with_pool.rb b/test/support/session_with_pool.rb index 510b4468..acb00f6a 100644 --- a/test/support/session_with_pool.rb +++ b/test/support/session_with_pool.rb @@ -2,20 +2,7 @@ module SessionWithPool module PoolMethods - include HTTPX - attr_reader :resolvers - - def resolver - resolver_type = @options.resolver_class - resolver_type = Resolver.resolver_for(resolver_type) - - resolver = @resolvers[resolver_type].first - - resolver = resolver.resolvers[0] if resolver.is_a?(Resolver::Multi) - - resolver - end end module InstanceMethods @@ -29,6 +16,16 @@ module SessionWithPool super end + def resolver + resolver_type = HTTPX::Resolver.resolver_for(@options.resolver_class) + + resolver = @pool.resolvers[resolver_type].first + + resolver = resolver.resolvers[0] if resolver.is_a?(HTTPX::Resolver::Multi) + + resolver + end + private def do_init_connection(connection, *)