adapted plugins to the new structure

This commit is contained in:
HoneyryderChuck 2024-08-16 12:48:45 +01:00
parent 11d197ff24
commit 4a351bc095
24 changed files with 244 additions and 218 deletions

View File

@ -52,16 +52,18 @@ module WebMock
end
module InstanceMethods
def init_connection(*)
connection = super
connection.once(:unmock_connection) do
unless connection.addresses
connection.__send__(:callbacks)[:connect_error].clear
pool.__send__(:unregister_connection, connection)
private
def do_init_connection(connection, selector)
super.tap |conn|
conn.once(:unmock_connection) do
unless conn.addresses
conn.__send__(:callbacks)[:connect_error].clear
deselect_connection(conn, selector)
end
resolve_connection(conn, selector)
end
pool.__send__(:resolve_connection, connection)
end
connection
end
end
@ -100,6 +102,10 @@ module WebMock
super
end
def terminate
force_reset
end
def send(request)
request_signature = Plugin.build_webmock_request_signature(request)
WebMock::RequestRegistry.instance.requested_signatures.put(request_signature)

View File

@ -15,11 +15,6 @@ module HTTPX
self
end
def only(type, &block)
callbacks(type).clear
on(type, &block)
end
def emit(type, *args)
callbacks(type).delete_if { |pr| :delete == pr.call(*args) } # rubocop:disable Style/YodaCondition
end

View File

@ -97,6 +97,7 @@ module HTTPX
# sets the callbacks on the +connection+ required to process certain specific
# connection lifecycle events which deal with request rerouting.
on(:misdirected) do |misdirected_request|
# TODO: leaks connection object into the pool
other_connection = @current_session.find_connection(@origin, @current_selector,
@options.merge(ssl: { alpn_protocols: %w[http/1.1] }))
other_connection.merge(self)
@ -712,6 +713,7 @@ module HTTPX
alt_options = @options.merge(ssl: @options.ssl.merge(hostname: URI(origin).host))
# TODO: leaks connection object into the pool
connection = @current_session.find_connection(alt_origin, @current_selector, alt_options)
# advertised altsvc is the same origin being used, ignore

View File

@ -31,12 +31,16 @@ module HTTPX
private
def init_connection(uri, options)
connection = super
def do_init_connection(connection, selector)
super
connection.on(:open) do
next unless connection.current_session == self
emit_or_callback_error(:connection_opened, connection.origin, connection.io.socket)
end
connection.on(:close) do
next unless connection.current_session == self
emit_or_callback_error(:connection_closed, connection.origin) if connection.used?
end
@ -84,6 +88,12 @@ module HTTPX
rescue CallbackError => e
raise e.cause
end
def close(*)
super
rescue CallbackError => e
raise e.cause
end
end
end
register_plugin :callbacks, Callbacks

View File

@ -96,15 +96,16 @@ module HTTPX
end
module InstanceMethods
def fetch_response(request, connections, options)
response = @responses.delete(request)
def fetch_response(request, selector, options)
response = super
return unless response
if response.is_a?(Response) && response.status == 417 && request.headers.key?("expect")
response.close
request.headers.delete("expect")
request.transition(:idle)
send_request(request, connections, options)
send_request(request, selector, options)
return
end

View File

@ -64,9 +64,9 @@ module HTTPX
private
def fetch_response(request, connections, options)
def fetch_response(request, selector, options)
redirect_request = request.redirect_request
response = super(redirect_request, connections, options)
response = super(redirect_request, selector, options)
return unless response
max_redirects = redirect_request.max_redirects
@ -146,20 +146,19 @@ module HTTPX
#
redirect_after = Utils.parse_retry_after(redirect_after)
retry_start = Utils.now
log { "redirecting after #{redirect_after} secs..." }
deactivate_connection(request, connections, options)
pool.after(redirect_after) do
selector.after(redirect_after) do
if request.response
# request has terminated abruptly meanwhile
retry_request.emit(:response, request.response)
else
send_request(retry_request, connections, options)
log { "redirecting (elapsed time: #{Utils.elapsed_time(retry_start)})!!" }
send_request(retry_request, selector, options)
end
end
else
send_request(retry_request, connections, options)
send_request(retry_request, selector, options)
end
nil
end

