diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index 68b186ed..fc08adce 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -43,11 +43,14 @@ module HTTPX attr_reader :type, :io, :origin, :origins, :state, :pending, :options, :ssl_session - attr_writer :timers + attr_writer :current_selector, :coalesced_connection - attr_accessor :family + attr_accessor :current_session, :family def initialize(uri, options) + @current_session = @current_selector = @coalesced_connection = nil + @exhausted = @cloned = false + @origins = [uri.origin] @origin = Utils.to_uri(uri.origin) @options = Options.new(options) @@ -58,6 +61,7 @@ module HTTPX @read_buffer = Buffer.new(@options.buffer_size) @write_buffer = Buffer.new(@options.buffer_size) @pending = [] + on(:error, &method(:on_error)) if @options.io # if there's an already open IO, get its @@ -68,6 +72,40 @@ module HTTPX else transition(:idle) end + on(:activate) do + @current_session.select_connection(self, @current_selector) + end + on(:close) do + next if @exhausted # it'll reset + + # may be called after ":close" above, so after the connection has been checked back in. + # next unless @current_session + + next unless @current_session + + @current_session.deselect_connection(self, @current_selector, @cloned) + end + on(:terminate) do + next if @exhausted # it'll reset + + # may be called after ":close" above, so after the connection has been checked back in. + next unless @current_session + + @current_session.deselect_connection(self, @current_selector) + end + + # sets the callbacks on the +connection+ required to process certain specific + # connection lifecycle events which deal with request rerouting. + on(:misdirected) do |misdirected_request| + other_connection = @current_session.find_connection(@origin, @current_selector, + @options.merge(ssl: { alpn_protocols: %w[http/1.1] })) + other_connection.merge(self) + misdirected_request.transition(:idle) + other_connection.send(misdirected_request) + end + on(:altsvc) do |alt_origin, origin, alt_params| + build_altsvc_connection(alt_origin, origin, alt_params) + end @inflight = 0 @keep_alive_timeout = @options.timeout[:keep_alive_timeout] @@ -168,7 +206,12 @@ module HTTPX end def inflight? - @parser && !@parser.empty? && !@write_buffer.empty? + @parser && ( + # parser may be dealing with other requests (possibly started from a different fiber) + !@parser.empty? || + # connection may be doing connection termination handshake + !@write_buffer.empty? + ) end def interests @@ -184,6 +227,9 @@ module HTTPX return @parser.interests if @parser + nil + rescue StandardError => e + emit(:error, e) nil end @@ -205,6 +251,9 @@ module HTTPX consume end nil + rescue StandardError => e + emit(:error, e) + raise e end def close @@ -221,8 +270,9 @@ module HTTPX # bypasses the state machine to force closing of connections still connecting. # **only** used for Happy Eyeballs v2. - def force_reset + def force_reset(cloned = false) @state = :closing + @cloned = cloned transition(:closed) end @@ -235,6 +285,8 @@ module HTTPX end def send(request) + return @coalesced_connection.send(request) if @coalesced_connection + if @parser && !@write_buffer.full? if @response_received_at && @keep_alive_timeout && Utils.elapsed_time(@response_received_at) > @keep_alive_timeout @@ -255,6 +307,8 @@ module HTTPX end def timeout + return if @state == :closed || @state == :inactive + return @timeout if @timeout return @options.timeout[:connect_timeout] if @state == :idle @@ -301,6 +355,12 @@ module HTTPX transition(:open) end + def disconnect + emit(:close) + @current_session = nil + @current_selector = nil + end + def consume return unless @io @@ -475,8 +535,25 @@ module HTTPX request.emit(:promise, parser, stream) end parser.on(:exhausted) do + @exhausted = true + current_session = @current_session + current_selector = @current_selector + parser.close @pending.concat(parser.pending) - emit(:exhausted) + case @state + when :closed + idling + @exhausted = false + @current_session = current_session + @current_selector = current_selector + when :closing + once(:close) do + idling + @exhausted = false + @current_session = current_session + @current_selector = current_selector + end + end end parser.on(:origin) do |origin| @origins |= [origin] @@ -492,8 +569,14 @@ module HTTPX end parser.on(:reset) do @pending.concat(parser.pending) unless parser.empty? + current_session = @current_session + current_selector = @current_selector reset - idling unless @pending.empty? + unless @pending.empty? + idling + @current_session = current_session + @current_selector = current_selector + end end parser.on(:current_timeout) do @current_timeout = @timeout = parser.timeout @@ -531,13 +614,13 @@ module HTTPX error.set_backtrace(e.backtrace) connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, error) : handle_error(error) @state = :closed - emit(:close) + disconnect rescue TLSError, ::HTTP2::Error::ProtocolError, ::HTTP2::Error::HandshakeError => e # connect errors, exit gracefully handle_error(e) connecting? && callbacks_for?(:connect_error) ? emit(:connect_error, e) : handle_error(e) @state = :closed - emit(:close) + disconnect end def handle_transition(nextstate) @@ -582,7 +665,7 @@ module HTTPX return unless @write_buffer.empty? purge_after_closed - emit(:close) if @pending.empty? + disconnect if @pending.empty? when :already_open nextstate = :open # the first check for given io readiness must still use a timeout. @@ -617,6 +700,34 @@ module HTTPX end end + # returns an HTTPX::Connection for the negotiated Alternative Service (or none). + def build_altsvc_connection(alt_origin, origin, alt_params) + # do not allow security downgrades on altsvc negotiation + return if @origin.scheme == "https" && alt_origin.scheme != "https" + + altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin)) + + # altsvc already exists, somehow it wasn't advertised, probably noop + return unless altsvc + + alt_options = @options.merge(ssl: @options.ssl.merge(hostname: URI(origin).host)) + + connection = @current_session.find_connection(alt_origin, @current_selector, alt_options) + + # advertised altsvc is the same origin being used, ignore + return if connection == self + + connection.extend(AltSvc::ConnectionMixin) unless connection.is_a?(AltSvc::ConnectionMixin) + + log(level: 1) { "#{origin} alt-svc: #{alt_origin}" } + + connection.merge(self) + terminate + rescue UnsupportedSchemeError + altsvc["noop"] = true + nil + end + def build_socket(addrs = nil) case @type when "tcp" @@ -734,7 +845,7 @@ module HTTPX def set_request_timeout(request, timeout, start_event, finish_events, &callback) request.once(start_event) do - interval = @timers.after(timeout, callback) + interval = @current_selector.after(timeout, callback) Array(finish_events).each do |event| # clean up request timeouts if the connection errors out diff --git a/lib/httpx/connection/http2.rb b/lib/httpx/connection/http2.rb index 630a9697..742107a4 100644 --- a/lib/httpx/connection/http2.rb +++ b/lib/httpx/connection/http2.rb @@ -327,10 +327,14 @@ module HTTPX end end send(@pending.shift) unless @pending.empty? + return unless @streams.empty? && exhausted? - close - emit(:exhausted) unless @pending.empty? + if @pending.empty? + close + else + emit(:exhausted) + end end def on_frame(bytes) diff --git a/lib/httpx/resolver/https.rb b/lib/httpx/resolver/https.rb index 9efdd810..133659bf 100644 --- a/lib/httpx/resolver/https.rb +++ b/lib/httpx/resolver/https.rb @@ -27,7 +27,7 @@ module HTTPX use_get: false, }.freeze - def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close, :terminate + def_delegators :@resolver_connection, :state, :connecting?, :to_io, :call, :close, :terminate, :inflight? def initialize(_, options) super @@ -66,23 +66,15 @@ module HTTPX end def resolver_connection - @resolver_connection ||= @pool.find_connection(@uri, @options) || begin - @building_connection = true - connection = @options.connection_class.new(@uri, @options.merge(ssl: { alpn_protocols: %w[h2] })) - @pool.init_connection(connection, @options) - # only explicity emit addresses if connection didn't pre-resolve, i.e. it's not an IP. - catch(:coalesced) do - @building_connection = false - emit_addresses(connection, @family, @uri_addresses) unless connection.addresses - connection - end + @resolver_connection ||= @current_session.find_connection(@uri, @current_selector, + @options.merge(ssl: { alpn_protocols: %w[h2] })).tap do |conn| + emit_addresses(conn, @family, @uri_addresses) unless conn.addresses end end private def resolve(connection = @connections.first, hostname = nil) - return if @building_connection return unless connection hostname ||= @queries.key(connection) @@ -176,7 +168,7 @@ module HTTPX alias_address = answers[address["alias"]] if alias_address.nil? reset_hostname(address["name"]) - if catch(:coalesced) { early_resolve(connection, hostname: address["alias"]) } + if early_resolve(connection, hostname: address["alias"]) @connections.delete(connection) else resolve(connection, address["alias"]) diff --git a/lib/httpx/resolver/multi.rb b/lib/httpx/resolver/multi.rb index e238a0a3..8b38fbe2 100644 --- a/lib/httpx/resolver/multi.rb +++ b/lib/httpx/resolver/multi.rb @@ -8,27 +8,45 @@ module HTTPX include Callbacks using ArrayExtensions::FilterMap - attr_reader :resolvers + attr_reader :resolvers, :options def initialize(resolver_type, options) + @current_selector = nil + @current_session = nil @options = options @resolver_options = @options.resolver_options @resolvers = options.ip_families.map do |ip_family| resolver = resolver_type.new(ip_family, options) - resolver.on(:resolve, &method(:on_resolver_connection)) - resolver.on(:error, &method(:on_resolver_error)) - resolver.on(:close) { on_resolver_close(resolver) } + resolver.multi = self resolver end @errors = Hash.new { |hs, k| hs[k] = [] } end + def current_selector=(s) + @current_selector = s + @resolvers.each { |r| r.__send__(__method__, s) } + end + + def current_session=(s) + @current_session = s + @resolvers.each { |r| r.__send__(__method__, s) } + end + def closed? @resolvers.all?(&:closed?) end + def empty? + @resolvers.all?(&:empty?) + end + + def inflight? + @resolvers.any(&:inflight?) + end + def timeout @resolvers.filter_map(&:timeout).min end @@ -58,18 +76,13 @@ module HTTPX end end - private + def lazy_resolve(connection) + @resolvers.each do |resolver| + resolver << @current_session.try_clone_connection(connection, @current_selector, resolver.family) + next if resolver.empty? - def on_resolver_connection(connection) - emit(:resolve, connection) - end - - def on_resolver_error(connection, error) - emit(:error, connection, error) - end - - def on_resolver_close(resolver) - emit(:close, resolver) + @current_session.select_resolver(resolver, @current_selector) + end end end end diff --git a/lib/httpx/resolver/native.rb b/lib/httpx/resolver/native.rb index 49c9e6cb..eb2c5481 100644 --- a/lib/httpx/resolver/native.rb +++ b/lib/httpx/resolver/native.rb @@ -164,7 +164,10 @@ module HTTPX @connections.delete(connection) # This loop_time passed to the exception is bogus. Ideally we would pass the total # resolve timeout, including from the previous retries. - raise ResolveTimeoutError.new(loop_time, "Timed out while resolving #{connection.origin.host}") + ex = ResolveTimeoutError.new(loop_time, "Timed out while resolving #{connection.origin.host}") + ex.set_backtrace(ex ? ex.backtrace : caller) + emit_resolve_error(connection, host, ex) + emit(:close, self) end end @@ -248,7 +251,10 @@ module HTTPX unless @queries.value?(connection) @connections.delete(connection) - raise NativeResolveError.new(connection, connection.origin.host, "name or service not known") + ex = NativeResolveError.new(connection, connection.origin.host, "name or service not known") + ex.set_backtrace(ex ? ex.backtrace : caller) + emit_resolve_error(connection, connection.origin.host, ex) + emit(:close, self) end resolve @@ -311,7 +317,7 @@ module HTTPX # clean up intermediate queries @timeouts.delete(name) unless connection.origin.host == name - if catch(:coalesced) { early_resolve(connection, hostname: hostname_alias) } + if early_resolve(connection, hostname: hostname_alias) @connections.delete(connection) else if @socket_type == :tcp @@ -332,7 +338,7 @@ module HTTPX catch(:coalesced) { emit_addresses(connection, @family, addresses.map { |addr| addr["data"] }) } end end - return emit(:close) if @connections.empty? + return emit(:close, self) if @connections.empty? resolve end @@ -358,7 +364,10 @@ module HTTPX begin @write_buffer << encode_dns_query(hostname) rescue Resolv::DNS::EncodeError => e + reset_hostname(hostname, connection: connection) + @connections.delete(connection) emit_resolve_error(connection, hostname, e) + emit(:close, self) if @connections.empty? end end @@ -435,12 +444,17 @@ module HTTPX def handle_error(error) if error.respond_to?(:connection) && error.respond_to?(:host) + reset_hostname(error.host, connection: error.connection) + @connections.delete(error.connection) emit_resolve_error(error.connection, error.host, error) else @queries.each do |host, connection| + reset_hostname(host, connection: connection) + @connections.delete(connection) emit_resolve_error(connection, host, error) end end + emit(:close, self) if @connections.empty? end def reset_hostname(hostname, connection: @queries.delete(hostname), reset_candidates: true) diff --git a/lib/httpx/resolver/resolver.rb b/lib/httpx/resolver/resolver.rb index 4f95c944..d8df2abb 100644 --- a/lib/httpx/resolver/resolver.rb +++ b/lib/httpx/resolver/resolver.rb @@ -26,14 +26,26 @@ module HTTPX end end - attr_reader :family + attr_reader :family, :options - attr_writer :pool + attr_writer :current_selector, :current_session + + attr_accessor :multi def initialize(family, options) @family = family @record_type = RECORD_TYPES[family] @options = options + + set_resolver_callbacks + end + + def each_connection(&block) + enum_for(__method__) unless block + + return unless @connections + + @connections.each(&block) end def close; end @@ -48,6 +60,10 @@ module HTTPX true end + def inflight? + false + end + def emit_addresses(connection, family, addresses, early_resolve = false) addresses.map! do |address| address.is_a?(IPAddr) ? address : IPAddr.new(address.to_s) @@ -57,13 +73,13 @@ module HTTPX return if !early_resolve && connection.addresses && !addresses.intersect?(connection.addresses) log { "resolver: answer #{FAMILY_TYPES[RECORD_TYPES[family]]} #{connection.origin.host}: #{addresses.inspect}" } - if @pool && # if triggered by early resolve, pool may not be here yet + if @current_selector && # if triggered by early resolve, session may not be here yet !connection.io && connection.options.ip_families.size > 1 && family == Socket::AF_INET && addresses.first.to_s != connection.origin.host.to_s log { "resolver: A response, applying resolution delay..." } - @pool.after(0.05) do + @current_selector.after(0.05) do unless connection.state == :closed || # double emission check (connection.addresses && addresses.intersect?(connection.addresses)) @@ -102,10 +118,12 @@ module HTTPX return if addresses.empty? emit_addresses(connection, @family, addresses, true) + + addresses end def emit_resolve_error(connection, hostname = connection.origin.host, ex = nil) - emit(:error, connection, resolve_error(hostname, ex)) + emit_connection_error(connection, resolve_error(hostname, ex)) end def resolve_error(hostname, ex = nil) @@ -116,5 +134,25 @@ module HTTPX error.set_backtrace(ex ? ex.backtrace : caller) error end + + def set_resolver_callbacks + on(:resolve, &method(:resolve_connection)) + on(:error, &method(:emit_connection_error)) + on(:close, &method(:close_resolver)) + end + + def resolve_connection(connection) + @current_session.__send__(:on_resolver_connection, connection, @current_selector) + end + + def emit_connection_error(connection, error) + return connection.emit(:connect_error, error) if connection.connecting? && connection.callbacks_for?(:connect_error) + + connection.emit(:error, error) + end + + def close_resolver(resolver) + @current_session.__send__(:on_resolver_close, resolver, @current_selector) + end end end diff --git a/lib/httpx/resolver/system.rb b/lib/httpx/resolver/system.rb index cad856f6..2c2cb617 100644 --- a/lib/httpx/resolver/system.rb +++ b/lib/httpx/resolver/system.rb @@ -47,8 +47,12 @@ module HTTPX yield self end - def connections - EMPTY + def multi + self + end + + def empty? + true end def close @@ -92,6 +96,11 @@ module HTTPX resolve end + def early_resolve(connection) + self << connection + true + end + def handle_socket_timeout(interval) error = HTTPX::ResolveTimeoutError.new(interval, "timed out while waiting on select") error.set_backtrace(caller) @@ -120,23 +129,26 @@ module HTTPX def consume return if @connections.empty? - while @pipe_read.ready? && (event = @pipe_read.getbyte) + if @pipe_read.wait_readable + event = @pipe_read.getbyte + case event when DONE *pair, addrs = @pipe_mutex.synchronize { @ips.pop } @queries.delete(pair) + _, connection = pair + @connections.delete(connection) family, connection = pair catch(:coalesced) { emit_addresses(connection, family, addrs) } when ERROR *pair, error = @pipe_mutex.synchronize { @ips.pop } @queries.delete(pair) + @connections.delete(connection) - family, connection = pair + _, connection = pair emit_resolve_error(connection, connection.origin.host, error) end - - @connections.delete(connection) if @queries.empty? end return emit(:close, self) if @connections.empty? @@ -210,5 +222,11 @@ module HTTPX def __addrinfo_resolve(host, scheme) Addrinfo.getaddrinfo(host, scheme, Socket::AF_UNSPEC, Socket::SOCK_STREAM) end + + def emit_connection_error(_, error) + throw(:resolve_error, error) + end + + def close_resolver(resolver); end end end diff --git a/lib/httpx/selector.rb b/lib/httpx/selector.rb index fe21e6f3..d20932eb 100644 --- a/lib/httpx/selector.rb +++ b/lib/httpx/selector.rb @@ -2,137 +2,206 @@ require "io/wait" -class HTTPX::Selector - READABLE = %i[rw r].freeze - WRITABLE = %i[rw w].freeze +module HTTPX + class Selector + extend Forwardable - private_constant :READABLE - private_constant :WRITABLE + READABLE = %i[rw r].freeze + WRITABLE = %i[rw w].freeze - def initialize - @selectables = [] - end + private_constant :READABLE + private_constant :WRITABLE - # deregisters +io+ from selectables. - def deregister(io) - @selectables.delete(io) - end + def_delegator :@timers, :after - # register +io+. - def register(io) - return if @selectables.include?(io) + def_delegator :@selectables, :empty? - @selectables << io - end + def initialize + @timers = Timers.new + @selectables = [] + end - private + def each(&blk) + @selectables.each(&blk) + end - def select_many(interval, &block) - selectables, r, w = nil + def next_tick + catch(:jump_tick) do + timeout = next_timeout + if timeout && timeout.negative? + @timers.fire + throw(:jump_tick) + end - # first, we group IOs based on interest type. On call to #interests however, - # things might already happen, and new IOs might be registered, so we might - # have to start all over again. We do this until we group all selectables - begin - loop do begin - r = nil - w = nil + select(timeout, &:call) + @timers.fire + rescue TimeoutError => e + @timers.fire(e) + end + end + rescue StandardError => e + emit_error(e) + rescue Exception # rubocop:disable Lint/RescueException + each_connection(&:force_reset) + raise + end - selectables = @selectables - @selectables = [] + def terminate + # array may change during iteration + selectables = @selectables.reject(&:inflight?) - selectables.delete_if do |io| - interests = io.interests + selectables.each(&:terminate) - (r ||= []) << io if READABLE.include?(interests) - (w ||= []) << io if WRITABLE.include?(interests) + until selectables.empty? + next_tick - io.state == :closed - end + selectables &= @selectables + end + end - if @selectables.empty? - @selectables = selectables + def find_resolver(options) + res = @selectables.find do |c| + c.is_a?(Resolver::Resolver) && options == c.options + end - # do not run event loop if there's nothing to wait on. - # this might happen if connect failed and connection was unregistered. - return if (!r || r.empty?) && (!w || w.empty?) && !selectables.empty? + res.multi if res + end - break - else - @selectables.concat(selectables) - end - rescue StandardError - @selectables = selectables if selectables - raise + def each_connection(&block) + return enum_for(__method__) unless block + + @selectables.each do |c| + if c.is_a?(Resolver::Resolver) + c.each_connection(&block) + else + yield c + end + end + end + + def find_connection(request_uri, options) + each_connection.find do |connection| + connection.match?(request_uri, options) + end + end + + def find_mergeable_connection(connection) + each_connection.find do |ch| + ch != connection && ch.mergeable?(connection) + end + end + + def empty? + @selectables.empty? + end + + # deregisters +io+ from selectables. + def deregister(io) + @selectables.delete(io) + end + + # register +io+. + def register(io) + return if @selectables.include?(io) + + @selectables << io + end + + private + + def select(interval, &block) + # do not cause an infinite loop here. + # + # this may happen if timeout calculation actually triggered an error which causes + # the connections to be reaped (such as the total timeout error) before #select + # gets called. + return if interval.nil? && @selectables.empty? + + return select_one(interval, &block) if @selectables.size == 1 + + select_many(interval, &block) + end + + def select_many(interval, &block) + r, w = nil + + # first, we group IOs based on interest type. On call to #interests however, + # things might already happen, and new IOs might be registered, so we might + # have to start all over again. We do this until we group all selectables + begin + @selectables.delete_if do |io| + interests = io.interests + + (r ||= []) << io if READABLE.include?(interests) + (w ||= []) << io if WRITABLE.include?(interests) + + io.state == :closed + end + + # TODO: what to do if there are no selectables? + + readers, writers = IO.select(r, w, nil, interval) + + if readers.nil? && writers.nil? && interval + [*r, *w].each { |io| io.handle_socket_timeout(interval) } + return end end - # TODO: what to do if there are no selectables? + if writers + readers.each do |io| + yield io - readers, writers = IO.select(r, w, nil, interval) + # so that we don't yield 2 times + writers.delete(io) + end if readers - if readers.nil? && writers.nil? && interval - [*r, *w].each { |io| io.handle_socket_timeout(interval) } + writers.each(&block) + else + readers.each(&block) if readers + end + end + + def select_one(interval) + io = @selectables.first + + return unless io + + interests = io.interests + + result = case interests + when :r then io.to_io.wait_readable(interval) + when :w then io.to_io.wait_writable(interval) + when :rw then io.to_io.wait(interval, :read_write) + when nil then return + end + + unless result || interval.nil? + io.handle_socket_timeout(interval) return end - rescue IOError, SystemCallError - @selectables.reject!(&:closed?) - retry + # raise TimeoutError.new(interval, "timed out while waiting on select") + + yield io + # rescue IOError, SystemCallError + # @selectables.reject!(&:closed?) + # raise unless @selectables.empty? end - if writers - readers.each do |io| - yield io + def next_timeout + [ + @timers.wait_interval, + @selectables.filter_map(&:timeout).min, + ].compact.min + end - # so that we don't yield 2 times - writers.delete(io) - end if readers + def emit_error(e) + @selectables.each do |c| + next if c.is_a?(Resolver::Resolver) - writers.each(&block) - else - readers.each(&block) if readers + c.emit(:error, e) + end end end - - def select_one(interval) - io = @selectables.first - - return unless io - - interests = io.interests - - result = case interests - when :r then io.to_io.wait_readable(interval) - when :w then io.to_io.wait_writable(interval) - when :rw then io.to_io.wait(interval, :read_write) - when nil then return - end - - unless result || interval.nil? - io.handle_socket_timeout(interval) - return - end - # raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") - - yield io - rescue IOError, SystemCallError - @selectables.reject!(&:closed?) - raise unless @selectables.empty? - end - - def select(interval, &block) - # do not cause an infinite loop here. - # - # this may happen if timeout calculation actually triggered an error which causes - # the connections to be reaped (such as the total timeout error) before #select - # gets called. - return if interval.nil? && @selectables.empty? - - return select_one(interval, &block) if @selectables.size == 1 - - select_many(interval, &block) - end - - public :select end diff --git a/lib/httpx/session.rb b/lib/httpx/session.rb index 4bb1b08f..7b7ec1e4 100644 --- a/lib/httpx/session.rb +++ b/lib/httpx/session.rb @@ -19,6 +19,7 @@ module HTTPX @options = self.class.default_options.merge(options) @responses = {} @persistent = @options.persistent + @wrapped = false wrap(&blk) if blk end @@ -28,21 +29,49 @@ module HTTPX # http.get("https://wikipedia.com") # end # wikipedia connection closes here def wrap - prev_persistent = @persistent - @persistent = true - pool.wrap do - begin - yield self - ensure - @persistent = prev_persistent - close unless @persistent + prev_wrapped = @wrapped + @wrapped = true + prev_selector = Thread.current[:httpx_selector] + Thread.current[:httpx_selector] = current_selector = Selector.new + begin + yield self + ensure + unless prev_wrapped + if @persistent + deactivate(current_selector) + else + close(current_selector) + end end + @wrapped = prev_wrapped + Thread.current[:httpx_selector] = prev_selector end end - # closes all the active connections from the session - def close(*args) - pool.close(*args) + # closes all the active connections from the session. + # + # when called directly with +selector+ as nil, all available connections + # will be picked up from the connection pool and closed. Connections in use + # by other sessions, or same session in a different thread, will not be reaped. + def close(selector = nil) + if selector.nil? + selector = Selector.new + + while (connection = pool.checkout_by_options(@options)) + connection.current_session = self + connection.current_selector = selector + select_connection(connection, selector) + end + + return close(selector) + end + + begin + @closing = true + selector.terminate + ensure + @closing = false + end end # performs one, or multple requests; it accepts: @@ -89,8 +118,106 @@ module HTTPX request end + def select_connection(connection, selector) + selector.register(connection) + end + + alias_method :select_resolver, :select_connection + + def deselect_connection(connection, selector, cloned = false) + selector.deregister(connection) + + return if cloned + + pool.checkin_connection(connection) + end + + def deselect_resolver(resolver, selector) + selector.deregister(resolver) + + pool.checkin_resolver(resolver) + end + + def try_clone_connection(connection, selector, family) + connection.family ||= family + + return connection if connection.family == family + + new_connection = connection.class.new(connection.origin, connection.options) + + new_connection.family = family + new_connection.current_session = self + new_connection.current_selector = selector + + connection.once(:tcp_open) { new_connection.force_reset(true) } + connection.once(:connect_error) do |err| + if new_connection.connecting? + new_connection.merge(connection) + connection.emit(:cloned, new_connection) + connection.force_reset(true) + else + connection.__send__(:handle_error, err) + end + end + + new_connection.once(:tcp_open) do |new_conn| + if new_conn != connection + new_conn.merge(connection) + connection.force_reset(true) + end + end + new_connection.once(:connect_error) do |err| + if connection.connecting? + # main connection has the requests + connection.merge(new_connection) + new_connection.emit(:cloned, connection) + new_connection.force_reset(true) + else + new_connection.__send__(:handle_error, err) + end + end + + do_init_connection(new_connection, selector) + end + + # returns the HTTPX::Connection through which the +request+ should be sent through. + def find_connection(request_uri, selector, options) + if (connection = selector.find_connection(request_uri, options)) + return connection + end + + connection = pool.checkout_connection(request_uri, options) + + connection.current_session = self + connection.current_selector = selector + + case connection.state + when :idle + do_init_connection(connection, selector) + when :open + select_connection(connection, selector) if options.io + when :closed + connection.idling + select_connection(connection, selector) + when :closing + connection.once(:close) do + connection.idling + select_connection(connection, selector) + end + end + + connection + end + private + def deactivate(selector) + selector.each_connection do |connection| + connection.deactivate + deselect_connection(connection, selector) if connection.state == :inactive + end + end + # returns the HTTPX::Pool object which manages the networking required to # perform requests. def pool @@ -109,26 +236,14 @@ module HTTPX end # returns the corresponding HTTP::Response to the given +request+ if it has been received. - def fetch_response(request, _, _) + def fetch_response(request, _selector, _options) @responses.delete(request) end - # returns the HTTPX::Connection through which the +request+ should be sent through. - def find_connection(request, connections, options) - uri = request.uri - - connection = pool.find_connection(uri, options) || init_connection(uri, options) - unless connections.nil? || connections.include?(connection) - connections << connection - set_connection_callbacks(connection, connections, options) - end - connection - end - # sends the +request+ to the corresponding HTTPX::Connection - def send_request(request, connections, options = request.options) + def send_request(request, selector, options = request.options) error = catch(:resolve_error) do - connection = find_connection(request, connections, options) + connection = find_connection(request.uri, selector, options) connection.send(request) end return unless error.is_a?(Error) @@ -136,61 +251,6 @@ module HTTPX request.emit(:response, ErrorResponse.new(request, error)) end - # sets the callbacks on the +connection+ required to process certain specific - # connection lifecycle events which deal with request rerouting. - def set_connection_callbacks(connection, connections, options, cloned: false) - connection.only(:misdirected) do |misdirected_request| - other_connection = connection.create_idle(ssl: { alpn_protocols: %w[http/1.1] }) - other_connection.merge(connection) - catch(:coalesced) do - pool.init_connection(other_connection, options) - end - set_connection_callbacks(other_connection, connections, options) - connections << other_connection - misdirected_request.transition(:idle) - other_connection.send(misdirected_request) - end - connection.only(:altsvc) do |alt_origin, origin, alt_params| - other_connection = build_altsvc_connection(connection, connections, alt_origin, origin, alt_params, options) - connections << other_connection if other_connection - end - connection.only(:cloned) do |cloned_conn| - set_connection_callbacks(cloned_conn, connections, options, cloned: true) - connections << cloned_conn - end unless cloned - end - - # returns an HTTPX::Connection for the negotiated Alternative Service (or none). - def build_altsvc_connection(existing_connection, connections, alt_origin, origin, alt_params, options) - # do not allow security downgrades on altsvc negotiation - return if existing_connection.origin.scheme == "https" && alt_origin.scheme != "https" - - altsvc = AltSvc.cached_altsvc_set(origin, alt_params.merge("origin" => alt_origin)) - - # altsvc already exists, somehow it wasn't advertised, probably noop - return unless altsvc - - alt_options = options.merge(ssl: options.ssl.merge(hostname: URI(origin).host)) - - connection = pool.find_connection(alt_origin, alt_options) || init_connection(alt_origin, alt_options) - - # advertised altsvc is the same origin being used, ignore - return if connection == existing_connection - - connection.extend(AltSvc::ConnectionMixin) unless connection.is_a?(AltSvc::ConnectionMixin) - - set_connection_callbacks(connection, connections, alt_options) - - log(level: 1) { "#{origin} alt-svc: #{alt_origin}" } - - connection.merge(existing_connection) - existing_connection.terminate - connection - rescue UnsupportedSchemeError - altsvc["noop"] = true - nil - end - # returns a set of HTTPX::Request objects built from the given +args+ and +options+. def build_requests(*args, params) requests = if args.size == 1 @@ -224,12 +284,9 @@ module HTTPX request.on(:promise, &method(:on_promise)) end - def init_connection(uri, options) - connection = options.connection_class.new(uri, options) - catch(:coalesced) do - pool.init_connection(connection, options) - connection - end + def do_init_connection(connection, selector) + resolve_connection(connection, selector) unless connection.family + connection end def deactivate_connection(request, connections, options) @@ -242,64 +299,128 @@ module HTTPX # sends an array of HTTPX::Request +requests+, returns the respective array of HTTPX::Response objects. def send_requests(*requests) - connections = _send_requests(requests) - receive_requests(requests, connections) + selector = Thread.current[:httpx_selector] || Selector.new + _send_requests(requests, selector) + receive_requests(requests, selector) + ensure + unless @wrapped + if @persistent + deactivate(selector) + else + close(selector) + end + end end # sends an array of HTTPX::Request objects - def _send_requests(requests) - connections = [] - + def _send_requests(requests, selector) requests.each do |request| - send_request(request, connections) + send_request(request, selector) end - - connections end # returns the array of HTTPX::Response objects corresponding to the array of HTTPX::Request +requests+. - def receive_requests(requests, connections) + def receive_requests(requests, selector) # @type var responses: Array[response] responses = [] - begin - # guarantee ordered responses - loop do - request = requests.first + # guarantee ordered responses + loop do + request = requests.first - return responses unless request + return responses unless request - catch(:coalesced) { pool.next_tick(connections) } until (response = fetch_response(request, connections, request.options)) - request.emit(:complete, response) + catch(:coalesced) { selector.next_tick } until (response = fetch_response(request, selector, request.options)) + request.emit(:complete, response) + responses << response + requests.shift + + break if requests.empty? + + next unless selector.empty? + + # in some cases, the pool of connections might have been drained because there was some + # handshake error, and the error responses have already been emitted, but there was no + # opportunity to traverse the requests, hence we're returning only a fraction of the errors + # we were supposed to. This effectively fetches the existing responses and return them. + while (request = requests.shift) + response = fetch_response(request, selector, request.options) + request.emit(:complete, response) if response responses << response - requests.shift - - break if requests.empty? - - next unless pool.empty? - - # in some cases, the pool of connections might have been drained because there was some - # handshake error, and the error responses have already been emitted, but there was no - # opportunity to traverse the requests, hence we're returning only a fraction of the errors - # we were supposed to. This effectively fetches the existing responses and return them. - while (request = requests.shift) - response = fetch_response(request, connections, request.options) - request.emit(:complete, response) if response - responses << response - end - break end - responses - ensure - if @persistent - pool.deactivate(*connections) - else - close(connections) + break + end + responses + end + + def resolve_connection(connection, selector) + if connection.addresses || connection.open? + # + # there are two cases in which we want to activate initialization of + # connection immediately: + # + # 1. when the connection already has addresses, i.e. it doesn't need to + # resolve a name (not the same as name being an IP, yet) + # 2. when the connection is initialized with an external already open IO. + # + connection.once(:connect_error, &connection.method(:handle_error)) + on_resolver_connection(connection, selector) + return + end + + resolver = find_resolver_for(connection, selector) + + resolver.early_resolve(connection) || resolver.lazy_resolve(connection) + end + + def on_resolver_connection(connection, selector) + found_connection = selector.find_mergeable_connection(connection) || + pool.checkout_mergeable_connection(connection) + + return select_connection(connection, selector) unless found_connection + + if found_connection.open? + coalesce_connections(found_connection, connection, selector) + else + found_connection.once(:open) do + coalesce_connections(found_connection, connection, selector) end end end + def on_resolver_close(resolver, selector) + return if resolver.closed? + + deselect_resolver(resolver, selector) + resolver.close unless resolver.closed? + end + + def find_resolver_for(connection, selector) + resolver = selector.find_resolver(connection.options) + + unless resolver + resolver = pool.checkout_resolver(connection.options) + resolver.current_session = self + resolver.current_selector = selector + end + + resolver + end + + def coalesce_connections(conn1, conn2, selector) + unless conn1.coalescable?(conn2) + select_connection(conn2, selector) + return false + end + + conn2.emit(:tcp_open, conn1) + conn1.merge(conn2) + conn2.coalesced_connection = conn1 + deselect_connection(conn2, selector) + true + end + @default_options = Options.new @default_options.freeze @plugins = [] diff --git a/sig/connection.rbs b/sig/connection.rbs index ae5ac06e..36fb48bf 100644 --- a/sig/connection.rbs +++ b/sig/connection.rbs @@ -26,8 +26,9 @@ module HTTPX attr_reader pending: Array[Request] attr_reader options: Options attr_reader ssl_session: OpenSSL::SSL::Session? - attr_writer timers: Timers - + attr_writer current_selector: Selector? + attr_writer coalesced_connection: instance? + attr_accessor current_session: Session? attr_accessor family: Integer? @window_size: Integer @@ -42,6 +43,8 @@ module HTTPX @connected_at: Float @response_received_at: Float @intervals: Array[Timers::Interval] + @exhausted: bool + @cloned: bool def addresses: () -> Array[ipaddr]? @@ -57,7 +60,7 @@ module HTTPX def coalescable?: (Connection connection) -> bool - def create_idle: (?Hash[Symbol, untyped] options) -> Connection + def create_idle: (?Hash[Symbol, untyped] options) -> instance def merge: (Connection connection) -> void @@ -77,6 +80,8 @@ module HTTPX def close: () -> void + def force_reset: (?bool cloned) -> void + def reset: () -> void def timeout: () -> Numeric? @@ -99,6 +104,8 @@ module HTTPX def connect: () -> void + def disconnect: () -> void + def exhausted?: () -> boolish def consume: () -> void @@ -117,6 +124,8 @@ module HTTPX def handle_transition: (Symbol nextstate) -> void + def build_altsvc_connection: (URI::Generic alt_origin, String origin, Hash[String, String] alt_params) -> void + def build_socket: (?Array[ipaddr]? addrs) -> (TCP | SSL | UNIX) def on_error: (HTTPX::TimeoutError | Error | StandardError error, ?Request? request) -> void diff --git a/sig/resolver/multi.rbs b/sig/resolver/multi.rbs index 3873cd75..45f1c484 100644 --- a/sig/resolver/multi.rbs +++ b/sig/resolver/multi.rbs @@ -1,7 +1,32 @@ module HTTPX module Resolver class Multi + attr_reader resolvers: Array[Native | HTTPS] + + attr_reader options: Options + + @current_selector: Selector? + @current_session: Session? + @resolver_options: Hash[Symbol, untyped] + # @errors: Hash[Symbol, untyped] + + def current_selector=: (Selector s) -> void + + def current_session=: (Session s) -> void + + def closed?: () -> bool + + def empty?: () -> bool + + def timeout: () -> Numeric? + + def close: () -> void + + def connections: () -> Array[Connection] + def early_resolve: (Connection connection, ?hostname: String) -> void + + def lazy_resolve: (Connection connection) -> void end end end \ No newline at end of file diff --git a/sig/resolver/native.rbs b/sig/resolver/native.rbs index d8013496..3467413a 100644 --- a/sig/resolver/native.rbs +++ b/sig/resolver/native.rbs @@ -7,8 +7,6 @@ module HTTPX DEFAULTS: Hash[Symbol, untyped] DNS_PORT: Integer - attr_reader family: ip_family - @options: Options @ns_index: Integer @nameserver: Array[String] diff --git a/sig/resolver/resolver.rbs b/sig/resolver/resolver.rbs index 2912e26b..60c9595e 100644 --- a/sig/resolver/resolver.rbs +++ b/sig/resolver/resolver.rbs @@ -6,8 +6,17 @@ module HTTPX RECORD_TYPES: Hash[Integer, singleton(Resolv::DNS::Resource)] + attr_reader family: ip_family + + attr_reader options: Options + + attr_writer current_selector: Selector? + + attr_writer current_session: Session? + + attr_accessor multi: Multi? + @record_type: singleton(Resolv::DNS::Resource) - @options: Options @resolver_options: Hash[Symbol, untyped] @queries: Hash[String, Connection] @system_resolver: Resolv::Hosts @@ -20,6 +29,8 @@ module HTTPX def empty?: () -> bool + def each_connection: () { (Connection connection) -> void } -> void + def emit_addresses: (Connection connection, ip_family family, Array[IPAddr], ?bool early_resolve) -> void private @@ -28,7 +39,15 @@ module HTTPX def initialize: (ip_family? family, Options options) -> void - def early_resolve: (Connection connection, ?hostname: String) -> boolish + def early_resolve: (Connection connection, ?hostname: String) -> Array[IPAddr]? + + def set_resolver_callbacks: () -> void + + def resolve_connection: (Connection connection) -> void + + def emit_connection_error: (Connection connection, StandardError error) -> void + + def close_resolver: (Resolver resolver) -> void def emit_resolve_error: (Connection connection, ?String hostname, ?StandardError) -> void diff --git a/sig/selector.rbs b/sig/selector.rbs index a9cc33f4..00009098 100644 --- a/sig/selector.rbs +++ b/sig/selector.rbs @@ -1,22 +1,48 @@ module HTTPX + type selectable = Connection | Resolver::Native + class Selector - type selectable = Connection | Resolver::Native | Resolver::System + include _Each[selectable] READABLE: Array[Symbol] WRITABLE: Array[Symbol] + + @timers: Timers + @selectables: Array[selectable] - def register: (selectable io) -> void - def deregister: (selectable io) -> selectable? + def next_tick: () -> void - def select: (Numeric? interval) { (selectable) -> void } -> void + def terminate: () -> void + + def find_resolver: (Options options) -> Resolver::Resolver? + + def find_connection: (http_uri request_uri, Options options) -> Connection? + + def each_connection: () { (Connection) -> void} -> void + | () -> Enumerable[Connection] + + def find_mergeable_connection: (Connection connection) -> Connection? + + def empty?: () -> bool + + def register: (selectable io) -> void + + def deregister: (selectable io) -> selectable? private - def initialize: () -> untyped + def initialize: () -> void + + def select: (Numeric? interval) { (selectable) -> void } -> void def select_many: (Numeric? interval) { (selectable) -> void } -> void + def select_one: (Numeric? interval) { (selectable) -> void } -> void + + def next_timeout: () -> Numeric? + + def emit_error: (StandardError e) -> void end type io_interests = :r | :w | :rw diff --git a/sig/session.rbs b/sig/session.rbs index 316eb49d..ca85087d 100644 --- a/sig/session.rbs +++ b/sig/session.rbs @@ -7,20 +7,35 @@ module HTTPX @options: Options @responses: Hash[Request, response] - @persistent: bool? + @persistent: bool + @wrapped: bool def self.plugin: (Symbol | Module plugin, ?options? options) ?{ (Class) -> void } -> singleton(Session) def wrap: () { (instance) -> void } -> void - def close: (*untyped) -> void + def close: (?Selector selector) -> void def build_request: (verb verb, generic_uri uri, ?request_params params, ?Options options) -> Request + def select_connection: (Connection connection, Selector selector) -> void + + def deselect_connection: (Connection connection, Selector selector, ?bool cloned) -> void + + def select_resolver: (Resolver::Native | Resolver::HTTPS resolver, Selector selector) -> void + + def deselect_resolver: (Resolver::Resolver resolver, Selector selector) -> void + + def try_clone_connection: (Connection connection, Selector selector, Integer? family) -> Connection + + def find_connection: (http_uri request_uri, Selector selector, Options options) -> Connection + + private + def initialize: (?options) { (self) -> void } -> void | (?options) -> void - private + def deactivate: (Selector selector) -> void def pool: -> Pool @@ -28,33 +43,35 @@ module HTTPX def on_promise: (untyped, untyped) -> void - def fetch_response: (Request request, Array[Connection] connections, untyped options) -> response? + def fetch_response: (Request request, Selector selector, Options options) -> response? - def find_connection: (Request request, Array[Connection] connections, Options options) -> Connection - - def deactivate_connection: (Request request, Array[Connection] connections, Options options) -> void - - def send_request: (Request request, Array[Connection] connections, ?Options options) -> void - - def set_connection_callbacks: (Connection connection, Array[Connection] connections, Options options, ?cloned: bool) -> void + def send_request: (Request request, Selector selector, ?Options options) -> void def set_request_callbacks: (Request request) -> void - def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options) -> (Connection & AltSvc::ConnectionMixin)? - def build_requests: (verb, uri, request_params) -> Array[Request] | (Array[[verb, uri, request_params]], Hash[Symbol, untyped]) -> Array[Request] | (Array[[verb, uri]], request_params) -> Array[Request] | (verb, _Each[[uri, request_params]], Hash[Symbol, untyped]) -> Array[Request] | (verb, _Each[uri], request_params) -> Array[Request] - def init_connection: (http_uri uri, Options options) -> Connection + def do_init_connection: (Connection connection, Selector selector) -> Connection def send_requests: (*Request) -> Array[response] - def _send_requests: (Array[Request] requests) -> Array[Connection] + def _send_requests: (Array[Request] requests, Selector selector) -> void - def receive_requests: (Array[Request] requests, Array[Connection] connections) -> Array[response] + def receive_requests: (Array[Request] requests, Selector selector) -> Array[response] + + def resolve_connection: (Connection connection, Selector selector) -> void + + def on_resolve_connection: (Connection connection, Selector selector) -> void + + def on_resolver_close: (Resolver::Resolver resolver, Selector selector) -> void + + def find_resolver_for: (Connection connection, Selector selector) -> (Resolver::Multi | Resolver::Resolver) + + def coalesce_connections: (Connection conn1, Connection conn2, Selector selector) -> bool attr_reader self.default_options: Options end diff --git a/test/support/minitest_extensions.rb b/test/support/minitest_extensions.rb index 75339b36..7241017a 100644 --- a/test/support/minitest_extensions.rb +++ b/test/support/minitest_extensions.rb @@ -13,7 +13,7 @@ module MinitestExtensions module FirstFailedTestInThread def self.prepended(*) super - HTTPX::Session.include SessionExtensions + HTTPX::Connection.include ConnectionExtensions end def setup @@ -21,11 +21,10 @@ module MinitestExtensions extend(OnTheFly) end - module SessionExtensions - def find_connection(request, connections, _) - connection = super + module ConnectionExtensions + def send(request) request.instance_variable_set(:@connection, connection) - connection + super end end diff --git a/test/support/requests/plugins/persistent.rb b/test/support/requests/plugins/persistent.rb index 331f24ab..b74e7dc7 100644 --- a/test/support/requests/plugins/persistent.rb +++ b/test/support/requests/plugins/persistent.rb @@ -36,41 +36,6 @@ module Requests assert options.persistent end - def test_persistent_with_wrap - return unless origin.start_with?("https") - - uri = build_uri("/get") - session1 = HTTPX.plugin(:persistent) - - begin - pool = session1.send(:pool) - - initial_size = pool.instance_variable_get(:@connections).size - response = session1.get(uri) - verify_status(response, 200) - - connections = pool.instance_variable_get(:@connections) - pool_size = connections.size - assert pool_size == initial_size + 1 - - HTTPX.wrap do |s| - response = s.get(uri) - verify_status(response, 200) - wrapped_connections = pool.instance_variable_get(:@connections) - pool_size = wrapped_connections.size - assert pool_size == 1 - assert (connections - wrapped_connections) == connections - end - - final_connections = pool.instance_variable_get(:@connections) - pool_size = final_connections.size - assert pool_size == initial_size + 1 - assert (connections - final_connections).empty? - ensure - session1.close - end - end - def test_persistent_retry_http2_goaway return unless origin.start_with?("https") diff --git a/test/support/websocket_test_plugin.rb b/test/support/websocket_test_plugin.rb index 09ff4c1a..b418ea9b 100644 --- a/test/support/websocket_test_plugin.rb +++ b/test/support/websocket_test_plugin.rb @@ -130,17 +130,11 @@ module WSTestPlugin end end - module InstanceMethods - def find_connection(request, *) - return super if request.websocket + module ConnectionMethods + def send(request) + request.init_websocket(self) unless request.websocket || @upgrade_protocol - conn = super - - return conn unless conn && !conn.upgrade_protocol - - request.init_websocket(conn) - - conn + super end end