changed internal session structure, so that it uses local selectors directly

pools are then used only to fetch new conenctions; selectors are discarded when not needed anymore; HTTPX.wrap is for now patched, but would ideally be done with in the future
This commit is contained in:
HoneyryderChuck 2024-08-16 11:32:47 +01:00
parent 12fbca468b
commit 11d197ff24
18 changed files with 794 additions and 362 deletions

View File

@ -43,11 +43,14 @@ module HTTPX
attr_reader :type, :io, :origin, :origins, :state, :pending, :options, :ssl_session
attr_writer :timers
attr_writer :current_selector, :coalesced_connection
attr_accessor :family
attr_accessor :current_session, :family
def initialize(uri, options)
@current_session = @current_selector = @coalesced_connection = nil
@exhausted = @cloned = false
@origins = [uri.origin]
@origin = Utils.to_uri(uri.origin)
@options = Options.new(options)
@ -58,6 +61,7 @@ module HTTPX
@read_buffer = Buffer.new(@options.buffer_size)
@write_buffer = Buffer.new(@options.buffer_size)
@pending = []
on(:error, &method(:on_error))
if @options.io
# if there's an already open IO, get its
@ -68,6 +72,40 @@ module HTTPX
else
transition(:idle)
end
on(:activate) do
@current_session.select_connection(self, @current_selector)
end
on(:close) do
next if @exhausted # it'll reset
# may be called after ":close" above, so after the connection has been checked back in.
# next unless @current_session
next unless @current_session
@current_session.deselect_connection(self, @current_selector, @cloned)
end
on(:terminate) do
next if @exhausted # it'll reset
# may be called after ":close" above, so after the connection has been checked back in.
next unless @current_session
@current_session.deselect_connection(self, @current_selector)
end
# sets the callbacks on the +connection+ required to process certain specific
# connection lifecycle events which deal with request rerouting.
on(:misdirected) do |misdirected_request|
other_connection = @current_session.find_connection(@origin, @current_selector,
@options.merge(ssl: { alpn_protocols: %w[http/1.1] }))
other_connection.merge(self)
misdirected_request.transition(:idle)
other_connection.send(misdirected_request)
end
on(:altsvc) do |alt_origin, origin, alt_params|
build_altsvc_connection(alt_origin, origin, alt_params)
end
@inflight = 0
@keep_alive_timeout = @options.timeout[:keep_alive_timeout]
@ -168,7 +206,12 @@ module HTTPX
end
def inflight?
@parser && !@parser.empty? && !@write_buffer.empty?
@parser && (
# parser may be dealing with other requests (possibly started from a different fiber)
!@parser.empty? ||
# connection may be doing connection termination handshake
!@write_buffer.empty?
)
end
def interests
@ -184,6 +227,9 @@ module HTTPX
return @parser.interests if @parser
nil
rescue StandardError => e
emit(:error, e)
nil
end
@ -205,6 +251,9 @@ module HTTPX
consume
end
nil
rescue StandardError => e
emit(:error, e)
raise e
end
def close
@ -221,8 +270,9 @@ module HTTPX
# bypasses the state machine to force closing of connections still connecting.
# **only** used for Happy Eyeballs v2.
def force_reset
def force_reset(cloned = false)
@state = :closing
@cloned = cloned
transition(:closed)
end
@ -235,6 +285,8 @@ module HTTPX
end
def send(request)
return @coalesced_connection.send(request) if @coalesced_connection
if @parser && !@write_buffer.full?
if @response_received_at && @keep_alive_timeout &&
Utils.elapsed_time(@response_received_at) > @keep_alive_timeout
@ -255,6 +307,8 @@ module HTTPX
end
def timeout
return if @state == :closed || @state == :inactive
return @timeout if @timeout
return @options.timeout[:connect_timeout] if @state == :idle
@ -301,6 +355,12 @@ module HTTPX
transition(:open)
end
def disconnect
emit(:close)
@current_session = nil
@current_selector = nil
end
def consume
return unless @io
@ -475,8 +535,25 @@ module HTTPX
request.emit(:promise, parser, stream)
end
parser.on(:exhausted) do
@exhausted = true
current_session = @current_session
current_selector = @current_selector
parser.close
@pending.concat(parser.pending)
emit(:exhausted)
case @state
when :closed
idling
@exhausted = false
@current_session = current_session
@current_selector = current_selector
when :closing
once(:close) do
idling
@exhausted = false
@current_session = current_session
@current_selector = current_selector
end
end
end
parser.on(:origin) do |origin|
@origins |= [origin]
@ -492,8 +569,14 @@ module HTTPX
end
parser.on(:reset) do
@pending.concat(parser.pending) unless parser.empty?
current_session = @current_session
current_selector = @current_selector
reset
idling unless @pending.empty?
unless @pending.empty?
idling
@current_session = current_session
@current_selector = current_selector
end
end
parser.on(:current_timeout) do
@current_timeout = @timeout = parser.timeout
@ -531,13 +614,13 @@ module HTTPX
error.set_backtrace(e.backtrace)
connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, error) : handle_error(error)
@state = :closed
emit(:close)
disconnect
rescue TLSError, ::HTTP2::Error::ProtocolError, ::HTTP2::Error::HandshakeError => e
# connect errors, exit gracefully
handle_error(e)
connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, e) : handle_error(e)
@state = :closed
emit(:close)
disconnect
end
def handle_transition(nextstate)
@ -582,7 +665,7 @@ module HTTPX
return unless @write_buffer.empty?
purge_after_closed
emit(:close) if @pending.empty?
disconnect if @pending.empty?
when :already_open
nextstate = :open
# the first check for given io readiness must still use a timeout.
@ -617,6 +700,34 @@ module HTTPX
end
end
# returns an HTTPX::Connection for the negotiated Alternative Service (or none).
def build_altsvc_connection(alt_origin, origin, alt_params)
# do not allow security downgrades on altsvc negotiation
return if @origin.scheme == "https" && alt_origin.scheme != "https"
altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin))
# altsvc already exists, somehow it wasn't advertised, probably noop
return unless altsvc
alt_options = @options.merge(ssl: @options.ssl.merge(hostname: URI(origin).host))
connection = @current_session.find_connection(alt_origin, @current_selector, alt_options)
# advertised altsvc is the same origin being used, ignore
return if connection == self
connection.extend(AltSvc::ConnectionMixin) unless connection.is_a?(AltSvc::ConnectionMixin)
log(level: 1) { "#{origin} alt-svc: #{alt_origin}" }
connection.merge(self)
terminate
rescue UnsupportedSchemeError
altsvc["noop"] = true
nil
end
def build_socket(addrs = nil)
case @type
when "tcp"
@ -734,7 +845,7 @@ module HTTPX
def set_request_timeout(request, timeout, start_event, finish_events, &callback)
request.once(start_event) do
interval = @timers.after(timeout, callback)
interval = @current_selector.after(timeout, callback)
Array(finish_events).each do |event|
# clean up request timeouts if the connection errors out

