From 4c44f405117516e118e54740d1ce3b559123a35d Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Thu, 18 Jul 2024 16:25:42 +0100 Subject: [PATCH] moving seek-or-init connection logic fully to the pool refactoring towards letting the pool figure out synchronization mechanisms --- lib/httpx/adapters/webmock.rb | 27 ++++++++++++++++++--------- lib/httpx/connection.rb | 4 ---- lib/httpx/plugins/callbacks.rb | 17 ++++++++--------- lib/httpx/plugins/proxy.rb | 2 +- lib/httpx/pool.rb | 26 ++++++++++++++++++++++++++ lib/httpx/resolver/https.rb | 3 +-- lib/httpx/session.rb | 22 +++++----------------- sig/connection.rbs | 2 -- sig/pool.rbs | 8 +++++++- sig/session.rbs | 6 ++---- 10 files changed, 68 insertions(+), 49 deletions(-) diff --git a/lib/httpx/adapters/webmock.rb b/lib/httpx/adapters/webmock.rb index 970fbcf0..51321cc2 100644 --- a/lib/httpx/adapters/webmock.rb +++ b/lib/httpx/adapters/webmock.rb @@ -52,16 +52,16 @@ module WebMock end module InstanceMethods - def init_connection(*) - connection = super - connection.once(:unmock_connection) do - unless connection.addresses - connection.__send__(:callbacks)[:connect_error].clear - pool.__send__(:unregister_connection, connection) + def find_connection(*) + super do |connection| + connection.once(:unmock_connection) do + unless connection.addresses + connection.__send__(:callbacks)[:connect_error].clear + pool.__send__(:unregister_connection, connection) + end + pool.__send__(:resolve_connection, connection) end - pool.__send__(:resolve_connection, connection) end - connection end end @@ -86,6 +86,15 @@ module WebMock def initialize(*) super @mocked = true + + connection.once(:unmock_connection) do + unless connection.addresses + connection.__send__(:callbacks)[:connect_error].clear + pool.__send__(:unregister_connection, connection) + end + pool.__send__(:resolve_connection, connection) + end + connection end def open? @@ -124,7 +133,7 @@ module WebMock end end @mocked = false - emit(:unmock_connection, self) + emit(:unmock_connection) super else raise WebMock::NetConnectNotAllowedError, request_signature diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index 8fdd4116..eadb5a5a 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -132,10 +132,6 @@ module HTTPX end end - def create_idle(options = {}) - self.class.new(@origin, @options.merge(options)) - end - def merge(connection) @origins |= connection.instance_variable_get(:@origins) if connection.ssl_session diff --git a/lib/httpx/plugins/callbacks.rb b/lib/httpx/plugins/callbacks.rb index 8105944e..ce74f71e 100644 --- a/lib/httpx/plugins/callbacks.rb +++ b/lib/httpx/plugins/callbacks.rb @@ -31,16 +31,15 @@ module HTTPX private - def init_connection(uri, options) - connection = super - connection.on(:open) do - emit_or_callback_error(:connection_opened, connection.origin, connection.io.socket) + def find_connection(*) + super do |connection| + connection.on(:open) do + emit_or_callback_error(:connection_opened, connection.origin, connection.io.socket) + end + connection.on(:close) do + emit_or_callback_error(:connection_closed, connection.origin) if connection.used? + end end - connection.on(:close) do - emit_or_callback_error(:connection_closed, connection.origin) if connection.used? - end - - connection end def set_request_callbacks(request) diff --git a/lib/httpx/plugins/proxy.rb b/lib/httpx/plugins/proxy.rb index eb729619..50886ced 100644 --- a/lib/httpx/plugins/proxy.rb +++ b/lib/httpx/plugins/proxy.rb @@ -143,7 +143,7 @@ module HTTPX proxy = Parameters.new(**proxy_opts) proxy_options = options.merge(proxy: proxy) - connection = pool.find_connection(uri, proxy_options) || init_connection(uri, proxy_options) + connection = pool.find_or_new_connection(uri, proxy_options) unless connections.nil? || connections.include?(connection) connections << connection set_connection_callbacks(connection, connections, options) diff --git a/lib/httpx/pool.rb b/lib/httpx/pool.rb index 23910c69..1a1e626f 100644 --- a/lib/httpx/pool.rb +++ b/lib/httpx/pool.rb @@ -91,6 +91,32 @@ module HTTPX next_tick(resolver_connections) until resolver_connections.none? { |c| c.state != :idle && @connections.include?(c) } end + def find_or_new_connection(uri, options, &blk) + find_connection(uri, options) || new_connection(uri, options, &blk) + end + + def find_or_new_idle_connection(connection, extra_options) + options = connection.options.merge(extra_options) + + find_connection(connection.origin, options) || begin + other_connection = connection.class.new(connection.origin, options) + other_connection.merge(connection) + catch(:coalesced) do + init_connection(other_connection, options) + end + other_connection + end + end + + def new_connection(uri, options, &blk) + connection = options.connection_class.new(uri, options) + catch(:coalesced) do + init_connection(connection, options) + blk.call(connection) if blk + connection + end + end + def init_connection(connection, _options) connection.timers = @timers connection.on(:activate) do diff --git a/lib/httpx/resolver/https.rb b/lib/httpx/resolver/https.rb index ca30f4fe..16c7fa52 100644 --- a/lib/httpx/resolver/https.rb +++ b/lib/httpx/resolver/https.rb @@ -68,8 +68,7 @@ module HTTPX def resolver_connection @resolver_connection ||= @pool.find_connection(@uri, @options) || begin @building_connection = true - connection = @options.connection_class.new(@uri, @options.merge(ssl: { alpn_protocols: %w[h2] })) - @pool.init_connection(connection, @options) + connection = @pool.new_connection(@uri, @options.merge(ssl: { alpn_protocols: %w[h2] })) # only explicity emit addresses if connection didn't pre-resolve, i.e. it's not an IP. emit_addresses(connection, @family, @uri_addresses) unless connection.addresses @building_connection = false diff --git a/lib/httpx/session.rb b/lib/httpx/session.rb index f89c4b4a..1ce2dfad 100644 --- a/lib/httpx/session.rb +++ b/lib/httpx/session.rb @@ -114,10 +114,10 @@ module HTTPX end # returns the HTTPX::Connection through which the +request+ should be sent through. - def find_connection(request, connections, options) + def find_connection(request, connections, options, &blk) uri = request.uri - connection = pool.find_connection(uri, options) || init_connection(uri, options) + connection = pool.find_or_new_connection(uri, options, &blk) unless connections.nil? || connections.include?(connection) connections << connection set_connection_callbacks(connection, connections, options) @@ -139,11 +139,7 @@ module HTTPX # connection lifecycle events which deal with request rerouting. def set_connection_callbacks(connection, connections, options, cloned: false) connection.only(:misdirected) do |misdirected_request| - other_connection = connection.create_idle(ssl: { alpn_protocols: %w[http/1.1] }) - other_connection.merge(connection) - catch(:coalesced) do - pool.init_connection(other_connection, options) - end + other_connection = pool.find_or_new_idle_connection(connection, ssl: { alpn_protocols: %w[http/1.1] }) set_connection_callbacks(other_connection, connections, options) connections << other_connection misdirected_request.transition(:idle) @@ -160,7 +156,7 @@ module HTTPX end # returns an HTTPX::Connection for the negotiated Alternative Service (or none). - def build_altsvc_connection(existing_connection, connections, alt_origin, origin, alt_params, options) + def build_altsvc_connection(existing_connection, connections, alt_origin, origin, alt_params, options, &blk) # do not allow security downgrades on altsvc negotiation return if existing_connection.origin.scheme == "https" && alt_origin.scheme != "https" @@ -171,7 +167,7 @@ module HTTPX alt_options = options.merge(ssl: options.ssl.merge(hostname: URI(origin).host)) - connection = pool.find_connection(alt_origin, alt_options) || init_connection(alt_origin, alt_options) + connection = pool.find_or_new_connection(alt_origin, alt_options, &blk) # advertised altsvc is the same origin being used, ignore return if connection == existing_connection @@ -223,14 +219,6 @@ module HTTPX request.on(:promise, &method(:on_promise)) end - def init_connection(uri, options) - connection = options.connection_class.new(uri, options) - catch(:coalesced) do - pool.init_connection(connection, options) - connection - end - end - # sends an array of HTTPX::Request +requests+, returns the respective array of HTTPX::Response objects. def send_requests(*requests) connections = _send_requests(requests) diff --git a/sig/connection.rbs b/sig/connection.rbs index 81cc7fee..59211129 100644 --- a/sig/connection.rbs +++ b/sig/connection.rbs @@ -57,8 +57,6 @@ module HTTPX def coalescable?: (Connection connection) -> bool - def create_idle: (?Hash[Symbol, untyped] options) -> Connection - def merge: (Connection connection) -> void def purge_pending: () { (Request request) -> void } -> void diff --git a/sig/pool.rbs b/sig/pool.rbs index 98e78f41..5bccf9a3 100644 --- a/sig/pool.rbs +++ b/sig/pool.rbs @@ -15,6 +15,12 @@ module HTTPX def close: (?Array[Connection] connections) -> void + def find_or_new_connection: (URI::Generic uri, Options options) ?{ (Connection new_connection) -> void } -> Connection + + def find_or_idle_connection: (Connection connection, ?Hash[Symbol, untyped] extra_options) -> Connection + + def new_connection: (URI::Generic uri, Options options) ?{ (Connection new_connection) -> void } -> Connection + def init_connection: (Connection connection, Options options) -> void def find_connection: (URI::Generic uri, Options options) -> Connection? @@ -45,7 +51,7 @@ module HTTPX def coalesce_connections: (Connection coalescable, Connection coalescing) -> void - def next_timeout: () -> Numeric? + def next_timeout: (Array[Connection] connections) -> Numeric? def find_resolver_for: (Connection) { (Resolver::Resolver resolver) -> void } -> resolver_manager end diff --git a/sig/session.rbs b/sig/session.rbs index 5cc08d17..40694738 100644 --- a/sig/session.rbs +++ b/sig/session.rbs @@ -30,7 +30,7 @@ module HTTPX def fetch_response: (Request request, Array[Connection] connections, untyped options) -> response? - def find_connection: (Request request, Array[Connection] connections, Options options) -> Connection + def find_connection: (Request request, Array[Connection] connections, Options options) ?{ (Connection new_connection) -> void } -> Connection def send_request: (Request request, Array[Connection] connections, ?Options options) -> void @@ -38,7 +38,7 @@ module HTTPX def set_request_callbacks: (Request request) -> void - def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options) -> (Connection & AltSvc::ConnectionMixin)? + def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options)(URI::Generic uri, Options options) ?{ (Connection new_connection) -> void } -> (Connection & AltSvc::ConnectionMixin)? def build_requests: (verb, uri, request_params) -> Array[Request] | (Array[[verb, uri, request_params]], Hash[Symbol, untyped]) -> Array[Request] @@ -46,8 +46,6 @@ module HTTPX | (verb, _Each[[uri, request_params]], Hash[Symbol, untyped]) -> Array[Request] | (verb, _Each[uri], request_params) -> Array[Request] - def init_connection: (http_uri uri, Options options) -> Connection - def send_requests: (*Request) -> Array[response] def _send_requests: (Array[Request] requests) -> Array[Connection]