View File

@ -25,26 +25,6 @@ module HTTPX
end
end
module InstanceMethods
def send_requests(*requests)
upgrade_request, *remainder = requests
return super unless VALID_H2C_VERBS.include?(upgrade_request.verb) && upgrade_request.scheme == "http"
connection = pool.find_connection(upgrade_request.uri, upgrade_request.options)
return super if connection && connection.upgrade_protocol == "h2c"
# build upgrade request
upgrade_request.headers.add("connection", "upgrade")
upgrade_request.headers.add("connection", "http2-settings")
upgrade_request.headers["upgrade"] = "h2c"
upgrade_request.headers["http2-settings"] = ::HTTP2::Client.settings_header(upgrade_request.options.http2_settings)
super(upgrade_request, *remainder)
end
end
class H2CParser < Connection::HTTP2
def upgrade(request, response)
# skip checks, it is assumed that this is the first
@ -65,6 +45,29 @@ module HTTPX
module ConnectionMethods
using URIExtensions
def initialize(*)
super
@h2c_handshake = false
end
def send(request)
return super if @h2c_handshake
return super unless VALID_H2C_VERBS.include?(request.verb) && request.scheme == "http"
return super if @upgrade_protocol == "h2c"
@h2c_handshake = true
# build upgrade request
request.headers.add("connection", "upgrade")
request.headers.add("connection", "http2-settings")
request.headers["upgrade"] = "h2c"
request.headers["http2-settings"] = ::HTTP2::Client.settings_header(request.options.http2_settings)
super
end
def upgrade_to_h2c(request, response)
prev_parser = @parser

View File

@ -76,6 +76,14 @@ module HTTPX
meter_elapsed_time("Session -> response") if response
response
end
def coalesce_connections(conn1, conn2, selector)
result = super
meter_elapsed_time("Connection##{conn2.object_id} coalescing to Connection##{conn1.object_id}") if result
result
end
end
module RequestMethods

View File