View File

@ -327,10 +327,14 @@ module HTTPX
end
end
send(@pending.shift) unless @pending.empty?
return unless @streams.empty? && exhausted?
close
emit(:exhausted) unless @pending.empty?
if @pending.empty?
close
else
emit(:exhausted)
end
end
def on_frame(bytes)

View File

@ -27,7 +27,7 @@ module HTTPX
use_get: false,
}.freeze
def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close, :terminate
def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close, :terminate, :inflight?
def initialize(_, options)
super
@ -66,23 +66,15 @@ module HTTPX
end
def resolver_connection
@resolver_connection ||= @pool.find_connection(@uri, @options) || begin
@building_connection = true
connection = @options.connection_class.new(@uri, @options.merge(ssl: { alpn_protocols: %w[h2] }))
@pool.init_connection(connection, @options)
# only explicity emit addresses if connection didn't pre-resolve, i.e. it's not an IP.
catch(:coalesced) do
@building_connection = false
emit_addresses(connection, @family, @uri_addresses) unless connection.addresses
connection
end
@resolver_connection ||= @current_session.find_connection(@uri, @current_selector,
@options.merge(ssl: { alpn_protocols: %w[h2] })).tap do |conn|
emit_addresses(conn, @family, @uri_addresses) unless conn.addresses
end
end
private
def resolve(connection = @connections.first, hostname = nil)
return if @building_connection
return unless connection
hostname ||= @queries.key(connection)
@ -176,7 +168,7 @@ module HTTPX
alias_address = answers[address["alias"]]
if alias_address.nil?
reset_hostname(address["name"])
if catch(:coalesced) { early_resolve(connection, hostname: address["alias"]) }
if early_resolve(connection, hostname: address["alias"])
@connections.delete(connection)
else
resolve(connection, address["alias"])

View File

@ -8,27 +8,45 @@ module HTTPX
include Callbacks
using ArrayExtensions::FilterMap
attr_reader :resolvers
attr_reader :resolvers, :options
def initialize(resolver_type, options)
@current_selector = nil
@current_session = nil
@options = options
@resolver_options = @options.resolver_options
@resolvers = options.ip_families.map do |ip_family|
resolver = resolver_type.new(ip_family, options)
resolver.on(:resolve, &method(:on_resolver_connection))
resolver.on(:error, &method(:on_resolver_error))
resolver.on(:close) { on_resolver_close(resolver) }
resolver.multi = self
resolver
end
@errors = Hash.new { |hs, k| hs[k] = [] }
end
def current_selector=(s)
@current_selector = s
@resolvers.each { |r| r.__send__(__method__, s) }
end
def current_session=(s)
@current_session = s
@resolvers.each { |r| r.__send__(__method__, s) }
end
def closed?
@resolvers.all?(&:closed?)
end
def empty?
@resolvers.all?(&:empty?)
end
def inflight?
@resolvers.any(&:inflight?)
end
def timeout
@resolvers.filter_map(&:timeout).min
end
@ -58,18 +76,13 @@ module HTTPX
end
end
private
def lazy_resolve(connection)
@resolvers.each do |resolver|
resolver << @current_session.try_clone_connection(connection, @current_selector, resolver.family)
next if resolver.empty?
def on_resolver_connection(connection)
emit(:resolve, connection)
end
def on_resolver_error(connection, error)
emit(:error, connection, error)
end
def on_resolver_close(resolver)
emit(:close, resolver)
@current_session.select_resolver(resolver, @current_selector)
end
end
end
end

View File

