moving seek-or-init connection logic fully to the pool

refactoring towards letting the pool figure out synchronization mechanisms
This commit is contained in:
HoneyryderChuck 2024-07-18 16:25:42 +01:00
parent f74ab7e167
commit 4c44f40511
10 changed files with 68 additions and 49 deletions

View File

@ -52,16 +52,16 @@ module WebMock
end
module InstanceMethods
def init_connection(*)
connection = super
connection.once(:unmock_connection) do
unless connection.addresses
connection.__send__(:callbacks)[:connect_error].clear
pool.__send__(:unregister_connection, connection)
def find_connection(*)
super do |connection|
connection.once(:unmock_connection) do
unless connection.addresses
connection.__send__(:callbacks)[:connect_error].clear
pool.__send__(:unregister_connection, connection)
end
pool.__send__(:resolve_connection, connection)
end
pool.__send__(:resolve_connection, connection)
end
connection
end
end
@ -86,6 +86,15 @@ module WebMock
def initialize(*)
super
@mocked = true
connection.once(:unmock_connection) do
unless connection.addresses
connection.__send__(:callbacks)[:connect_error].clear
pool.__send__(:unregister_connection, connection)
end
pool.__send__(:resolve_connection, connection)
end
connection
end
def open?
@ -124,7 +133,7 @@ module WebMock
end
end
@mocked = false
emit(:unmock_connection, self)
emit(:unmock_connection)
super
else
raise WebMock::NetConnectNotAllowedError, request_signature

View File

@ -132,10 +132,6 @@ module HTTPX
end
end
def create_idle(options = {})
self.class.new(@origin, @options.merge(options))
end
def merge(connection)
@origins |= connection.instance_variable_get(:@origins)
if connection.ssl_session

View File

@ -31,16 +31,15 @@ module HTTPX
private
def init_connection(uri, options)
connection = super
connection.on(:open) do
emit_or_callback_error(:connection_opened, connection.origin, connection.io.socket)
def find_connection(*)
super do |connection|
connection.on(:open) do
emit_or_callback_error(:connection_opened, connection.origin, connection.io.socket)
end
connection.on(:close) do
emit_or_callback_error(:connection_closed, connection.origin) if connection.used?
end
end
connection.on(:close) do
emit_or_callback_error(:connection_closed, connection.origin) if connection.used?
end
connection
end
def set_request_callbacks(request)

View File

@ -143,7 +143,7 @@ module HTTPX
proxy = Parameters.new(**proxy_opts)
proxy_options = options.merge(proxy: proxy)
connection = pool.find_connection(uri, proxy_options) || init_connection(uri, proxy_options)
connection = pool.find_or_new_connection(uri, proxy_options)
unless connections.nil? || connections.include?(connection)
connections << connection
set_connection_callbacks(connection, connections, options)

View File

@ -91,6 +91,32 @@ module HTTPX
next_tick(resolver_connections) until resolver_connections.none? { |c| c.state != :idle && @connections.include?(c) }
end
def find_or_new_connection(uri, options, &blk)
find_connection(uri, options) || new_connection(uri, options, &blk)
end
def find_or_new_idle_connection(connection, extra_options)
options = connection.options.merge(extra_options)
find_connection(connection.origin, options) || begin
other_connection = connection.class.new(connection.origin, options)
other_connection.merge(connection)
catch(:coalesced) do
init_connection(other_connection, options)
end
other_connection
end
end
def new_connection(uri, options, &blk)
connection = options.connection_class.new(uri, options)
catch(:coalesced) do
init_connection(connection, options)
blk.call(connection) if blk
connection
end
end
def init_connection(connection, _options)
connection.timers = @timers
connection.on(:activate) do

View File

@ -68,8 +68,7 @@ module HTTPX
def resolver_connection
@resolver_connection ||= @pool.find_connection(@uri, @options) || begin
@building_connection = true
connection = @options.connection_class.new(@uri, @options.merge(ssl: { alpn_protocols: %w[h2] }))
@pool.init_connection(connection, @options)
connection = @pool.new_connection(@uri, @options.merge(ssl: { alpn_protocols: %w[h2] }))
# only explicity emit addresses if connection didn't pre-resolve, i.e. it's not an IP.
emit_addresses(connection, @family, @uri_addresses) unless connection.addresses
@building_connection = false

View File

