From cb07abc94ea57212d316dae352a82748659c9d82 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Fri, 12 Feb 2021 01:07:48 +0000 Subject: [PATCH 1/9] unused condition in pool --- lib/httpx/pool.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/httpx/pool.rb b/lib/httpx/pool.rb index 95fc23de..e615c02e 100644 --- a/lib/httpx/pool.rb +++ b/lib/httpx/pool.rb @@ -135,7 +135,6 @@ module HTTPX connection.on(:close) do unregister_connection(connection) end - return if connection.state == :open end def unregister_connection(connection) From 981824bf47864f97b079699dc2a632847e11c409 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Fri, 12 Feb 2021 01:11:27 +0000 Subject: [PATCH 2/9] fixing how to load plugins on boot, such as the http_proxy when the system proxy is set --- lib/httpx.rb | 3 ++- lib/httpx/session.rb | 8 +++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/httpx.rb b/lib/httpx.rb index 40879b0e..15d86e8c 100644 --- a/lib/httpx.rb +++ b/lib/httpx.rb @@ -19,7 +19,6 @@ require "httpx/headers" require "httpx/request" require "httpx/response" require "httpx/chainable" -require "httpx/session" # Top-Level Namespace # @@ -59,3 +58,5 @@ module HTTPX extend Chainable end + +require "httpx/session" diff --git a/lib/httpx/session.rb b/lib/httpx/session.rb index 66d88a7b..03b32953 100644 --- a/lib/httpx/session.rb +++ b/lib/httpx/session.rb @@ -269,7 +269,13 @@ module HTTPX end # :nocov: end + end + + unless ENV.grep(/https?_proxy$/i).empty? + proxy_session = plugin(:proxy) + ::HTTPX.send(:remove_const, :Session) + ::HTTPX.send(:const_set, :Session, proxy_session.class) + end - plugin(:proxy) unless ENV.grep(/https?_proxy$/i).empty? end end From bfe77fe92cefca01566c6538b952679dfe608c7a Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Fri, 12 Feb 2021 01:13:23 +0000 Subject: [PATCH 3/9] fixing initializationn of connect timeout, which needs the failed timeout value --- lib/httpx/io/tcp.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/httpx/io/tcp.rb b/lib/httpx/io/tcp.rb index ba2dfaf1..2d915ba9 100644 --- a/lib/httpx/io/tcp.rb +++ b/lib/httpx/io/tcp.rb @@ -69,7 +69,7 @@ module HTTPX @ip_index -= 1 retry rescue Errno::ETIMEDOUT => e - raise ConnectTimeoutError, e.message if @ip_index <= 0 + raise ConnectTimeoutError.new(@options.timeout.connect_timeout, e.message) if @ip_index <= 0 @ip_index -= 1 retry From ad88aa27cfe5c3953b88d736f6dc96ccdeecd743 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Fri, 12 Feb 2021 10:31:03 +0000 Subject: [PATCH 4/9] added more level 3 logging using a custom telemetry plugin, which is internal use only this will therefore not be documented. --- .rubocop.yml | 4 ++ lib/httpx/connection.rb | 2 + lib/httpx/plugins/internal_telemetry.rb | 93 +++++++++++++++++++++++++ lib/httpx/session.rb | 4 ++ 4 files changed, 103 insertions(+) create mode 100644 lib/httpx/plugins/internal_telemetry.rb diff --git a/.rubocop.yml b/.rubocop.yml index 161bf933..ca0779d0 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -78,6 +78,10 @@ Style/TrailingUnderscoreVariable: Style/AccessModifierDeclarations: Enabled: false +Style/GlobalVars: + Exclude: + - lib/httpx/plugins/internal_telemetry.rb + Performance/TimesMap: Enabled: false diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index 2a55e444..68a5db8d 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -268,6 +268,7 @@ module HTTPX # dread loop do siz = @io.read(@window_size, @read_buffer) + log(level: 3, color: :cyan) { "IO READ: #{siz} bytes..." } unless siz ex = EOFError.new("descriptor closed") ex.set_backtrace(caller) @@ -296,6 +297,7 @@ module HTTPX end siz = @io.write(@write_buffer) + log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." } unless siz ex = EOFError.new("descriptor closed") ex.set_backtrace(caller) diff --git a/lib/httpx/plugins/internal_telemetry.rb b/lib/httpx/plugins/internal_telemetry.rb new file mode 100644 index 00000000..bf5b64d6 --- /dev/null +++ b/lib/httpx/plugins/internal_telemetry.rb @@ -0,0 +1,93 @@ +# frozen_string_literal: true + +module HTTPX + module Plugins + # + # The InternalTelemetry plugin is for internal use only. It is therefore undocumented, and + # its use is disencouraged, as API compatiblity will **not be guaranteed**. + # + # The gist of it is: when debug_level of logger is enabled to 3 or greater, considered internal-only + # supported log levels, it'll be loaded by default. + # + # Against a specific point of time, which will be by default the session initialization, but can be set + # by the end user in $http_init_time, different diff metrics can be shown. The "point of time" is calculated + # using the monotonic clock. + module InternalTelemetry + module TrackTimeMethods + private + + def elapsed_time + yield + ensure + meter_elapsed_time("#{self.class.superclass}##{caller_locations(1, 1)[0].label}") + end + + def meter_elapsed_time(label) + $http_init_time ||= Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) + prev_time = $http_init_time + after_time = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) + # $http_init_time = after_time + elapsed = after_time - prev_time + warn(+"\e[31m" << "[ELAPSED TIME]: #{label}: #{elapsed} (ms)" << "\e[0m") + end + end + + module InstanceMethods + def self.included(klass) + klass.prepend TrackTimeMethods + super + end + + def initialize(*) + meter_elapsed_time("Session: initializing...") + super + meter_elapsed_time("Session: initialized!!!") + end + + private + + def build_requests(*) + elapsed_time { super } + end + + def fetch_response(*) + response = super + meter_elapsed_time("Session -> response") if response + response + end + + def close(*) + super + meter_elapsed_time("Session -> close") + end + end + + module RequestMethods + def self.included(klass) + klass.prepend TrackTimeMethods + super + end + + def transition(nextstate) + state = @state + super + meter_elapsed_time("Request[#{@verb} #{@uri}: #{state}] -> #{nextstate}") if nextstate == @state + end + end + + module ConnectionMethods + def self.included(klass) + klass.prepend TrackTimeMethods + super + end + + def transition(nextstate) + state = @state + super + meter_elapsed_time("Connection[#{@origin}]: #{state} -> #{nextstate}") if nextstate == @state + end + end + end + register_plugin :internal_telemetry, InternalTelemetry + end +end diff --git a/lib/httpx/session.rb b/lib/httpx/session.rb index 03b32953..46530b22 100644 --- a/lib/httpx/session.rb +++ b/lib/httpx/session.rb @@ -277,5 +277,9 @@ module HTTPX ::HTTPX.send(:const_set, :Session, proxy_session.class) end + if Session.default_options.debug_level > 2 + proxy_session = plugin(:internal_telemetry) + ::HTTPX.send(:remove_const, :Session) + ::HTTPX.send(:const_set, :Session, proxy_session.class) end end From 4bab923f2564304740d104f50be18749ad745d8a Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Fri, 12 Feb 2021 10:33:16 +0000 Subject: [PATCH 5/9] experimental optimizations in the connection call loop. skip entering the read/write hot loops when the connection interests specifically say so. this led to a overhaul of how interests are calculated. --- lib/httpx/connection.rb | 58 ++++++++++++++++++++++++------- lib/httpx/connection/http2.rb | 12 ++++--- lib/httpx/plugins/proxy/socks4.rb | 8 +++++ lib/httpx/plugins/proxy/socks5.rb | 8 +++++ sig/connection/http2.rbs | 2 ++ 5 files changed, 71 insertions(+), 17 deletions(-) diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index 68a5db8d..2279b57e 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -170,7 +170,7 @@ module HTTPX end # if the write buffer is full, we drain it - return :w if @write_buffer.full? + return :w unless @write_buffer.empty? return @parser.interests if @parser @@ -254,8 +254,14 @@ module HTTPX loop do parser.consume - # we exit if there's no more data to process - if @pending.size.zero? && @inflight.zero? + # we exit if there's no more requests to process + # + # this condition takes into account: + # + # * the number of inflight requests + # * the number of pending requests + # * whether the write buffer has bytes (i.e. for close handshake) + if @pending.size.zero? && @inflight.zero? && @write_buffer.empty? log(level: 3) { "NO MORE REQUESTS..." } return end @@ -265,7 +271,14 @@ module HTTPX read_drained = false write_drained = nil - # dread + # + # tight read loop. + # + # read as much of the socket as possible. + # + # this tight loop reads all the data it can from the socket and pipes it to + # its parser. + # loop do siz = @io.read(@window_size, @read_buffer) log(level: 3, color: :cyan) { "IO READ: #{siz} bytes..." } @@ -276,6 +289,7 @@ module HTTPX return end + # socket has been drained. mark and exit the read loop. if siz.zero? read_drained = @read_buffer.empty? break @@ -283,16 +297,27 @@ module HTTPX parser << @read_buffer.to_s + # continue reading if possible. + break if interests == :w + + # exit the read loop if connection is preparing to be closed break if @state == :closing || @state == :closed - # for HTTP/2, we just want to write goaway frame - end unless @state == :closing + # exit #consume altogether if all outstanding requests have been dealt with + return if @pending.size.zero? && @inflight.zero? + end unless (interests == :w || @state == :closing) && !epiped - # dwrite + # + # tight write loop. + # + # flush as many bytes as the sockets allow. + # loop do + # buffer has been drainned, mark and exit the write loop. if @write_buffer.empty? # we only mark as drained on the first loop write_drained = write_drained.nil? && @inflight.positive? + break end @@ -305,21 +330,28 @@ module HTTPX return end + # socket closed for writing. mark and exit the write loop. if siz.zero? write_drained = !@write_buffer.empty? break end - break if @state == :closing || @state == :closed + # exit write loop if marked to consume from peer, or is closing. + break if interests == :r || @state == :closing || @state == :closed write_drained = false - end + end unless interests == :r # return if socket is drained - if read_drained && write_drained - log(level: 3) { "WAITING FOR EVENTS..." } - return - end + next unless (interests != :r || read_drained) && + (interests != :w || write_drained) + + # gotta go back to the event loop. It happens when: + # + # * the socket is drained of bytes or it's not the interest of the conn to read; + # * theres nothing more to write, or it's not in the interest of the conn to write; + log(level: 3) { "(#{interests}): WAITING FOR EVENTS..." } + return end end end diff --git a/lib/httpx/connection/http2.rb b/lib/httpx/connection/http2.rb index 72e4c377..3d3dcff2 100644 --- a/lib/httpx/connection/http2.rb +++ b/lib/httpx/connection/http2.rb @@ -42,7 +42,7 @@ module HTTPX return @buffer.empty? ? :r : :rw end - return :w unless @pending.empty? + return :w if !@pending.empty? && can_buffer_more_requests? return :w if @streams.each_key.any? { |r| r.interests == :w } @@ -70,10 +70,14 @@ module HTTPX @connection << data end + def can_buffer_more_requests? + @handshake_completed && + @streams.size < @max_concurrent_requests && + @streams.size < @max_requests + end + def send(request) - if !@handshake_completed || - @streams.size >= @max_concurrent_requests || - @streams.size >= @max_requests + unless can_buffer_more_requests? @pending << request return end diff --git a/lib/httpx/plugins/proxy/socks4.rb b/lib/httpx/plugins/proxy/socks4.rb index 06596613..379d6211 100644 --- a/lib/httpx/plugins/proxy/socks4.rb +++ b/lib/httpx/plugins/proxy/socks4.rb @@ -16,6 +16,14 @@ module HTTPX Error = Socks4Error module ConnectionMethods + def interests + if @state == :connecting + return @write_buffer.empty? ? :r : :w + end + + super + end + private def transition(nextstate) diff --git a/lib/httpx/plugins/proxy/socks5.rb b/lib/httpx/plugins/proxy/socks5.rb index 6cae8102..0f3ec833 100644 --- a/lib/httpx/plugins/proxy/socks5.rb +++ b/lib/httpx/plugins/proxy/socks5.rb @@ -35,6 +35,14 @@ module HTTPX super || @state == :authenticating || @state == :negotiating end + def interests + if @state == :connecting || @state == :authenticating || @state == :negotiating + return @write_buffer.empty? ? :r : :w + end + + super + end + private def transition(nextstate) diff --git a/sig/connection/http2.rbs b/sig/connection/http2.rbs index 50a794d9..18c71038 100644 --- a/sig/connection/http2.rbs +++ b/sig/connection/http2.rbs @@ -23,6 +23,8 @@ module HTTPX def <<: (String) -> void + def can_buffer_more_requests: () -> bool + def send: (Request) -> void def consume: () -> void From 7082f63e4e5ccda05657e26c57ef6b65473d98d2 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Sun, 14 Feb 2021 16:45:51 +0000 Subject: [PATCH 6/9] more explicit error message in multipart test --- lib/httpx/response.rb | 10 ++++++++-- test/support/requests/plugins/multipart.rb | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/lib/httpx/response.rb b/lib/httpx/response.rb index 12042830..7ee2c9a4 100644 --- a/lib/httpx/response.rb +++ b/lib/httpx/response.rb @@ -268,8 +268,14 @@ module HTTPX @error.message end - def to_s - @error.backtrace.join("\n") + if Exception.method_defined?(:full_message) + def to_s + @error.full_message + end + else + def to_s + "#{@error.message} (#{@error.class})\n#{@error.backtrace.join("\n")}" + end end def raise_for_status diff --git a/test/support/requests/plugins/multipart.rb b/test/support/requests/plugins/multipart.rb index e1845162..c9684c82 100644 --- a/test/support/requests/plugins/multipart.rb +++ b/test/support/requests/plugins/multipart.rb @@ -174,7 +174,7 @@ module Requests .with_timeout(total_timeout: 2) .plugin(:multipart) retries_response = retries_session.post(uri, retry_change_requests: true, form: { image: File.new(fixture_file_path) }) - assert check_error[retries_response] + assert check_error[retries_response], "expected #{retries_response} to be an error response" assert retries_session.calls == 1, "expect request to be retried 1 time (was #{retries_session.calls})" end From 19dc5280305e24e9daf3fe67e4cbee57921a39ca Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Mon, 15 Feb 2021 10:20:20 +0000 Subject: [PATCH 7/9] fixed broken pipe issue on quick error response from server in some cases where the client is sending a request with a lot of bytes (i.e. file uploads), and the server can't consume it (because authorization, or wrong endpoint), the server stops processing the request altogether and sends an error response immediately, in which case the client should pivot and read the error response. Not doing this was causing the Errno::EPIPE error. The mitigation is therefore to rescue the error, and mark the consumption loop to read the response immediately. --- lib/httpx/connection.rb | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index 2279b57e..2b40cfef 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -251,6 +251,7 @@ module HTTPX def consume catch(:called) do + epiped = false loop do parser.consume @@ -321,7 +322,19 @@ module HTTPX break end - siz = @io.write(@write_buffer) + begin + siz = @io.write(@write_buffer) + rescue Errno::EPIPE + # this can happen if we still have bytes in the buffer to send to the server, but + # the server wants to respond immediately with some message, or an error. An example is + # when one's uploading a big file to an unintended endpoint, and the server stops the + # consumption, and responds immediately with an authorization of even method not allowed error. + # at this point, we have to let the connection switch to read-mode. + log(level: 2) { "pipe broken, could not flush buffer..." } + epiped = true + read_drained = false + break + end log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." } unless siz ex = EOFError.new("descriptor closed") From d3cc6d2d962de13a021c6a93c971fe48d6e039b1 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Tue, 16 Feb 2021 14:39:38 +0000 Subject: [PATCH 8/9] making sure that every response is returned although a connection might correctly emit an error response, the returned responses are still defined by the fetch_response loop in the session. When the pool is actually empty, this had the side-effect of leaving error responses behind and exiting with just the first one. This fixes it popping all available responses in such cases. --- lib/httpx/session.rb | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lib/httpx/session.rb b/lib/httpx/session.rb index 46530b22..90c1ec21 100644 --- a/lib/httpx/session.rb +++ b/lib/httpx/session.rb @@ -199,7 +199,18 @@ module HTTPX responses << response requests.shift - break if requests.empty? || pool.empty? + break if requests.empty? + + next unless pool.empty? + + # in some cases, the pool of connections might have been drained because there was some + # handshake error, and the error responses have already been emitted, but there was no + # opportunity to traverse the requests, hence we're returning only a fraction of the errors + # we were supposed to. This effectively fetches the existing responses and return them. + while (request = requests.shift) + responses << fetch_response(request, connections, request_options) + end + break end responses ensure From d4fe89094e4bf2a904a579b4412a67a65fde6a7a Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Tue, 16 Feb 2021 14:42:42 +0000 Subject: [PATCH 9/9] fixing TCP connection inprogress on SSL connection bug The Errno::INPROGRESS error signals that the TCP handshake has been signaled to the peer already, by which locally we just have wait for it to be writable. For simple plaintext requests, this was working correctly, because the interest was always writable no matter what. However, when wrapped in the SSL conn, and with the OS tcp stack under more stress, the interest could be switched to readable, and by reuse, never reset; if, by subsequent reconnection, EINPROGRESS would be emitted, the socket would wait for readable instead, resulting in a loop and subsequent connectionnn timeout. --- lib/httpx/io/ssl.rb | 5 +---- lib/httpx/io/tcp.rb | 15 ++++++++------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/lib/httpx/io/ssl.rb b/lib/httpx/io/ssl.rb index 8b64f287..fa898496 100644 --- a/lib/httpx/io/ssl.rb +++ b/lib/httpx/io/ssl.rb @@ -21,10 +21,6 @@ module HTTPX @state = :negotiated if @keep_open end - def interests - @interests || super - end - def protocol @io.alpn_protocol || super rescue StandardError @@ -66,6 +62,7 @@ module HTTPX @io.connect_nonblock @io.post_connection_check(@sni_hostname) if @ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE transition(:negotiated) + @interests = :w rescue ::IO::WaitReadable @interests = :r rescue ::IO::WaitWritable diff --git a/lib/httpx/io/tcp.rb b/lib/httpx/io/tcp.rb index 2d915ba9..a43495dd 100644 --- a/lib/httpx/io/tcp.rb +++ b/lib/httpx/io/tcp.rb @@ -7,7 +7,7 @@ module HTTPX class TCP include Loggable - attr_reader :ip, :port, :addresses, :state + attr_reader :ip, :port, :addresses, :state, :interests alias_method :host, :ip @@ -18,6 +18,7 @@ module HTTPX @options = Options.new(options) @fallback_protocol = @options.fallback_protocol @port = origin.port + @interests = :w if @options.io @io = case @options.io when Hash @@ -39,10 +40,6 @@ module HTTPX @io ||= build_socket end - def interests - :w - end - def to_io @io.to_io end @@ -62,6 +59,8 @@ module HTTPX @io.connect_nonblock(Socket.sockaddr_in(@port, @ip.to_s)) rescue Errno::EISCONN end + @interests = :w + transition(:connected) rescue Errno::EHOSTUNREACH => e raise e if @ip_index <= 0 @@ -74,8 +73,10 @@ module HTTPX @ip_index -= 1 retry rescue Errno::EINPROGRESS, - Errno::EALREADY, - ::IO::WaitReadable + Errno::EALREADY + @interests = :w + rescue ::IO::WaitReadable + @interests = :r end if RUBY_VERSION < "2.3"