@ -164,7 +164,10 @@ module HTTPX
@connections.delete(connection)
# This loop_time passed to the exception is bogus. Ideally we would pass the total
# resolve timeout, including from the previous retries.
raise ResolveTimeoutError.new(loop_time, "Timed out while resolving #{connection.origin.host}")
ex = ResolveTimeoutError.new(loop_time, "Timed out while resolving #{connection.origin.host}")
ex.set_backtrace(ex ? ex.backtrace : caller)
emit_resolve_error(connection, host, ex)
emit(:close, self)
end
end
@ -248,7 +251,10 @@ module HTTPX
unless @queries.value?(connection)
@connections.delete(connection)
raise NativeResolveError.new(connection, connection.origin.host, "name or service not known")
ex = NativeResolveError.new(connection, connection.origin.host, "name or service not known")
ex.set_backtrace(ex ? ex.backtrace : caller)
emit_resolve_error(connection, connection.origin.host, ex)
emit(:close, self)
end
resolve
@ -311,7 +317,7 @@ module HTTPX
# clean up intermediate queries
@timeouts.delete(name) unless connection.origin.host == name
if catch(:coalesced) { early_resolve(connection, hostname: hostname_alias) }
if early_resolve(connection, hostname: hostname_alias)
@connections.delete(connection)
else
if @socket_type == :tcp
@ -332,7 +338,7 @@ module HTTPX
catch(:coalesced) { emit_addresses(connection, @family, addresses.map { |addr| addr["data"] }) }
end
end
return emit(:close) if @connections.empty?
return emit(:close, self) if @connections.empty?
resolve
end
@ -358,7 +364,10 @@ module HTTPX
begin
@write_buffer << encode_dns_query(hostname)
rescue Resolv::DNS::EncodeError => e
reset_hostname(hostname, connection: connection)
@connections.delete(connection)
emit_resolve_error(connection, hostname, e)
emit(:close, self) if @connections.empty?
end
end
@ -435,12 +444,17 @@ module HTTPX
def handle_error(error)
if error.respond_to?(:connection) &&
error.respond_to?(:host)
reset_hostname(error.host, connection: error.connection)
@connections.delete(error.connection)
emit_resolve_error(error.connection, error.host, error)
else
@queries.each do |host, connection|
reset_hostname(host, connection: connection)
@connections.delete(connection)
emit_resolve_error(connection, host, error)
end
end
emit(:close, self) if @connections.empty?
end
def reset_hostname(hostname, connection: @queries.delete(hostname), reset_candidates: true)

View File

@ -26,14 +26,26 @@ module HTTPX
end
end
attr_reader :family
attr_reader :family, :options
attr_writer :pool
attr_writer :current_selector, :current_session
attr_accessor :multi
def initialize(family, options)
@family = family
@record_type = RECORD_TYPES[family]
@options = options
set_resolver_callbacks
end
def each_connection(&block)
enum_for(__method__) unless block
return unless @connections
@connections.each(&block)
end
def close; end
@ -48,6 +60,10 @@ module HTTPX
true
end
def inflight?
false
end
def emit_addresses(connection, family, addresses, early_resolve = false)
addresses.map! do |address|
address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s)
@ -57,13 +73,13 @@ module HTTPX
return if !early_resolve && connection.addresses && !addresses.intersect?(connection.addresses)
log { "resolver: answer #{FAMILY_TYPES[RECORD_TYPES[family]]} #{connection.origin.host}: #{addresses.inspect}" }
if @pool && # if triggered by early resolve, pool may not be here yet
if @current_selector && # if triggered by early resolve, session may not be here yet
!connection.io &&
connection.options.ip_families.size > 1 &&
family == Socket::AF_INET &&
addresses.first.to_s != connection.origin.host.to_s
log { "resolver: A response, applying resolution delay..." }
@pool.after(0.05) do
@current_selector.after(0.05) do
unless connection.state == :closed ||
# double emission check
(connection.addresses && addresses.intersect?(connection.addresses))
@ -102,10 +118,12 @@ module HTTPX
return if addresses.empty?
emit_addresses(connection, @family, addresses, true)
addresses
end
def emit_resolve_error(connection, hostname = connection.origin.host, ex = nil)
emit(:error, connection, resolve_error(hostname, ex))
emit_connection_error(connection, resolve_error(hostname, ex))
end
def resolve_error(hostname, ex = nil)
@ -116,5 +134,25 @@ module HTTPX
error.set_backtrace(ex ? ex.backtrace : caller)
error
end
def set_resolver_callbacks
on(:resolve, &method(:resolve_connection))
on(:error, &method(:emit_connection_error))
on(:close, &method(:close_resolver))
end
def resolve_connection(connection)
@current_session.__send__(:on_resolver_connection, connection, @current_selector)
end
def emit_connection_error(connection, error)
return connection.emit(:connect_error, error) if connection.connecting? && connection.callbacks_for?(:connect_error)
connection.emit(:error, error)
end
def close_resolver(resolver)
@current_session.__send__(:on_resolver_close, resolver, @current_selector)
end
end
end

View File

@ -47,8 +47,12 @@ module HTTPX
yield self
end
def connections
EMPTY
def multi
self
end
def empty?
true
end
def close
@ -92,6 +96,11 @@ module HTTPX
resolve
end
def early_resolve(connection)
self << connection
true
end
def handle_socket_timeout(interval)
error = HTTPX::ResolveTimeoutError.new(interval, "timed out while waiting on select")
error.set_backtrace(caller)
@ -120,23 +129,26 @@ module HTTPX
def consume
return if @connections.empty?
while @pipe_read.ready? && (event = @pipe_read.getbyte)
if @pipe_read.wait_readable
event = @pipe_read.getbyte
case event
when DONE
*pair, addrs = @pipe_mutex.synchronize { @ips.pop }
@queries.delete(pair)
_, connection = pair
@connections.delete(connection)
family, connection = pair
catch(:coalesced) { emit_addresses(connection, family, addrs) }
when ERROR
*pair, error = @pipe_mutex.synchronize { @ips.pop }
@queries.delete(pair)
@connections.delete(connection)
family, connection = pair
_, connection = pair
emit_resolve_error(connection, connection.origin.host, error)
end
@connections.delete(connection) if @queries.empty?
end
return emit(:close, self) if @connections.empty?
@ -210,5 +222,11 @@ module HTTPX
def __addrinfo_resolve(host, scheme)
Addrinfo.getaddrinfo(host, scheme, Socket::AF_UNSPEC, Socket::SOCK_STREAM)
end
def emit_connection_error(_, error)
throw(:resolve_error, error)
end
def close_resolver(resolver); end
end
end