@ -114,10 +114,10 @@ module HTTPX
end
# returns the HTTPX::Connection through which the +request+ should be sent through.
def find_connection(request, connections, options)
def find_connection(request, connections, options, &blk)
uri = request.uri
connection = pool.find_connection(uri, options) || init_connection(uri, options)
connection = pool.find_or_new_connection(uri, options, &blk)
unless connections.nil? || connections.include?(connection)
connections << connection
set_connection_callbacks(connection, connections, options)
@ -139,11 +139,7 @@ module HTTPX
# connection lifecycle events which deal with request rerouting.
def set_connection_callbacks(connection, connections, options, cloned: false)
connection.only(:misdirected) do |misdirected_request|
other_connection = connection.create_idle(ssl: { alpn_protocols: %w[http/1.1] })
other_connection.merge(connection)
catch(:coalesced) do
pool.init_connection(other_connection, options)
end
other_connection = pool.find_or_new_idle_connection(connection, ssl: { alpn_protocols: %w[http/1.1] })
set_connection_callbacks(other_connection, connections, options)
connections << other_connection
misdirected_request.transition(:idle)
@ -160,7 +156,7 @@ module HTTPX
end
# returns an HTTPX::Connection for the negotiated Alternative Service (or none).
def build_altsvc_connection(existing_connection, connections, alt_origin, origin, alt_params, options)
def build_altsvc_connection(existing_connection, connections, alt_origin, origin, alt_params, options, &blk)
# do not allow security downgrades on altsvc negotiation
return if existing_connection.origin.scheme == "https" && alt_origin.scheme != "https"
@ -171,7 +167,7 @@ module HTTPX
alt_options = options.merge(ssl: options.ssl.merge(hostname: URI(origin).host))
connection = pool.find_connection(alt_origin, alt_options) || init_connection(alt_origin, alt_options)
connection = pool.find_or_new_connection(alt_origin, alt_options, &blk)
# advertised altsvc is the same origin being used, ignore
return if connection == existing_connection
@ -223,14 +219,6 @@ module HTTPX
request.on(:promise, &method(:on_promise))
end
def init_connection(uri, options)
connection = options.connection_class.new(uri, options)
catch(:coalesced) do
pool.init_connection(connection, options)
connection
end
end
# sends an array of HTTPX::Request +requests+, returns the respective array of HTTPX::Response objects.
def send_requests(*requests)
connections = _send_requests(requests)

View File

@ -57,8 +57,6 @@ module HTTPX
def coalescable?: (Connection connection) -> bool
def create_idle: (?Hash[Symbol, untyped] options) -> Connection
def merge: (Connection connection) -> void
def purge_pending: () { (Request request) -> void } -> void

View File

@ -15,6 +15,12 @@ module HTTPX
def close: (?Array[Connection] connections) -> void
def find_or_new_connection: (URI::Generic uri, Options options) ?{ (Connection new_connection) -> void } -> Connection
def find_or_idle_connection: (Connection connection, ?Hash[Symbol, untyped] extra_options) -> Connection
def new_connection: (URI::Generic uri, Options options) ?{ (Connection new_connection) -> void } -> Connection
def init_connection: (Connection connection, Options options) -> void
def find_connection: (URI::Generic uri, Options options) -> Connection?
@ -45,7 +51,7 @@ module HTTPX
def coalesce_connections: (Connection coalescable, Connection coalescing) -> void
def next_timeout: () -> Numeric?
def next_timeout: (Array[Connection] connections) -> Numeric?
def find_resolver_for: (Connection) { (Resolver::Resolver resolver) -> void } -> resolver_manager
end

View File

@ -30,7 +30,7 @@ module HTTPX
def fetch_response: (Request request, Array[Connection] connections, untyped options) -> response?
def find_connection: (Request request, Array[Connection] connections, Options options) -> Connection
def find_connection: (Request request, Array[Connection] connections, Options options) ?{ (Connection new_connection) -> void } -> Connection
def send_request: (Request request, Array[Connection] connections, ?Options options) -> void
@ -38,7 +38,7 @@ module HTTPX
def set_request_callbacks: (Request request) -> void
def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options) -> (Connection & AltSvc::ConnectionMixin)?
def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options)(URI::Generic uri, Options options) ?{ (Connection new_connection) -> void } -> (Connection & AltSvc::ConnectionMixin)?
def build_requests: (verb, uri, request_params) -> Array[Request]
| (Array[[verb, uri, request_params]], Hash[Symbol, untyped]) -> Array[Request]
@ -46,8 +46,6 @@ module HTTPX
| (verb, _Each[[uri, request_params]], Hash[Symbol, untyped]) -> Array[Request]
| (verb, _Each[uri], request_params) -> Array[Request]
def init_connection: (http_uri uri, Options options) -> Connection
def send_requests: (*Request) -> Array[response]
def _send_requests: (Array[Request] requests) -> Array[Connection]