fix: http/1.1 recover from connection exhausted

While adding the test, the code to recover from an exhausted HTTP/1.1
connection proved not to be reliable, as it wasn't taking inflight
requests nor update-once keep-alive max requests counters into
account.

This has been fixed by implementing our own test dummy using
webrick.
This commit is contained in:
HoneyryderChuck 2020-11-26 19:40:35 +00:00
parent 34dc7df495
commit e6c9bb4714
6 changed files with 96 additions and 29 deletions

View File

@ -21,6 +21,7 @@ module HTTPX
@version = [1, 1]
@pending = []
@requests = []
@handshake_completed = false
end
def interests
@ -155,6 +156,7 @@ module HTTPX
@parser.reset!
@max_requests -= 1
manage_connection(response)
send(@pending.shift) unless @pending.empty?
end
@ -182,17 +184,28 @@ module HTTPX
connection = response.headers["connection"]
case connection
when /keep-alive/i
if @handshake_completed
if @max_requests.zero?
@pending.concat(@requests)
@requests.clear
emit(:exhausted)
end
return
end
keep_alive = response.headers["keep-alive"]
return unless keep_alive
parameters = Hash[keep_alive.split(/ *, */).map do |pair|
pair.split(/ *= */)
end]
@max_requests = parameters["max"].to_i if parameters.key?("max")
@max_requests = parameters["max"].to_i - 1 if parameters.key?("max")
if parameters.key?("timeout")
keep_alive_timeout = parameters["timeout"].to_i
emit(:timeout, keep_alive_timeout)
end
@handshake_completed = true
when /close/i
disable
when nil

View File

@ -45,6 +45,24 @@ class HTTPTest < Minitest::Test
assert log_output.match(/HEADER: content-length: \d+/)
end
def test_max_streams
server = KeepAliveServer.new
th = Thread.new { server.start }
begin
uri = "#{server.origin}/"
HTTPX.plugin(SessionWithPool).with(max_concurrent_requests: 1).wrap do |http|
responses = http.get(uri, uri, uri)
assert responses.size == 3, "expected 3 responses, got #{responses.size}"
connection_count = http.pool.connection_count
assert connection_count == 2, "expected to have 2 connections, instead have #{connection_count}"
assert http.connection_exausted, "expected 1 connnection to have exhausted"
end
ensure
server.shutdown
th.join
end
end
private
def origin(orig = httpbin)

View File

@ -75,24 +75,28 @@ class HTTPSTest < Minitest::Test
assert log_output.match(/HEADER: content-length: \d+/)
end
def test_http2_max_streams
uri = build_uri("/get")
HTTPX.plugin(SessionWithSingleStream).plugin(SessionWithPool).wrap do |http|
http.get(uri, uri)
connection_count = http.pool.connection_count
assert connection_count == 2, "expected to have 2 connections, instead have #{connection_count}"
assert http.connection_exausted, "expected 1 connnection to have exhausted"
end
end
unless RUBY_ENGINE == "jruby" || RUBY_VERSION < "2.3"
# HTTP/2-specific tests
def test_http2_uncoalesce_on_misdirected
uri = build_uri("/status/421")
HTTPX.plugin(SessionWithPool).wrap do |http|
response = http.get(uri)
verify_status(response, 421)
connection_count = http.pool.connection_count
assert connection_count == 2, "expected to have 2 connections, instead have #{connection_count}"
assert response.version == "1.1", "request should have been retried with HTTP/1.1"
def test_http2_max_streams
uri = build_uri("/get")
HTTPX.plugin(SessionWithSingleStream).plugin(SessionWithPool).wrap do |http|
http.get(uri, uri)
connection_count = http.pool.connection_count
assert connection_count == 2, "expected to have 2 connections, instead have #{connection_count}"
assert http.connection_exausted, "expected 1 connnection to have exhausted"
end
end
def test_http2_uncoalesce_on_misdirected
uri = build_uri("/status/421")
HTTPX.plugin(SessionWithPool).wrap do |http|
response = http.get(uri)
verify_status(response, 421)
connection_count = http.pool.connection_count
assert connection_count == 2, "expected to have 2 connections, instead have #{connection_count}"
assert response.version == "1.1", "request should have been retried with HTTP/1.1"
end
end
end

View File

@ -0,0 +1,34 @@
# frozen_string_literal: true
require "webrick"
require "logger"
class KeepAliveServer < WEBrick::HTTPServer
class KeepAliveApp < WEBrick::HTTPServlet::AbstractServlet
SOCKS = [].freeze
def do_GET(_req, res) # rubocop:disable Naming/MethodName
res.status = 200
res["Connection"] = "Keep-Alive"
res["Keep-Alive"] = "max=2"
res["Content-Type"] = "application/json"
res.body = "{\"counter\": 2}"
end
end
def initialize(options = {})
super({
:BindAddress => "127.0.0.1",
:Port => 0,
:AccessLog => File.new(File::NULL),
:Logger => Logger.new(File::NULL),
}.merge(options))
mount("/", KeepAliveApp)
end
def origin
sock = listeners.first
_, sock, ip, _ = sock.addr
"http://#{ip}:#{sock}"
end
end

View File

@ -20,9 +20,18 @@ module SessionWithPool
end
module InstanceMethods
attr_reader :connection_exausted
def pool
@pool ||= ConnectionPool.new
end
def set_connection_callbacks(connection, connections, options)
super
connection.on(:exhausted) do
@connection_exausted = true
end
end
end
module ConnectionMethods

View File

@ -17,15 +17,4 @@ module SessionWithSingleStream
parser
end
end
module InstanceMethods
attr_reader :connection_exausted
def set_connection_callbacks(connection, connections, options)
super
connection.on(:exhausted) do
@connection_exausted = true
end
end
end
end