View File

@ -2,137 +2,206 @@
require "io/wait"
class HTTPX::Selector
READABLE = %i[rw r].freeze
WRITABLE = %i[rw w].freeze
module HTTPX
class Selector
extend Forwardable
private_constant :READABLE
private_constant :WRITABLE
READABLE = %i[rw r].freeze
WRITABLE = %i[rw w].freeze
def initialize
@selectables = []
end
private_constant :READABLE
private_constant :WRITABLE
# deregisters +io+ from selectables.
def deregister(io)
@selectables.delete(io)
end
def_delegator :@timers, :after
# register +io+.
def register(io)
return if @selectables.include?(io)
def_delegator :@selectables, :empty?
@selectables << io
end
def initialize
@timers = Timers.new
@selectables = []
end
private
def each(&blk)
@selectables.each(&blk)
end
def select_many(interval, &block)
selectables, r, w = nil
def next_tick
catch(:jump_tick) do
timeout = next_timeout
if timeout && timeout.negative?
@timers.fire
throw(:jump_tick)
end
# first, we group IOs based on interest type. On call to #interests however,
# things might already happen, and new IOs might be registered, so we might
# have to start all over again. We do this until we group all selectables
begin
loop do
begin
r = nil
w = nil
select(timeout, &:call)
@timers.fire
rescue TimeoutError => e
@timers.fire(e)
end
end
rescue StandardError => e
emit_error(e)
rescue Exception # rubocop:disable Lint/RescueException
each_connection(&:force_reset)
raise
end
selectables = @selectables
@selectables = []
def terminate
# array may change during iteration
selectables = @selectables.reject(&:inflight?)
selectables.delete_if do |io|
interests = io.interests
selectables.each(&:terminate)
(r ||= []) << io if READABLE.include?(interests)
(w ||= []) << io if WRITABLE.include?(interests)
until selectables.empty?
next_tick
io.state == :closed
end
selectables &= @selectables
end
end
if @selectables.empty?
@selectables = selectables
def find_resolver(options)
res = @selectables.find do |c|
c.is_a?(Resolver::Resolver) && options == c.options
end
# do not run event loop if there's nothing to wait on.
# this might happen if connect failed and connection was unregistered.
return if (!r || r.empty?) && (!w || w.empty?) && !selectables.empty?
res.multi if res
end
break
else
@selectables.concat(selectables)
end
rescue StandardError
@selectables = selectables if selectables
raise
def each_connection(&block)
return enum_for(__method__) unless block
@selectables.each do |c|
if c.is_a?(Resolver::Resolver)
c.each_connection(&block)
else
yield c
end
end
end
def find_connection(request_uri, options)
each_connection.find do |connection|
connection.match?(request_uri, options)
end
end
def find_mergeable_connection(connection)
each_connection.find do |ch|
ch != connection && ch.mergeable?(connection)
end
end
def empty?
@selectables.empty?
end
# deregisters +io+ from selectables.
def deregister(io)
@selectables.delete(io)
end
# register +io+.
def register(io)
return if @selectables.include?(io)
@selectables << io
end
private
def select(interval, &block)
# do not cause an infinite loop here.
#
# this may happen if timeout calculation actually triggered an error which causes
# the connections to be reaped (such as the total timeout error) before #select
# gets called.
return if interval.nil? && @selectables.empty?
return select_one(interval, &block) if @selectables.size == 1
select_many(interval, &block)
end
def select_many(interval, &block)
r, w = nil
# first, we group IOs based on interest type. On call to #interests however,
# things might already happen, and new IOs might be registered, so we might
# have to start all over again. We do this until we group all selectables
begin
@selectables.delete_if do |io|
interests = io.interests
(r ||= []) << io if READABLE.include?(interests)
(w ||= []) << io if WRITABLE.include?(interests)
io.state == :closed
end
# TODO: what to do if there are no selectables?
readers, writers = IO.select(r, w, nil, interval)
if readers.nil? && writers.nil? && interval
[*r, *w].each { |io| io.handle_socket_timeout(interval) }
return
end
end
# TODO: what to do if there are no selectables?
if writers
readers.each do |io|
yield io
readers, writers = IO.select(r, w, nil, interval)
# so that we don't yield 2 times
writers.delete(io)
end if readers
if readers.nil? && writers.nil? && interval
[*r, *w].each { |io| io.handle_socket_timeout(interval) }
writers.each(&block)
else
readers.each(&block) if readers
end
end
def select_one(interval)
io = @selectables.first
return unless io
interests = io.interests
result = case interests
when :r then io.to_io.wait_readable(interval)
when :w then io.to_io.wait_writable(interval)
when :rw then io.to_io.wait(interval, :read_write)
when nil then return
end
unless result || interval.nil?
io.handle_socket_timeout(interval)
return
end
rescue IOError, SystemCallError
@selectables.reject!(&:closed?)
retry
# raise TimeoutError.new(interval, "timed out while waiting on select")
yield io
# rescue IOError, SystemCallError
# @selectables.reject!(&:closed?)
# raise unless @selectables.empty?
end
if writers
readers.each do |io|
yield io
def next_timeout
[
@timers.wait_interval,
@selectables.filter_map(&:timeout).min,
].compact.min
end
# so that we don't yield 2 times
writers.delete(io)
end if readers
def emit_error(e)
@selectables.each do |c|
next if c.is_a?(Resolver::Resolver)
writers.each(&block)
else
readers.each(&block) if readers
c.emit(:error, e)
end
end
end
def select_one(interval)
io = @selectables.first
return unless io
interests = io.interests
result = case interests
when :r then io.to_io.wait_readable(interval)
when :w then io.to_io.wait_writable(interval)
when :rw then io.to_io.wait(interval, :read_write)
when nil then return
end
unless result || interval.nil?
io.handle_socket_timeout(interval)
return
end
# raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select")
yield io
rescue IOError, SystemCallError
@selectables.reject!(&:closed?)
raise unless @selectables.empty?
end
def select(interval, &block)
# do not cause an infinite loop here.
#
# this may happen if timeout calculation actually triggered an error which causes
# the connections to be reaped (such as the total timeout error) before #select
# gets called.
return if interval.nil? && @selectables.empty?
return select_one(interval, &block) if @selectables.size == 1
select_many(interval, &block)
end
public :select
end

