HTTP/2 Connection Coalescing

This commit is contained in:
HoneyryderChuck 2018-09-04 15:00:08 +00:00
parent c8196b6f91
commit 112ea2a8ac
12 changed files with 84 additions and 25 deletions

View File

@ -25,6 +25,8 @@ services:
- ./:/home
entrypoint:
/home/test/support/ci/build.sh
links:
- "nghttp2:another"
socksproxy:
image: qautomatron/docker-3proxy

View File

@ -32,8 +32,8 @@ module HTTPX
end
alias_method :plugins, :plugin
def with(options)
branch(default_options.merge(options))
def with(options, &blk)
branch(default_options.merge(options), &blk)
end
private
@ -43,9 +43,9 @@ module HTTPX
end
# :nodoc:
def branch(options)
return self.class.new(options) if is_a?(Client)
Client.new(options)
def branch(options, &blk)
return self.class.new(options, &blk) if is_a?(Client)
Client.new(options, &blk)
end
end
end

View File

@ -58,7 +58,7 @@ module HTTPX
def_delegator :@write_buffer, :empty?
attr_reader :uri
attr_reader :uri, :state
def initialize(type, uri, options)
@type = type
@ -77,16 +77,29 @@ module HTTPX
@io = IO.registry(@type).new(@uri, addrs, @options)
end
def mergeable?(channel, addresses)
def mergeable?(addresses)
return false if @state == :closing || !@io
!(@io.addresses & addresses).empty? &&
@uri.port == channel.uri.port &&
@uri.scheme == channel.uri.scheme
!(@io.addresses & addresses).empty?
end
# coalescable channels need to be mergeable!
# but internally, #mergeable? is called before #coalescable?
def coalescable?(channel)
if @io.protocol == "h2" && @uri.scheme == "https"
@io.verify_hostname(channel.uri.host)
else
@uri.host == channel.uri.host &&
@uri.port == channel.uri.port &&
@uri.scheme == channel.uri.scheme
end
end
def merge(channel)
@hostnames += channel.instance_variable_get(:@hostnames)
@pending += channel.instance_variable_get(:@pending)
pending = channel.instance_variable_get(:@pending)
pending.each do |req, args|
send(req, args)
end
end
def match?(uri)
@ -243,6 +256,7 @@ module HTTPX
@io.connect
return unless @io.connected?
send_pending
emit(:open)
when :closing
return unless @state == :open
when :closed

View File

@ -42,6 +42,7 @@ module HTTPX
end
def close
@resolver.close unless @resolver.closed?
@channels.each(&:close)
next_tick until @channels.empty?
end
@ -81,13 +82,15 @@ module HTTPX
def on_resolver_channel(channel, addresses)
found_channel = @channels.find do |ch|
next if ch == channel
ch.mergeable?(channel, addresses)
ch != channel && ch.mergeable?(addresses)
end
if found_channel
found_channel.merge(channel)
return register_channel(channel) unless found_channel
if found_channel.state == :open
coalesce_channels(found_channel, channel)
else
register_channel(channel)
found_channel.once(:open) do
coalesce_channels(found_channel, channel)
end
end
end
@ -121,5 +124,14 @@ module HTTPX
return (@resolver.timeout || timeout) unless @resolver.closed?
timeout
end
def coalesce_channels(ch1, ch2)
if ch1.coalescable?(ch2)
ch1.merge(ch2)
@channels.delete(ch2)
else
register_channel(ch2)
end
end
end
end

View File

@ -28,6 +28,12 @@ module HTTPX
super
end
def verify_hostname(host)
return false if @ctx.verify_mode == OpenSSL::SSL::VERIFY_NONE
return false if @io.peer_cert.nil?
OpenSSL::SSL.verify_certificate_identity(@io.peer_cert, host)
end
def close
super
# allow reconnections

View File

@ -14,7 +14,7 @@ module HTTPX
register :https, :HTTPS
@lookup_mutex = Mutex.new
@lookups = {}
@lookups = Hash.new { |h, k| h[k] = [] }
@identifier_mutex = Mutex.new
@identifier = 1
@ -34,7 +34,10 @@ module HTTPX
entry["TTL"] += now
end
@lookup_mutex.synchronize do
@lookups[hostname] = entries
@lookups[hostname] += entries
entries.each do |entry|
@lookups[entry["name"]] << entry if entry["name"] != hostname
end
end
end
@ -80,7 +83,7 @@ module HTTPX
when Resolv::DNS::Resource::IN::CNAME
addresses << {
"name" => question.to_s,
"TTL" => value.ttl,
"TTL" => value.ttl,
"alias" => value.name.to_s,
}
when Resolv::DNS::Resource::IN::A,

View File

@ -141,7 +141,7 @@ module HTTPX
emit_addresses(channel, addresses.map { |addr| addr["data"] })
end
end
return emit(:close) if @channels.empty?
return if @channels.empty?
resolve
end

View File

@ -190,9 +190,13 @@ module HTTPX
channel = @queries.delete(address["name"])
return unless channel # probably a retried query for which there's an answer
if address.key?("alias") # CNAME
resolve(channel, address["alias"])
@queries.delete(address["name"])
return
if early_resolve(channel, hostname: address["alias"])
@channels.delete(channel)
else
resolve(channel, address["alias"])
@queries.delete(address["name"])
return
end
else
@channels.delete(channel)
Resolver.cached_lookup_set(channel.uri.host, addresses)

View File

@ -34,8 +34,7 @@ module HTTPX
emit(:resolve, channel, addresses)
end
def early_resolve(channel)
hostname = channel.uri.host
def early_resolve(channel, hostname: channel.uri.host)
addresses = ip_resolve(hostname) || Resolver.cached_lookup(hostname) || system_resolve(hostname)
return unless addresses
emit_addresses(channel, addresses)

View File

@ -0,0 +1,11 @@
# frozen_string_literal: true
module ResolverCachePurge
def purge_lookup_cache
@lookup_mutex.synchronize do
@lookups.clear
end
end
end
HTTPX::Resolver.extend(ResolverCachePurge)

View File

@ -56,6 +56,7 @@ module ResolverHelpers
return unless resolver.respond_to?(:parse)
channel = build_channel("http://ipv4c.tlund.se/")
resolver.queries["ipv4c.tlund.se"] = channel
# require "pry-byebug"; binding.pry
resolver.parse(cname_record)
assert channel.addresses.nil?
assert !resolver.queries.key?("ipv4c.tlund.se")
@ -73,6 +74,11 @@ module ResolverHelpers
private
def setup
super
HTTPX::Resolver.purge_lookup_cache
end
def build_channel(uri)
channel = HTTPX::Channel.by(URI(uri), HTTPX::Options.new)
channel.extend(ChannelExtensions)

View File

@ -1,3 +1,5 @@
# frozen_string_literal: true
module TimeoutForTest
# our own subclass so we never confused different timeouts
class TestTimeout < Timeout::Error