added retryability, in that certain errors (for http2 now, rst_stream ones); added a max concurrent requests setting, which governs how many concurrent requests might be inflight (besides the obvious settings handled by the lower layer)

This commit is contained in:
HoneyryderChuck 2017-12-06 14:27:58 +00:00
parent abff891a14
commit 7cc2f66eed
7 changed files with 69 additions and 26 deletions

View File

@ -78,11 +78,11 @@ module HTTPX
@write_buffer.empty?
end
def send(request)
def send(request, **args)
if @processor
@processor.send(request)
@processor.send(request, **args)
else
@pending << request
@pending << [request, args]
end
end
@ -115,11 +115,11 @@ module HTTPX
def set_processor
return @processor if defined?(@processor)
@processor = PROTOCOLS[@io.protocol].new(@write_buffer)
@processor = PROTOCOLS[@io.protocol].new(@write_buffer, @options)
@processor.on(:response, &@on_response)
@processor.on(:close) { throw(:close, self) }
while request = @pending.shift
@processor.send(request)
while (request, args = @pending.shift)
@processor.send(request, **args)
end
@processor
end

View File

@ -1,4 +1,5 @@
# frozen_string_literal: true
require "http_parser"
module HTTPX
@ -7,11 +8,14 @@ module HTTPX
CRLF = "\r\n"
def initialize(buffer, version: [1,1], **)
def initialize(buffer, options)
@options = Options.new(options)
@max_concurrent_requests = @options.max_concurrent_requests
@parser = HTTP::Parser.new(self)
@parser.header_value_type = :arrays
@buffer = buffer
@version = version
@version = [1,1]
@pending = []
@requests = []
@responses = []
end
@ -29,7 +33,11 @@ module HTTPX
@parser << data
end
def send(request)
def send(request, **)
if @requests.size >= @max_concurrent_requests
@pending << request
return
end
@requests << request
join_headers(request)
join_body(request)
@ -66,6 +74,8 @@ module HTTPX
emit(:response, request, response)
reset
emit(:close) if response.headers["connection"] == "close"
send(@pending.shift) unless @pending.empty?
end
private

View File

@ -5,8 +5,11 @@ module HTTPX
class Channel::HTTP2
include Callbacks
def initialize(buffer)
def initialize(buffer, options)
@options = Options.new(options)
@max_concurrent_requests = @options.max_concurrent_requests
init_connection
@retries = options.max_retries
@pending = []
@streams = {}
@buffer = buffer
@ -24,15 +27,15 @@ module HTTPX
@connection << data
end
def send(request)
# if @connection.active_stream_count >= @connection.remote_settings[:settings_max_concurrent_streams]
# @pending << request
# return
# end
def send(request, retries: @retries, **)
if @connection.active_stream_count >= @max_concurrent_requests
@pending << request
return
end
stream = @connection.new_stream
stream.on(:close) do |error|
response = @streams.delete(stream.id) ||
ErrorResponse.new(error)
ErrorResponse.new(error, retries)
emit(:response, request, response)
send(@pending.shift) unless @pending.empty?
@ -68,6 +71,7 @@ module HTTPX
@connection.on(:frame_received, &method(:on_frame_received))
@connection.on(:promise, &method(:on_promise))
@connection.on(:altsvc, &method(:on_altsvc))
@connection.on(:settings_ack, &method(:on_settings))
end
def join_headers(stream, request)
@ -96,6 +100,10 @@ module HTTPX
@buffer << bytes
end
def on_settings(*)
@max_concurrent_requests = [@max_concurrent_requests, @connection.remote_settings[:settings_max_concurrent_streams]].min
end
def on_frame_sent(frame)
log { "frame was sent!" }
log do

View File

@ -39,15 +39,27 @@ module HTTPX
end
end
def <<(request)
def send(request, **args)
channel = bind(request.uri)
raise Error, "no channel available" unless channel
channel.send(request)
channel.send(request, **args)
end
alias :<< :send
def response(request)
@responses.delete(request)
response = @responses.delete(request)
case response
when ErrorResponse
if response.retryable?
send(request, retries: response.retries - 1)
nil
else
response
end
else
response
end
end
def process_events(timeout: @timeout.timeout)

View File

@ -2,9 +2,8 @@
module HTTPX
class Options
KEEP_ALIVE_TIMEOUT = 5
OPERATION_TIMEOUT = 5
CONNECT_TIMEOUT = 5
MAX_CONCURRENT_REQUESTS = 100
MAX_RETRIES = 3
class << self
def new(options = {})
@ -38,6 +37,8 @@ module HTTPX
:timeout => Timeout.by(:null),
:headers => {},
:cookies => {},
:max_concurrent_requests => MAX_CONCURRENT_REQUESTS,
:max_retries => MAX_RETRIES,
}
defaults.merge!(options)
@ -60,9 +61,14 @@ module HTTPX
self.timeout = Timeout.by(type, opts)
end
def_option(:max_concurrent_requests) do |num|
max = Integer(num)
raise Error, ":max_concurrent_requests must be positive" unless max.positive?
self.max_concurrent_requests = max
end
%w[
proxy params form json body follow
ssl
proxy params form json body follow ssl max_retries
].each do |method_name|
def_option(method_name)
end

View File

@ -21,12 +21,17 @@ module HTTPX
class ErrorResponse
attr_reader :error
attr_reader :error, :retries
alias :status :error
def initialize(error)
def initialize(error, retries)
@error = error
@retries = retries
end
def retryable?
@retries.positive?
end
end
end

View File

@ -67,6 +67,8 @@ class OptionsSpec < Minitest::Test
:headers => {"Foo" => "foo", "Accept" => "xml", "Bar" => "bar"},
:proxy => {:proxy_address => "127.0.0.1", :proxy_port => 8080},
:cookies => {},
:max_concurrent_requests => 100,
:max_retries => 3,
}, "options haven't merged correctly"
end