View File

@ -19,6 +19,7 @@ module HTTPX
@options = self.class.default_options.merge(options)
@responses = {}
@persistent = @options.persistent
@wrapped = false
wrap(&blk) if blk
end
@ -28,21 +29,49 @@ module HTTPX
# http.get("https://wikipedia.com")
# end # wikipedia connection closes here
def wrap
prev_persistent = @persistent
@persistent = true
pool.wrap do
begin
yield self
ensure
@persistent = prev_persistent
close unless @persistent
prev_wrapped = @wrapped
@wrapped = true
prev_selector = Thread.current[:httpx_selector]
Thread.current[:httpx_selector] = current_selector = Selector.new
begin
yield self
ensure
unless prev_wrapped
if @persistent
deactivate(current_selector)
else
close(current_selector)
end
end
@wrapped = prev_wrapped
Thread.current[:httpx_selector] = prev_selector
end
end
# closes all the active connections from the session
def close(*args)
pool.close(*args)
# closes all the active connections from the session.
#
# when called directly with +selector+ as nil, all available connections
# will be picked up from the connection pool and closed. Connections in use
# by other sessions, or same session in a different thread, will not be reaped.
def close(selector = nil)
if selector.nil?
selector = Selector.new
while (connection = pool.checkout_by_options(@options))
connection.current_session = self
connection.current_selector = selector
select_connection(connection, selector)
end
return close(selector)
end
begin
@closing = true
selector.terminate
ensure
@closing = false
end
end
# performs one, or multple requests; it accepts:
@ -89,8 +118,106 @@ module HTTPX
request
end
def select_connection(connection, selector)
selector.register(connection)
end
alias_method :select_resolver, :select_connection
def deselect_connection(connection, selector, cloned = false)
selector.deregister(connection)
return if cloned
pool.checkin_connection(connection)
end
def deselect_resolver(resolver, selector)
selector.deregister(resolver)
pool.checkin_resolver(resolver)
end
def try_clone_connection(connection, selector, family)
connection.family ||= family
return connection if connection.family == family
new_connection = connection.class.new(connection.origin, connection.options)
new_connection.family = family
new_connection.current_session = self
new_connection.current_selector = selector
connection.once(:tcp_open) { new_connection.force_reset(true) }
connection.once(:connect_error) do |err|
if new_connection.connecting?
new_connection.merge(connection)
connection.emit(:cloned, new_connection)
connection.force_reset(true)
else
connection.__send__(:handle_error, err)
end
end
new_connection.once(:tcp_open) do |new_conn|
if new_conn != connection
new_conn.merge(connection)
connection.force_reset(true)
end
end
new_connection.once(:connect_error) do |err|
if connection.connecting?
# main connection has the requests
connection.merge(new_connection)
new_connection.emit(:cloned, connection)
new_connection.force_reset(true)
else
new_connection.__send__(:handle_error, err)
end
end
do_init_connection(new_connection, selector)
end
# returns the HTTPX::Connection through which the +request+ should be sent through.
def find_connection(request_uri, selector, options)
if (connection = selector.find_connection(request_uri, options))
return connection
end
connection = pool.checkout_connection(request_uri, options)
connection.current_session = self
connection.current_selector = selector
case connection.state
when :idle
do_init_connection(connection, selector)
when :open
select_connection(connection, selector) if options.io
when :closed
connection.idling
select_connection(connection, selector)
when :closing
connection.once(:close) do
connection.idling
select_connection(connection, selector)
end
end
connection
end
private
def deactivate(selector)
selector.each_connection do |connection|
connection.deactivate
deselect_connection(connection, selector) if connection.state == :inactive
end
end
# returns the HTTPX::Pool object which manages the networking required to
# perform requests.
def pool
@ -109,26 +236,14 @@ module HTTPX
end
# returns the corresponding HTTP::Response to the given +request+ if it has been received.
def fetch_response(request, _, _)
def fetch_response(request, _selector, _options)
@responses.delete(request)
end
# returns the HTTPX::Connection through which the +request+ should be sent through.
def find_connection(request, connections, options)
uri = request.uri
connection = pool.find_connection(uri, options) || init_connection(uri, options)
unless connections.nil? || connections.include?(connection)
connections << connection
set_connection_callbacks(connection, connections, options)
end
connection
end
# sends the +request+ to the corresponding HTTPX::Connection
def send_request(request, connections, options = request.options)
def send_request(request, selector, options = request.options)
error = catch(:resolve_error) do
connection = find_connection(request, connections, options)
connection = find_connection(request.uri, selector, options)
connection.send(request)
end
return unless error.is_a?(Error)
@ -136,61 +251,6 @@ module HTTPX
request.emit(:response, ErrorResponse.new(request, error))
end
# sets the callbacks on the +connection+ required to process certain specific
# connection lifecycle events which deal with request rerouting.
def set_connection_callbacks(connection, connections, options, cloned: false)
connection.only(:misdirected) do |misdirected_request|
other_connection = connection.create_idle(ssl: { alpn_protocols: %w[http/1.1] })
other_connection.merge(connection)
catch(:coalesced) do
pool.init_connection(other_connection, options)
end
set_connection_callbacks(other_connection, connections, options)
connections << other_connection
misdirected_request.transition(:idle)
other_connection.send(misdirected_request)
end
connection.only(:altsvc) do |alt_origin, origin, alt_params|
other_connection = build_altsvc_connection(connection, connections, alt_origin, origin, alt_params, options)
connections << other_connection if other_connection
end
connection.only(:cloned) do |cloned_conn|
set_connection_callbacks(cloned_conn, connections, options, cloned: true)
connections << cloned_conn
end unless cloned
end
# returns an HTTPX::Connection for the negotiated Alternative Service (or none).
def build_altsvc_connection(existing_connection, connections, alt_origin, origin, alt_params, options)
# do not allow security downgrades on altsvc negotiation
return if existing_connection.origin.scheme == "https" && alt_origin.scheme != "https"
altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin))
# altsvc already exists, somehow it wasn't advertised, probably noop
return unless altsvc
alt_options = options.merge(ssl: options.ssl.merge(hostname: URI(origin).host))
connection = pool.find_connection(alt_origin, alt_options) || init_connection(alt_origin, alt_options)
# advertised altsvc is the same origin being used, ignore
return if connection == existing_connection
connection.extend(AltSvc::ConnectionMixin) unless connection.is_a?(AltSvc::ConnectionMixin)
set_connection_callbacks(connection, connections, alt_options)
log(level: 1) { "#{origin} alt-svc: #{alt_origin}" }
connection.merge(existing_connection)
existing_connection.terminate
connection
rescue UnsupportedSchemeError
altsvc["noop"] = true
nil
end
# returns a set of HTTPX::Request objects built from the given +args+ and +options+.
def build_requests(*args, params)
requests = if args.size == 1
@ -224,12 +284,9 @@ module HTTPX
request.on(:promise, &method(:on_promise))
end
def init_connection(uri, options)
connection = options.connection_class.new(uri, options)
catch(:coalesced) do
pool.init_connection(connection, options)
connection
end
def do_init_connection(connection, selector)
resolve_connection(connection, selector) unless connection.family
connection
end
def deactivate_connection(request, connections, options)
@ -242,64 +299,128 @@ module HTTPX
# sends an array of HTTPX::Request +requests+, returns the respective array of HTTPX::Response objects.
def send_requests(*requests)
connections = _send_requests(requests)
receive_requests(requests, connections)
selector = Thread.current[:httpx_selector] || Selector.new
_send_requests(requests, selector)
receive_requests(requests, selector)
ensure
unless @wrapped
if @persistent
deactivate(selector)
else
close(selector)
end
end
end
# sends an array of HTTPX::Request objects
def _send_requests(requests)
connections = []
def _send_requests(requests, selector)
requests.each do |request|
send_request(request, connections)
send_request(request, selector)
end
connections
end
# returns the array of HTTPX::Response objects corresponding to the array of HTTPX::Request +requests+.
def receive_requests(requests, connections)
def receive_requests(requests, selector)
# @type var responses: Array[response]
responses = []
begin
# guarantee ordered responses
loop do
request = requests.first
# guarantee ordered responses
loop do
request = requests.first
return responses unless request
return responses unless request
catch(:coalesced) { pool.next_tick(connections) } until (response = fetch_response(request, connections, request.options))
request.emit(:complete, response)
catch(:coalesced) { selector.next_tick } until (response = fetch_response(request, selector, request.options))
request.emit(:complete, response)
responses << response
requests.shift
break if requests.empty?
next unless selector.empty?
# in some cases, the pool of connections might have been drained because there was some
# handshake error, and the error responses have already been emitted, but there was no
# opportunity to traverse the requests, hence we're returning only a fraction of the errors
# we were supposed to. This effectively fetches the existing responses and return them.
while (request = requests.shift)
response = fetch_response(request, selector, request.options)
request.emit(:complete, response) if response
responses << response
requests.shift
break if requests.empty?
next unless pool.empty?
# in some cases, the pool of connections might have been drained because there was some
# handshake error, and the error responses have already been emitted, but there was no
# opportunity to traverse the requests, hence we're returning only a fraction of the errors
# we were supposed to. This effectively fetches the existing responses and return them.
while (request = requests.shift)
response = fetch_response(request, connections, request.options)
request.emit(:complete, response) if response
responses << response
end
break
end
responses
ensure
if @persistent
pool.deactivate(*connections)
else
close(connections)
break
end
responses
end
def resolve_connection(connection, selector)
if connection.addresses || connection.open?
#
# there are two cases in which we want to activate initialization of
# connection immediately:
#
# 1. when the connection already has addresses, i.e. it doesn't need to
# resolve a name (not the same as name being an IP, yet)
# 2. when the connection is initialized with an external already open IO.
#
connection.once(:connect_error, &connection.method(:handle_error))
on_resolver_connection(connection, selector)
return
end
resolver = find_resolver_for(connection, selector)
resolver.early_resolve(connection) || resolver.lazy_resolve(connection)
end
def on_resolver_connection(connection, selector)
found_connection = selector.find_mergeable_connection(connection) ||
pool.checkout_mergeable_connection(connection)
return select_connection(connection, selector) unless found_connection
if found_connection.open?
coalesce_connections(found_connection, connection, selector)
else
found_connection.once(:open) do
coalesce_connections(found_connection, connection, selector)
end
end
end
def on_resolver_close(resolver, selector)
return if resolver.closed?
deselect_resolver(resolver, selector)
resolver.close unless resolver.closed?
end
def find_resolver_for(connection, selector)
resolver = selector.find_resolver(connection.options)
unless resolver
resolver = pool.checkout_resolver(connection.options)
resolver.current_session = self
resolver.current_selector = selector
end
resolver
end
def coalesce_connections(conn1, conn2, selector)
unless conn1.coalescable?(conn2)
select_connection(conn2, selector)
return false
end
conn2.emit(:tcp_open, conn1)
conn1.merge(conn2)
conn2.coalesced_connection = conn1
deselect_connection(conn2, selector)
true
end
@default_options = Options.new
@default_options.freeze
@plugins = []

