Merge branch 'issue-90' into 'master'

syscall improvements

Closes #90

See merge request honeyryderchuck/httpx!120
HoneyryderChuck 2021-02-16 20:26:32 +00:00
commit 3b2d822af9
14 changed files with 228 additions and 37 deletions

View File

@ -78,6 +78,10 @@ Style/TrailingUnderscoreVariable:
Style/AccessModifierDeclarations:
Enabled: false
Style/GlobalVars:
Exclude:
- lib/httpx/plugins/internal_telemetry.rb
Performance/TimesMap:
Enabled: false

View File

@ -19,7 +19,6 @@ require "httpx/headers"
require "httpx/request"
require "httpx/response"
require "httpx/chainable"
require "httpx/session"
# Top-Level Namespace
#
@ -59,3 +58,5 @@ module HTTPX
extend Chainable
end
require "httpx/session"

View File

@ -170,7 +170,7 @@ module HTTPX
end
# if the write buffer is full, we drain it
return :w if @write_buffer.full?
return :w unless @write_buffer.empty?
return @parser.interests if @parser
@ -251,11 +251,18 @@ module HTTPX
def consume
catch(:called) do
epiped = false
loop do
parser.consume
# we exit if there's no more data to process
if @pending.size.zero? && @inflight.zero?
# we exit if there are no more requests to process
#
# this condition takes into account:
#
# * the number of inflight requests
# * the number of pending requests
# * whether the write buffer still has bytes to flush (e.g. for the close handshake)
if @pending.size.zero? && @inflight.zero? && @write_buffer.empty?
log(level: 3) { "NO MORE REQUESTS..." }
return
end
@ -265,9 +272,17 @@ module HTTPX
read_drained = false
write_drained = nil
# dread
#
# tight read loop.
#
# read as much of the socket as possible.
#
# this tight loop reads all the data it can from the socket and pipes it to
# its parser.
#
loop do
siz = @io.read(@window_size, @read_buffer)
log(level: 3, color: :cyan) { "IO READ: #{siz} bytes..." }
unless siz
ex = EOFError.new("descriptor closed")
ex.set_backtrace(caller)
@ -275,6 +290,7 @@ module HTTPX
return
end
# socket has been drained. mark and exit the read loop.
if siz.zero?
read_drained = @read_buffer.empty?
break
@ -282,20 +298,44 @@ module HTTPX
parser << @read_buffer.to_s
# continue reading if possible.
break if interests == :w
# exit the read loop if the connection is preparing to close
break if @state == :closing || @state == :closed
# for HTTP/2, we just want to write the GOAWAY frame
end unless @state == :closing
# exit #consume altogether if all outstanding requests have been dealt with
return if @pending.size.zero? && @inflight.zero?
end unless (interests == :w || @state == :closing) && !epiped
# dwrite
#
# tight write loop.
#
# flush as many bytes as the sockets allow.
#
loop do
# the buffer has been drained; mark and exit the write loop.
if @write_buffer.empty?
# we only mark as drained on the first loop
write_drained = write_drained.nil? && @inflight.positive?
break
end
siz = @io.write(@write_buffer)
begin
siz = @io.write(@write_buffer)
rescue Errno::EPIPE
# this can happen if we still have bytes in the buffer to send to the server, but
# the server wants to respond immediately with some message, or an error. An example is
# when one is uploading a big file to an unintended endpoint, and the server stops
# consuming and responds immediately with an authorization or even a "method not allowed" error.
# at this point, we have to let the connection switch to read-mode.
log(level: 2) { "pipe broken, could not flush buffer..." }
epiped = true
read_drained = false
break
end
log(level: 3, color: :cyan) { "IO WRITE: #{siz} bytes..." }
unless siz
ex = EOFError.new("descriptor closed")
ex.set_backtrace(caller)
@ -303,21 +343,28 @@ module HTTPX
return
end
# socket closed for writing. mark and exit the write loop.
if siz.zero?
write_drained = !@write_buffer.empty?
break
end
break if @state == :closing || @state == :closed
# exit the write loop if marked to consume from the peer, or if the connection is closing.
break if interests == :r || @state == :closing || @state == :closed
write_drained = false
end
end unless interests == :r
# return if socket is drained
if read_drained && write_drained
log(level: 3) { "WAITING FOR EVENTS..." }
return
end
next unless (interests != :r || read_drained) &&
(interests != :w || write_drained)
# gotta go back to the event loop. It happens when:
#
# * the socket is drained of bytes, or it's not in the conn's interest to read;
# * there's nothing more to write, or it's not in the conn's interest to write;
log(level: 3) { "(#{interests}): WAITING FOR EVENTS..." }
return
end
end
end
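As an aside, here is a minimal sketch (plain Ruby sockets, assumed buffer and method names, not the httpx internals) of the write-then-read interplay the hunk above implements: a broken pipe while flushing the write buffer flips the connection into read mode, so the server's early response can still be consumed.

    require "socket"

    # Sketch: flush pending bytes, falling back to read mode on EPIPE.
    def flush_then_read(sock, write_buffer, read_buffer = +"")
      epiped = false
      begin
        until write_buffer.empty?
          written = sock.write_nonblock(write_buffer, exception: false)
          break if written == :wait_writable
          write_buffer.slice!(0, written)
        end
      rescue Errno::EPIPE
        # the peer closed its read end early (e.g. it rejected a large upload);
        # stop writing and drain whatever it sent back instead
        epiped = true
      end

      loop do
        chunk = sock.read_nonblock(16_384, exception: false)
        break if chunk == :wait_readable || chunk.nil?
        read_buffer << chunk
      end

      [read_buffer, epiped]
    end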

View File

@ -42,7 +42,7 @@ module HTTPX
return @buffer.empty? ? :r : :rw
end
return :w unless @pending.empty?
return :w if !@pending.empty? && can_buffer_more_requests?
return :w if @streams.each_key.any? { |r| r.interests == :w }
@ -70,10 +70,14 @@ module HTTPX
@connection << data
end
def can_buffer_more_requests?
@handshake_completed &&
@streams.size < @max_concurrent_requests &&
@streams.size < @max_requests
end
def send(request)
if !@handshake_completed ||
@streams.size >= @max_concurrent_requests ||
@streams.size >= @max_requests
unless can_buffer_more_requests?
@pending << request
return
end

View File

@ -21,10 +21,6 @@ module HTTPX
@state = :negotiated if @keep_open
end
def interests
@interests || super
end
def protocol
@io.alpn_protocol || super
rescue StandardError
@ -66,6 +62,7 @@ module HTTPX
@io.connect_nonblock
@io.post_connection_check(@sni_hostname) if @ctx.verify_mode != OpenSSL::SSL::VERIFY_NONE
transition(:negotiated)
@interests = :w
rescue ::IO::WaitReadable
@interests = :r
rescue ::IO::WaitWritable
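For context, a minimal sketch (bare OpenSSL socket API, assumed setup, not the httpx code) of the pattern the SSL hunk above relies on: the exception raised by a non-blocking TLS handshake attempt tells the event loop whether to wait for readability or writability next.

    require "openssl"
    require "socket"

    # Sketch: drive one step of a non-blocking TLS handshake, returning the next interest.
    def tls_handshake_step(ssl_socket)
      ssl_socket.connect_nonblock
      :w                # handshake finished; ready to write the request
    rescue IO::WaitReadable
      :r                # need more handshake bytes from the server
    rescue IO::WaitWritable
      :w                # our handshake bytes haven't been flushed yet
    end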

View File

@ -7,7 +7,7 @@ module HTTPX
class TCP
include Loggable
attr_reader :ip, :port, :addresses, :state
attr_reader :ip, :port, :addresses, :state, :interests
alias_method :host, :ip
@ -18,6 +18,7 @@ module HTTPX
@options = Options.new(options)
@fallback_protocol = @options.fallback_protocol
@port = origin.port
@interests = :w
if @options.io
@io = case @options.io
when Hash
@ -39,10 +40,6 @@ module HTTPX
@io ||= build_socket
end
def interests
:w
end
def to_io
@io.to_io
end
@ -62,6 +59,8 @@ module HTTPX
@io.connect_nonblock(Socket.sockaddr_in(@port, @ip.to_s))
rescue Errno::EISCONN
end
@interests = :w
transition(:connected)
rescue Errno::EHOSTUNREACH => e
raise e if @ip_index <= 0
@ -69,13 +68,15 @@ module HTTPX
@ip_index -= 1
retry
rescue Errno::ETIMEDOUT => e
raise ConnectTimeoutError, e.message if @ip_index <= 0
raise ConnectTimeoutError.new(@options.timeout.connect_timeout, e.message) if @ip_index <= 0
@ip_index -= 1
retry
rescue Errno::EINPROGRESS,
Errno::EALREADY,
::IO::WaitReadable
Errno::EALREADY
@interests = :w
rescue ::IO::WaitReadable
@interests = :r
end
if RUBY_VERSION < "2.3"
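For reference, a minimal sketch (bare Socket API, assumed names, not the httpx code) of the mapping the TCP hunk above encodes: the outcome of a non-blocking connect attempt, success or one of the expected exceptions, determines which readiness event the connection registers for next.

    require "socket"

    # Sketch: attempt a non-blocking connect and report the next interest.
    def connect_step(sock, sockaddr)
      begin
        sock.connect_nonblock(sockaddr)
      rescue Errno::EISCONN
        # already connected (a later call, after the handshake finished)
      end
      :w                 # connected: ready to write the request
    rescue Errno::EINPROGRESS, Errno::EALREADY
      :w                 # handshake in flight: wait until the socket is writable
    rescue IO::WaitReadable
      :r                 # some setups need to read before proceeding
    end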

View File

@ -0,0 +1,93 @@
# frozen_string_literal: true
module HTTPX
module Plugins
#
# The InternalTelemetry plugin is for internal use only. It is therefore undocumented, and
# its use is discouraged, as API compatibility will **not be guaranteed**.
#
# The gist of it: when the logger's debug_level is set to 3 or greater (levels considered
# internal-only), the plugin is loaded by default.
#
# Diff metrics are shown against a reference point in time, which is by default the session
# initialization, but can be overridden by the end user via $http_init_time. The reference
# point is measured with the monotonic clock.
module InternalTelemetry
module TrackTimeMethods
private
def elapsed_time
yield
ensure
meter_elapsed_time("#{self.class.superclass}##{caller_locations(1, 1)[0].label}")
end
def meter_elapsed_time(label)
$http_init_time ||= Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
prev_time = $http_init_time
after_time = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
# $http_init_time = after_time
elapsed = after_time - prev_time
warn(+"\e[31m" << "[ELAPSED TIME]: #{label}: #{elapsed} (ms)" << "\e[0m")
end
end
module InstanceMethods
def self.included(klass)
klass.prepend TrackTimeMethods
super
end
def initialize(*)
meter_elapsed_time("Session: initializing...")
super
meter_elapsed_time("Session: initialized!!!")
end
private
def build_requests(*)
elapsed_time { super }
end
def fetch_response(*)
response = super
meter_elapsed_time("Session -> response") if response
response
end
def close(*)
super
meter_elapsed_time("Session -> close")
end
end
module RequestMethods
def self.included(klass)
klass.prepend TrackTimeMethods
super
end
def transition(nextstate)
state = @state
super
meter_elapsed_time("Request[#{@verb} #{@uri}: #{state}] -> #{nextstate}") if nextstate == @state
end
end
module ConnectionMethods
def self.included(klass)
klass.prepend TrackTimeMethods
super
end
def transition(nextstate)
state = @state
super
meter_elapsed_time("Connection[#{@origin}]: #{state} -> #{nextstate}") if nextstate == @state
end
end
end
register_plugin :internal_telemetry, InternalTelemetry
end
end
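A usage sketch (assumption: the default debug level is derived from the HTTPX_DEBUG environment variable when the library is required, and log output goes to standard error). With a level above 2, the session.rb hunk further down swaps HTTPX::Session for the telemetry-enabled class, so elapsed-time diffs are printed for every request.

    # Run with: HTTPX_DEBUG=3 ruby this_script.rb   (env-driven default is an assumption)
    require "httpx"

    HTTPX.get("https://example.com")
    # stderr then shows red "[ELAPSED TIME]: <label>: <n> (ms)" lines covering session
    # initialization, request/connection state transitions and response fetching, all
    # measured against the monotonic-clock reference stored in $http_init_time.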

View File

@ -16,6 +16,14 @@ module HTTPX
Error = Socks4Error
module ConnectionMethods
def interests
if @state == :connecting
return @write_buffer.empty? ? :r : :w
end
super
end
private
def transition(nextstate)

View File

@ -35,6 +35,14 @@ module HTTPX
super || @state == :authenticating || @state == :negotiating
end
def interests
if @state == :connecting || @state == :authenticating || @state == :negotiating
return @write_buffer.empty? ? :r : :w
end
super
end
private
def transition(nextstate)

View File

@ -135,7 +135,6 @@ module HTTPX
connection.on(:close) do
unregister_connection(connection)
end
return if connection.state == :open
end
def unregister_connection(connection)

View File

@ -268,8 +268,14 @@ module HTTPX
@error.message
end
def to_s
@error.backtrace.join("\n")
if Exception.method_defined?(:full_message)
def to_s
@error.full_message
end
else
def to_s
"#{@error.message} (#{@error.class})\n#{@error.backtrace.join("\n")}"
end
end
def raise_for_status
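As a side note, a quick illustration (throwaway error, not httpx code) of what the conditional definition above buys: Exception#full_message, available from Ruby 2.5 onwards, already bundles the message, class and backtrace, while the fallback branch reconstructs a similar layout by hand for older rubies.

    begin
      raise ArgumentError, "boom"
    rescue => error
      if Exception.method_defined?(:full_message)
        puts error.full_message                  # message, class and backtrace in one call
      else
        puts "#{error.message} (#{error.class})\n#{error.backtrace.join("\n")}"
      end
    end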

View File

@ -199,7 +199,18 @@ module HTTPX
responses << response
requests.shift
break if requests.empty? || pool.empty?
break if requests.empty?
next unless pool.empty?
# in some cases, the pool of connections might have been drained because there was some
# handshake error, and the error responses have already been emitted, but there was no
# opportunity to traverse the requests, hence we'd be returning only a fraction of the errors
# we were supposed to. This loop effectively fetches the existing responses and returns them.
while (request = requests.shift)
responses << fetch_response(request, connections, request_options)
end
break
end
responses
ensure
@ -269,7 +280,17 @@ module HTTPX
end
# :nocov:
end
end
plugin(:proxy) unless ENV.grep(/https?_proxy$/i).empty?
unless ENV.grep(/https?_proxy$/i).empty?
proxy_session = plugin(:proxy)
::HTTPX.send(:remove_const, :Session)
::HTTPX.send(:const_set, :Session, proxy_session.class)
end
if Session.default_options.debug_level > 2
proxy_session = plugin(:internal_telemetry)
::HTTPX.send(:remove_const, :Session)
::HTTPX.send(:const_set, :Session, proxy_session.class)
end
end
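Both branches above follow the same load-time pattern, sketched here with a hypothetical helper (the helper name is not part of httpx): plugin(...) returns an enhanced session, and the top-level HTTPX::Session constant is replaced with its class, so every subsequent HTTPX call picks the plugin up by default.

    module HTTPX
      # hypothetical helper illustrating the swap; not part of the library
      def self.install_default_plugin(name)
        plugged = plugin(name)                     # enhanced session
        send(:remove_const, :Session)
        send(:const_set, :Session, plugged.class)  # future sessions inherit the plugin
      end
    end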

View File

@ -23,6 +23,8 @@ module HTTPX
def <<: (String) -> void
def can_buffer_more_requests: () -> bool
def send: (Request) -> void
def consume: () -> void

View File

@ -174,7 +174,7 @@ module Requests
.with_timeout(total_timeout: 2)
.plugin(:multipart)
retries_response = retries_session.post(uri, retry_change_requests: true, form: { image: File.new(fixture_file_path) })
assert check_error[retries_response]
assert check_error[retries_response], "expected #{retries_response} to be an error response"
assert retries_session.calls == 1, "expect request to be retried 1 time (was #{retries_session.calls})"
end