@ -31,31 +31,53 @@ module HTTPX
end
class Parameters
attr_reader :uri, :username, :password, :scheme
attr_reader :uri, :username, :password, :scheme, :no_proxy
def initialize(uri:, scheme: nil, username: nil, password: nil, **extra)
def initialize(uri: nil, scheme: nil, username: nil, password: nil, no_proxy: nil, **extra)
@no_proxy = Array(no_proxy) if no_proxy
@uris = Array(uri)
uri = @uris.first
@username = username
@password = password
@ns = 0
if uri
@uri = uri.is_a?(URI::Generic) ? uri : URI(uri)
@username = username || @uri.user
@password = password || @uri.password
return unless @username && @password
scheme ||= case @uri.scheme
when "socks5"
@uri.scheme
when "http", "https"
"basic"
else
return
@username ||= @uri.user
@password ||= @uri.password
end
@scheme = scheme
auth_scheme = scheme.to_s.capitalize
return unless @uri && @username && @password
require_relative "auth/#{scheme}" unless defined?(Authentication) && Authentication.const_defined?(auth_scheme, false)
@authenticator = nil
@scheme ||= infer_default_auth_scheme(@uri)
@authenticator = Authentication.const_get(auth_scheme).new(@username, @password, **extra)
return unless @scheme
@authenticator = load_authenticator(@scheme, @username, @password, **extra)
end
def shift
# TODO: this operation must be synchronized
@ns += 1
@uri = @uris[@ns]
return unless @uri
@uri = URI(@uri) unless @uri.is_a?(URI::Generic)
scheme = infer_default_auth_scheme(@uri)
return unless scheme != @scheme
@scheme = scheme
@username = username || @uri.user
@password = password || @uri.password
@authenticator = load_authenticator(scheme, @username, @password)
end
def can_authenticate?(*args)
@ -87,6 +109,25 @@ module HTTPX
super
end
end
private
def infer_default_auth_scheme(uri)
case uri.scheme
when "socks5"
uri.scheme
when "http", "https"
"basic"
end
end
def load_authenticator(scheme, username, password, **extra)
auth_scheme = scheme.to_s.capitalize
require_relative "auth/#{scheme}" unless defined?(Authentication) && Authentication.const_defined?(auth_scheme, false)
Authentication.const_get(auth_scheme).new(username, password, **extra)
end
end
# adds support for the following options:
@ -95,7 +136,7 @@ module HTTPX
# *:scheme* (i.e. <tt>{ uri: "http://proxy" }</tt>)
module OptionsMethods
def option_proxy(value)
value.is_a?(Parameters) ? value : Hash[value]
value.is_a?(Parameters) ? value : Parameters.new(**Hash[value])
end
def option_supported_proxy_protocols(value)
@ -106,97 +147,67 @@ module HTTPX
end
module InstanceMethods
private
def find_connection(request, connections, options)
def find_connection(request_uri, selector, options)
return super unless options.respond_to?(:proxy)
uri = request.uri
proxy_options = proxy_options(uri, options)
return super(request, connections, proxy_options) unless proxy_options.proxy
connection = pool.find_connection(uri, proxy_options) || init_connection(uri, proxy_options)
unless connections.nil? || connections.include?(connection)
connections << connection
set_connection_callbacks(connection, connections, options)
end
connection
if (next_proxy = request_uri.find_proxy)
return super(request_uri, selector, options.merge(proxy: Parameters.new(uri: next_proxy)))
end
def proxy_options(request_uri, options)
proxy_opts = if (next_proxy = request_uri.find_proxy)
{ uri: next_proxy }
else
proxy = options.proxy
return options unless proxy
return super unless proxy
return options.merge(proxy: nil) unless proxy.key?(:uri)
next_proxy = proxy.uri
@_proxy_uris ||= Array(proxy[:uri])
next_proxy = @_proxy_uris.first
raise Error, "Failed to connect to proxy" unless next_proxy
next_proxy = URI(next_proxy)
raise Error,
"#{next_proxy.scheme}: unsupported proxy protocol" unless options.supported_proxy_protocols.include?(next_proxy.scheme)
if proxy.key?(:no_proxy)
no_proxy = proxy[:no_proxy]
if (no_proxy = proxy.no_proxy)
no_proxy = no_proxy.join(",") if no_proxy.is_a?(Array)
return options.merge(proxy: nil) unless URI::Generic.use_proxy?(request_uri.host, next_proxy.host,
return super(request_uri, selector, options.merge(proxy: nil)) unless URI::Generic.use_proxy?(request_uri.host, next_proxy.host,
next_proxy.port, no_proxy)
end
proxy.merge(uri: next_proxy)
super(request_uri, selector, options.merge(proxy: proxy))
end
proxy = Parameters.new(**proxy_opts)
private
options.merge(proxy: proxy)
end
def fetch_response(request, connections, options)
def fetch_response(request, selector, options)
response = super
if response.is_a?(ErrorResponse) && proxy_error?(request, response)
return response unless @_proxy_uris
@_proxy_uris.shift
if response.is_a?(ErrorResponse) && proxy_error?(request, response, options)
options.proxy.shift
# return last error response if no more proxies to try
return response if @_proxy_uris.empty?
return response if options.proxy.uri.nil?
log { "failed connecting to proxy, trying next..." }
request.transition(:idle)
send_request(request, connections, options)
send_request(request, selector, options)
return
end
response
end
def proxy_error?(_request, response)
def proxy_error?(_request, response, options)
return false unless options.proxy
error = response.error
case error
when NativeResolveError
return false unless @_proxy_uris && !@_proxy_uris.empty?
proxy_uri = URI(@_proxy_uris.first)
proxy_uri = URI(options.proxy.uri)
origin = error.connection.origin
# failed resolving proxy domain
origin.host == proxy_uri.host && origin.port == proxy_uri.port
when ResolveError
return false unless @_proxy_uris && !@_proxy_uris.empty?
proxy_uri = URI(@_proxy_uris.first)
proxy_uri = URI(options.proxy.uri)
error.message.end_with?(proxy_uri.to_s)
when *PROXY_ERRORS
@ -261,7 +272,7 @@ module HTTPX
@state = :open
super
emit(:close)
# emit(:close)
end
private

View File

@ -23,30 +23,20 @@ module HTTPX
with(proxy: opts.merge(scheme: "ntlm"))
end
def fetch_response(request, connections, options)
def fetch_response(request, selector, options)
response = super
if response &&
response.is_a?(Response) &&
response.status == 407 &&
!request.headers.key?("proxy-authorization") &&
response.headers.key?("proxy-authenticate")
uri = request.uri
proxy_options = proxy_options(uri, options)
connection = connections.find do |conn|
conn.match?(uri, proxy_options)
end
if connection && connection.options.proxy.can_authenticate?(response.headers["proxy-authenticate"])
response.headers.key?("proxy-authenticate") && options.proxy.can_authenticate?(response.headers["proxy-authenticate"])
request.transition(:idle)
request.headers["proxy-authorization"] =
connection.options.proxy.authenticate(request, response.headers["proxy-authenticate"])
send_request(request, connections)
options.proxy.authenticate(request, response.headers["proxy-authenticate"])
send_request(request, selector, options)
return
end
end
response
end
@ -74,7 +64,14 @@ module HTTPX
parser = @parser
parser.extend(ProxyParser)
parser.on(:response, &method(:__http_on_connect))
parser.on(:close) { transition(:closing) }
parser.on(:close) do |force|
next unless @parser
if force
reset
emit(:terminate)
end
end
parser.on(:reset) do
if parser.empty?
reset
@ -95,8 +92,9 @@ module HTTPX
case @state
when :connecting
@parser.close
parser = @parser
@parser = nil
parser.close
when :idle
@parser.callbacks.clear
set_parser_callbacks(@parser)

View File

@ -94,7 +94,7 @@ module HTTPX
private
def fetch_response(request, connections, options)
def fetch_response(request, selector, options)
response = super
if response &&
@ -124,20 +124,17 @@ module HTTPX
retry_start = Utils.now
log { "retrying after #{retry_after} secs..." }
deactivate_connection(request, connections, options)
pool.after(retry_after) do
selector.after(retry_after) do
if request.response
# request has terminated abruptly meanwhile
request.emit(:response, request.response)
else
log { "retrying (elapsed time: #{Utils.elapsed_time(retry_start)})!!" }
send_request(request, connections, options)
send_request(request, selector, options)
end
end
else
send_request(request, connections, options)
send_request(request, selector, options)
end
return
@ -153,7 +150,7 @@ module HTTPX
RETRYABLE_ERRORS.any? { |klass| ex.is_a?(klass) }
end
def proxy_error?(request, response)
def proxy_error?(request, response, _)
super && !request.retries.positive?
end

View File

@ -28,7 +28,7 @@ module HTTPX
end
module InstanceMethods
def fetch_response(request, connections, options)
def fetch_response(request, selector, options)
response = super
if response
@ -45,7 +45,7 @@ module HTTPX
return response unless protocol_handler
log { "upgrading to #{upgrade_protocol}..." }
connection = find_connection(request, connections, options)
connection = find_connection(request.uri, selector, options)
# do not upgrade already upgraded connections
return if connection.upgrade_protocol == upgrade_protocol
@ -60,14 +60,6 @@ module HTTPX
response
end
def close(*args)
return super if args.empty?
connections, = args
pool.close(connections.reject(&:hijacked))
end
end
module ConnectionMethods
@ -75,6 +67,9 @@ module HTTPX
def hijack_io
@hijacked = true
# connection is taken away from selector and not given back to the pool.
@current_session.deselect_connection(self, @current_selector, true)
end
end
end

View File

@ -13,10 +13,8 @@ module HTTPX
@connections = []
end
def checkout_by_options(options)
def checkout_connection_by_options(options)
conn = @connections.find do |connection|
next if connection.state == :closed
connection.options == options
end
return unless conn

View File

@ -66,6 +66,7 @@ module HTTPX
end
def resolver_connection
# TODO: leaks connection object into the pool
@resolver_connection ||= @current_session.find_connection(@uri, @current_selector,
@options.merge(ssl: { alpn_protocols: %w[h2] })).tap do |conn|
emit_addresses(conn, @family, @uri_addresses) unless conn.addresses

View File

@ -20,6 +20,7 @@ module HTTPX
@responses = {}
@persistent = @options.persistent
@wrapped = false
@closing = false
wrap(&blk) if blk
end
@ -50,22 +51,21 @@ module HTTPX
# closes all the active connections from the session.
#
# when called directly with +selector+ as nil, all available connections
# when called directly without specifying +selector+, all available connections
# will be picked up from the connection pool and closed. Connections in use
# by other sessions, or same session in a different thread, will not be reaped.
def close(selector = nil)
if selector.nil?
selector = Selector.new
def close(selector = Selector.new)
# throw resolver away from the pool
pool.checkout_resolver(@options)
# preparing to throw away connections
while (connection = pool.checkout_connection_by_options(@options))
next if connection.state == :closed
while (connection = pool.checkout_by_options(@options))
connection.current_session = self
connection.current_selector = selector
select_connection(connection, selector)
end
return close(selector)
end
begin
@closing = true
selector.terminate
@ -129,12 +129,16 @@ module HTTPX
return if cloned
return if @closing && connection.state == :closed
pool.checkin_connection(connection)
end
def deselect_resolver(resolver, selector)
selector.deregister(resolver)
return if @closing && resolver.closed?
pool.checkin_resolver(resolver)
end

View File

@ -21,7 +21,7 @@ class Bug_0_14_4_Test < Minitest::Test
conn_header = ((idx + 1) % 100).zero? ? "close" : "Keep-Alive"
assert verify_header(response.headers, "connection", conn_header)
end
connection_count = http.pool.connection_count
connection_count = http.connection_count
assert connection_count == 4, "expected to have 4 connections (+ an idle one), instead have #{connection_count}"
end
ensure
@ -44,7 +44,7 @@ class Bug_0_14_4_Test < Minitest::Test
conn_header = ((idx + 1) % 2).zero? ? "close" : "Keep-Alive"
assert verify_header(response.headers, "connection", conn_header)
end
connection_count = http.pool.connection_count
connection_count = http.connection_count
assert connection_count == 100, "expected to have 100 connections (+ an idle one), instead have #{connection_count}"
end
ensure

View File

@ -6,7 +6,6 @@ module HTTPX
module Callbacks
def on: (Symbol) { (*untyped) -> void } -> self
def once: (Symbol) { (*untyped) -> void } -> self
def only: (Symbol) { (*untyped) -> void } -> self
def emit: (Symbol, *untyped) -> void
def callbacks_for?: (Symbol) -> bool

View File

@ -6,15 +6,25 @@ module HTTPX
end
module Plugins
interface _Authenticator
def authenticate: (Request request, String authenticate) -> String
end
module Proxy
Error: singleton(HTTPProxyError)
PROXY_ERRORS: Array[singleton(StandardError)]
class Parameters
attr_reader uri: URI::Generic
attr_reader uri: URI::Generic?
attr_reader username: String?
attr_reader password: String?
attr_reader scheme: String?
attr_reader no_proxy: Array[String]?
@uris: Array[URI::Generic | String]
@authenticator: _Authenticator
def shift: () -> void
def can_authenticate?: (*untyped) -> boolish
@ -24,15 +34,17 @@ module HTTPX
private
def initialize: (uri: generic_uri, ?scheme: String, ?username: String, ?password: String, **untyped) -> untyped
def initialize: (?uri: generic_uri | Array[generic_uri], ?scheme: String, ?username: String, ?password: String, ?no_proxy: Array[generic_uri] | generic_uri, **untyped) -> void
def infer_default_auth_scheme: (URI::Generic uri) -> String?
def load_authenticator: (String scheme, String username, String password, **untyped) -> _Authenticator
end
def self.configure: (singleton(Session)) -> void
type proxyParam = Parameters | Hash[Symbol, untyped]
interface _ProxyOptions
def proxy: () -> proxyParam?
def proxy: () -> Parameters?
end
def self.extra_options: (Options) -> (Options & _ProxyOptions)
@ -42,9 +54,7 @@ module HTTPX
private
def proxy_error?: (Request request, response) -> bool
def proxy_options: (http_uri request_uri, Options & _ProxyOptions options) -> (Options & _ProxyOptions)
def proxy_error?: (Request request, response, Options options) -> bool
end
module ConnectionMethods

View File

@ -29,7 +29,7 @@ module HTTPX
private
def fetch_response: (retriesRequest request, Array[Connection] connections, retriesOptions options) -> (retriesResponse | ErrorResponse)?
def fetch_response: (retriesRequest request, Selector selector, retriesOptions options) -> (retriesResponse | ErrorResponse)?
def __repeatable_request?: (retriesRequest request, retriesOptions options) -> boolish

View File

@ -34,13 +34,12 @@ class EnvProxyTest < Minitest::Test
end
def test_env_proxy_coalescing
HTTPX.plugin(SessionWithPool).wrap do |session|
response = session.get("https://#{httpbin}/get")
HTTPX.plugin(SessionWithPool).wrap do |http|
response = http.get("https://#{httpbin}/get")
verify_status(response, 200)
verify_body_length(response)
pool = session.pool
connections = pool.connections
connections = http.conn_store
assert connections.size == 1
connection = connections.first
@ -60,8 +59,7 @@ class EnvProxyTest < Minitest::Test
verify_status(response2, 200)
verify_body_length(response2)
pool = http.pool
connections = pool.connections
connections = http.conn_store
assert connections.size == 1
connections.each do |connection|

View File

@ -5,16 +5,7 @@ require_relative "test_helper"
class PoolTest < Minitest::Test
include HTTPHelpers
def test_pool_timers_cleanup
uri = build_uri("/get")
HTTPX.plugin(SessionWithPool).wrap do |http|
response = http.get(uri)
verify_status(response, 200)
timers = http.pool.timers
assert timers.intervals.empty?, "there should be no timers left"
end
end unless RUBY_ENGINE == "jruby" && JRUBY_VERSION < "9.4.5.0"
# TODO: add connection pool tests
private

View File

@ -22,9 +22,9 @@ class ProxyTest < Minitest::Test
basic_proxy_opts = HTTPX.plugin(:proxy).__send__(:"with_proxy_#{auth_method}_auth", username: "user",
password: "pass").instance_variable_get(:@options)
proxy = basic_proxy_opts.proxy
assert proxy[:username] == "user"
assert proxy[:password] == "pass"
assert proxy[:scheme] == auth_method
assert proxy.username == "user"
assert proxy.password == "pass"
assert proxy.scheme == auth_method
end
end

View File

@ -98,12 +98,12 @@ module Requests
assert chunks.positive?
end
def test_callbacks_bug_inside_callback
%i[
connection_opened connection_closed
request_started request_completed
response_started response_body_chunk response_completed
].each do |callback|
define_method :"test_callbacks_bug_inside_#{callback}_callback" do
assert_raises(NameError) do
HTTPX.plugin(SessionWithPool).plugin(:callbacks).send(:"on_#{callback}") { i_dont_exist }.get(build_uri("/get"))
end

View File

@ -10,10 +10,10 @@ module Requests
RESOLVER = Resolv::DNS.new
def test_plugin_no_proxy
def test_plugin_no_proxy_defined
http = HTTPX.plugin(:proxy)
uri = build_uri("/get")
session = HTTPX.plugin(:proxy).with_proxy(uri: [])
assert_raises(HTTPX::HTTPProxyError) { session.get(uri) }
assert_raises(HTTPX::HTTPProxyError) { http.with_proxy(uri: []).get(uri) }
end
def test_plugin_http_http_proxy