View File

@ -26,8 +26,9 @@ module HTTPX
attr_reader pending: Array[Request]
attr_reader options: Options
attr_reader ssl_session: OpenSSL::SSL::Session?
attr_writer timers: Timers
attr_writer current_selector: Selector?
attr_writer coalesced_connection: instance?
attr_accessor current_session: Session?
attr_accessor family: Integer?
@window_size: Integer
@ -42,6 +43,8 @@ module HTTPX
@connected_at: Float
@response_received_at: Float
@intervals: Array[Timers::Interval]
@exhausted: bool
@cloned: bool
def addresses: () -> Array[ipaddr]?
@ -57,7 +60,7 @@ module HTTPX
def coalescable?: (Connection connection) -> bool
def create_idle: (?Hash[Symbol, untyped] options) -> Connection
def create_idle: (?Hash[Symbol, untyped] options) -> instance
def merge: (Connection connection) -> void
@ -77,6 +80,8 @@ module HTTPX
def close: () -> void
def force_reset: (?bool cloned) -> void
def reset: () -> void
def timeout: () -> Numeric?
@ -99,6 +104,8 @@ module HTTPX
def connect: () -> void
def disconnect: () -> void
def exhausted?: () -> boolish
def consume: () -> void
@ -117,6 +124,8 @@ module HTTPX
def handle_transition: (Symbol nextstate) -> void
def build_altsvc_connection: (URI::Generic alt_origin, String origin, Hash[String, String] alt_params) -> void
def build_socket: (?Array[ipaddr]? addrs) -> (TCP | SSL | UNIX)
def on_error: (HTTPX::TimeoutError | Error | StandardError error, ?Request? request) -> void

View File

@ -1,7 +1,32 @@
module HTTPX
module Resolver
class Multi
attr_reader resolvers: Array[Native | HTTPS]
attr_reader options: Options
@current_selector: Selector?
@current_session: Session?
@resolver_options: Hash[Symbol, untyped]
# @errors: Hash[Symbol, untyped]
def current_selector=: (Selector s) -> void
def current_session=: (Session s) -> void
def closed?: () -> bool
def empty?: () -> bool
def timeout: () -> Numeric?
def close: () -> void
def connections: () -> Array[Connection]
def early_resolve: (Connection connection, ?hostname: String) -> void
def lazy_resolve: (Connection connection) -> void
end
end
end

View File

@ -7,8 +7,6 @@ module HTTPX
DEFAULTS: Hash[Symbol, untyped]
DNS_PORT: Integer
attr_reader family: ip_family
@options: Options
@ns_index: Integer
@nameserver: Array[String]

View File

@ -6,8 +6,17 @@ module HTTPX
RECORD_TYPES: Hash[Integer, singleton(Resolv::DNS::Resource)]
attr_reader family: ip_family
attr_reader options: Options
attr_writer current_selector: Selector?
attr_writer current_session: Session?
attr_accessor multi: Multi?
@record_type: singleton(Resolv::DNS::Resource)
@options: Options
@resolver_options: Hash[Symbol, untyped]
@queries: Hash[String, Connection]
@system_resolver: Resolv::Hosts
@ -20,6 +29,8 @@ module HTTPX
def empty?: () -> bool
def each_connection: () { (Connection connection) -> void } -> void
def emit_addresses: (Connection connection, ip_family family, Array[IPAddr], ?bool early_resolve) -> void
private
@ -28,7 +39,15 @@ module HTTPX
def initialize: (ip_family? family, Options options) -> void
def early_resolve: (Connection connection, ?hostname: String) -> boolish
def early_resolve: (Connection connection, ?hostname: String) -> Array[IPAddr]?
def set_resolver_callbacks: () -> void
def resolve_connection: (Connection connection) -> void
def emit_connection_error: (Connection connection, StandardError error) -> void
def close_resolver: (Resolver resolver) -> void
def emit_resolve_error: (Connection connection, ?String hostname, ?StandardError) -> void

View File

@ -1,22 +1,48 @@
module HTTPX
type selectable = Connection | Resolver::Native
class Selector
type selectable = Connection | Resolver::Native | Resolver::System
include _Each[selectable]
READABLE: Array[Symbol]
WRITABLE: Array[Symbol]
@timers: Timers
@selectables: Array[selectable]
def register: (selectable io) -> void
def deregister: (selectable io) -> selectable?
def next_tick: () -> void
def select: (Numeric? interval) { (selectable) -> void } -> void
def terminate: () -> void
def find_resolver: (Options options) -> Resolver::Resolver?
def find_connection: (http_uri request_uri, Options options) -> Connection?
def each_connection: () { (Connection) -> void} -> void
| () -> Enumerable[Connection]
def find_mergeable_connection: (Connection connection) -> Connection?
def empty?: () -> bool
def register: (selectable io) -> void
def deregister: (selectable io) -> selectable?
private
def initialize: () -> untyped
def initialize: () -> void
def select: (Numeric? interval) { (selectable) -> void } -> void
def select_many: (Numeric? interval) { (selectable) -> void } -> void
def select_one: (Numeric? interval) { (selectable) -> void } -> void
def next_timeout: () -> Numeric?
def emit_error: (StandardError e) -> void
end
type io_interests = :r | :w | :rw

View File

@ -7,20 +7,35 @@ module HTTPX
@options: Options
@responses: Hash[Request, response]
@persistent: bool?
@persistent: bool
@wrapped: bool
def self.plugin: (Symbol | Module plugin, ?options? options) ?{ (Class) -> void } -> singleton(Session)
def wrap: () { (instance) -> void } -> void
def close: (*untyped) -> void
def close: (?Selector selector) -> void
def build_request: (verb verb, generic_uri uri, ?request_params params, ?Options options) -> Request
def select_connection: (Connection connection, Selector selector) -> void
def deselect_connection: (Connection connection, Selector selector, ?bool cloned) -> void
def select_resolver: (Resolver::Native | Resolver::HTTPS resolver, Selector selector) -> void
def deselect_resolver: (Resolver::Resolver resolver, Selector selector) -> void
def try_clone_connection: (Connection connection, Selector selector, Integer? family) -> Connection
def find_connection: (http_uri request_uri, Selector selector, Options options) -> Connection
private
def initialize: (?options) { (self) -> void } -> void
| (?options) -> void
private
def deactivate: (Selector selector) -> void
def pool: -> Pool
@ -28,33 +43,35 @@ module HTTPX
def on_promise: (untyped, untyped) -> void
def fetch_response: (Request request, Array[Connection] connections, untyped options) -> response?
def fetch_response: (Request request, Selector selector, Options options) -> response?
def find_connection: (Request request, Array[Connection] connections, Options options) -> Connection
def deactivate_connection: (Request request, Array[Connection] connections, Options options) -> void
def send_request: (Request request, Array[Connection] connections, ?Options options) -> void
def set_connection_callbacks: (Connection connection, Array[Connection] connections, Options options, ?cloned: bool) -> void
def send_request: (Request request, Selector selector, ?Options options) -> void
def set_request_callbacks: (Request request) -> void
def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options) -> (Connection & AltSvc::ConnectionMixin)?
def build_requests: (verb, uri, request_params) -> Array[Request]
| (Array[[verb, uri, request_params]], Hash[Symbol, untyped]) -> Array[Request]
| (Array[[verb, uri]], request_params) -> Array[Request]
| (verb, _Each[[uri, request_params]], Hash[Symbol, untyped]) -> Array[Request]
| (verb, _Each[uri], request_params) -> Array[Request]
def init_connection: (http_uri uri, Options options) -> Connection
def do_init_connection: (Connection connection, Selector selector) -> Connection
def send_requests: (*Request) -> Array[response]
def _send_requests: (Array[Request] requests) -> Array[Connection]
def _send_requests: (Array[Request] requests, Selector selector) -> void
def receive_requests: (Array[Request] requests, Array[Connection] connections) -> Array[response]
def receive_requests: (Array[Request] requests, Selector selector) -> Array[response]
def resolve_connection: (Connection connection, Selector selector) -> void
def on_resolve_connection: (Connection connection, Selector selector) -> void
def on_resolver_close: (Resolver::Resolver resolver, Selector selector) -> void
def find_resolver_for: (Connection connection, Selector selector) -> (Resolver::Multi | Resolver::Resolver)
def coalesce_connections: (Connection conn1, Connection conn2, Selector selector) -> bool
attr_reader self.default_options: Options
end

View File

@ -13,7 +13,7 @@ module MinitestExtensions
module FirstFailedTestInThread
def self.prepended(*)
super
HTTPX::Session.include SessionExtensions
HTTPX::Connection.include ConnectionExtensions
end
def setup
@ -21,11 +21,10 @@ module MinitestExtensions
extend(OnTheFly)
end
module SessionExtensions
def find_connection(request, connections, _)
connection = super
module ConnectionExtensions
def send(request)
request.instance_variable_set(:@connection, connection)
connection
super
end
end

View File

@ -36,41 +36,6 @@ module Requests
assert options.persistent
end
def test_persistent_with_wrap
return unless origin.start_with?("https")
uri = build_uri("/get")
session1 = HTTPX.plugin(:persistent)
begin
pool = session1.send(:pool)
initial_size = pool.instance_variable_get(:@connections).size
response = session1.get(uri)
verify_status(response, 200)
connections = pool.instance_variable_get(:@connections)
pool_size = connections.size
assert pool_size == initial_size + 1
HTTPX.wrap do |s|
response = s.get(uri)
verify_status(response, 200)
wrapped_connections = pool.instance_variable_get(:@connections)
pool_size = wrapped_connections.size
assert pool_size == 1
assert (connections - wrapped_connections) == connections
end
final_connections = pool.instance_variable_get(:@connections)
pool_size = final_connections.size
assert pool_size == initial_size + 1
assert (connections - final_connections).empty?
ensure
session1.close
end
end
def test_persistent_retry_http2_goaway
return unless origin.start_with?("https")

View File

@ -130,17 +130,11 @@ module WSTestPlugin
end
end
module InstanceMethods
def find_connection(request, *)
return super if request.websocket
module ConnectionMethods
def send(request)
request.init_websocket(self) unless request.websocket || @upgrade_protocol
conn = super
return conn unless conn && !conn.upgrade_protocol
request.init_websocket(conn)
conn
super
end
end