Merge branch 'push_promise' into 'master'

Push promise

See merge request honeyryderchuck/httpx!11
This commit is contained in:
HoneyryderChuck 2018-01-13 21:55:58 +00:00
commit e58620b733
17 changed files with 203 additions and 80 deletions

View File

@ -17,9 +17,14 @@ module HTTPX
callbacks(type).delete_if { |pr| pr[*args] == :delete }
end
private
protected
def callbacks(type)
def inherit_callbacks(callbackable)
@callbacks = callbackable.callbacks
end
def callbacks(type=nil)
return @callbacks unless type
@callbacks ||= Hash.new { |h, k| h[k] = [] }
@callbacks[type]
end

View File

@ -2,40 +2,10 @@
module HTTPX
module Chainable
def head(uri, **options)
request(:head, uri, **options)
end
def get(uri, **options)
request(:get, uri, **options)
end
def post(uri, **options)
request(:post, uri, **options)
end
def put(uri, **options)
request(:put, uri, **options)
end
def delete(uri, **options)
request(:delete, uri, **options)
end
def trace(uri, **options)
request(:trace, uri, **options)
end
def options(uri, **options)
request(:options, uri, **options)
end
def connect(uri, **options)
request(:connect, uri, **options)
end
def patch(uri, **options)
request(:patch, uri, **options)
%i[head get post put delete trace options connect patch].each do |meth|
define_method meth do |*uri, **options|
request(meth, *uri, **options)
end
end
def request(verb, uri, **options)
@ -63,10 +33,14 @@ module HTTPX
end
alias :plugins :plugin
def with(options)
branch(default_options.merge(options))
end
private
def default_options
@default_options || Options.new
@options || Options.new
end
# :nodoc:

View File

@ -32,6 +32,7 @@ module HTTPX
extend Forwardable
include Registry
include Loggable
include Callbacks
require "httpx/channel/http2"
require "httpx/channel/http1"
@ -39,7 +40,7 @@ module HTTPX
BUFFER_SIZE = 1 << 14
class << self
def by(uri, options, &blk)
def by(uri, options)
io = case uri.scheme
when "http"
IO.registry("tcp").new(uri.host, uri.port, options)
@ -48,7 +49,7 @@ module HTTPX
else
raise Error, "#{uri.scheme}: unrecognized channel"
end
new(io, options, &blk)
new(io, options)
end
end
@ -56,14 +57,13 @@ module HTTPX
def_delegator :@write_buffer, :empty?
def initialize(io, options, &on_response)
def initialize(io, options)
@io = io
@options = Options.new(options)
@window_size = @options.window_size
@read_buffer = "".b
@write_buffer = Buffer.new(BUFFER_SIZE)
@pending = []
@on_response = on_response
end
def match?(uri)
@ -161,7 +161,7 @@ module HTTPX
def build_parser(protocol=@io.protocol)
parser = registry(protocol).new(@write_buffer, @options)
parser.on(:response, &@on_response)
parser.inherit_callbacks(self)
parser.on(:close) { throw(:close, self) }
parser
end

View File

@ -7,6 +7,8 @@ module HTTPX
include Callbacks
include Loggable
attr_reader :streams, :pending
def initialize(buffer, options)
@options = Options.new(options)
@max_concurrent_requests = @options.max_concurrent_requests
@ -94,7 +96,7 @@ module HTTPX
if request.expects?
return handle(request, stream)
end
response = request.response || ErrorResponse.new(error, retries)
response = request.response || ErrorResponse.new(error, @retries)
emit(:response, request, response)
log(2, "#{stream.id}: ") { "closing stream" }
@ -202,9 +204,7 @@ module HTTPX
end
def on_promise(stream)
log(2, "#{stream.id}: ") { "refusing stream!" }
stream.refuse
# TODO: policy for handling promises
emit(:promise, self, stream)
end
def method_missing(meth, *args, &blk)

View File

@ -2,11 +2,12 @@
module HTTPX
class Client
include Loggable
include Chainable
def initialize(options = {})
@default_options = self.class.default_options.merge(options)
@connection = Connection.new(@default_options)
@options = self.class.default_options.merge(options)
@connection = Connection.new(@options)
@responses = {}
if block_given?
begin
@ -38,6 +39,12 @@ module HTTPX
@responses[request] = response
end
def on_promise(_, stream)
log(2, "#{stream.id}: ") { "refusing stream!" }
stream.refuse
# TODO: policy for handling promises
end
def fetch_response(request)
response = @responses.delete(request)
if response.is_a?(ErrorResponse) && response.retryable?
@ -50,26 +57,33 @@ module HTTPX
def find_channel(request)
uri = URI(request.uri)
@connection.find_channel(uri) ||
@connection.build_channel(uri, &method(:on_response))
@connection.find_channel(uri) || begin
channel = @connection.build_channel(uri)
set_channel_callbacks(channel)
channel
end
end
def set_channel_callbacks(channel)
channel.on(:response, &method(:on_response))
channel.on(:promise, &method(:on_promise))
end
def __build_reqs(*args, **options)
case args.size
when 1
reqs = args.first
requests = reqs.map do |verb, uri, opts = {}|
__build_req(verb, uri, options.merge(opts))
requests = reqs.map do |verb, uri|
__build_req(verb, uri, options)
end
when 2, 3
verb, uris, opts = args
opts ||= {}
verb, *uris = args
if uris.respond_to?(:each)
requests = uris.map do |uri|
__build_req(verb, uri, options.merge(opts))
__build_req(verb, uri, options)
end
else
[ __build_req(verb, uris, options.merge(opts)) ]
[ __build_req(verb, uris, options) ]
end
else
raise ArgumentError, "unsupported number of arguments"
@ -96,8 +110,8 @@ module HTTPX
end
def __build_req(verb, uri, options = {})
rklass = @default_options.request_class
rklass.new(verb, uri, @default_options.merge(options))
rklass = @options.request_class
rklass.new(verb, uri, @options.merge(options))
end
@default_options = Options.new

View File

@ -38,8 +38,8 @@ module HTTPX
end
end
def build_channel(uri, &on_response)
channel = Channel.by(uri, @options, &on_response)
def build_channel(uri)
channel = Channel.by(uri, @options)
register_channel(channel)
channel
end

View File

@ -18,7 +18,7 @@ module HTTPX
keep_open = @keep_open
@keep_open = true
max_redirects = @default_options.max_redirects || MAX_REDIRECTS
max_redirects = @options.max_redirects || MAX_REDIRECTS
requests = __build_reqs(*args, **options)
responses = __send_reqs(*requests)

View File

@ -18,7 +18,7 @@ module HTTPX
upgrade_request.headers["upgrade"] = "h2c"
upgrade_request.headers.add("connection", "upgrade")
upgrade_request.headers.add("connection", "http2-settings")
upgrade_request.headers["http2-settings"] = HTTP2::Client.settings_header(@default_options.http2_settings)
upgrade_request.headers["http2-settings"] = HTTP2::Client.settings_header(@options.http2_settings)
# TODO: validate!
upgrade_response = __send_reqs(*upgrade_request).first

View File

@ -34,7 +34,7 @@ module HTTPX
private
def proxy_params(uri)
return @default_options.proxy if @default_options.proxy
return @options.proxy if @options.proxy
uri = URI(uri).find_proxy
return unless uri
{ uri: uri }
@ -44,16 +44,19 @@ module HTTPX
uri = URI(request.uri)
proxy = proxy_params(uri)
return super unless proxy
@connection.find_channel(proxy) ||
build_proxy_channel(proxy)
@connection.find_channel(proxy) || begin
channel = build_proxy_channel(proxy)
set_channel_callbacks(channel)
channel
end
end
def build_proxy_channel(proxy)
parameters = Parameters.new(**proxy)
uri = parameters.uri
io = TCP.new(uri.host, uri.port, @default_options)
io = TCP.new(uri.host, uri.port, @options)
proxy_type = Parameters.registry(parameters.uri.scheme)
channel = proxy_type.new(io, parameters, @default_options, &method(:on_response))
channel = proxy_type.new(io, parameters, @options, &method(:on_response))
@connection.__send__(:register_channel, channel)
channel
end

View File

@ -44,7 +44,7 @@ module HTTPX
@parser = nil
when :idle
@parser = ProxyParser.new(@write_buffer, @options)
@parser.on(:response, &@on_response)
@parser.inherit_callbacks(self)
@parser.on(:close) { throw(:close, self) }
else
return

View File

@ -0,0 +1,84 @@
# frozen_string_literal: true
module HTTPX
module Plugins
module PushPromise
PUSH_OPTIONS = { http2_settings: { settings_enable_push: 1 },
max_concurrent_requests: 1 }
module RequestMethods
def headers=(h)
@headers = @options.headers_class.new(h)
end
end
module InstanceMethods
def initialize(opts = {})
super(PUSH_OPTIONS.merge(opts))
@promise_headers = {}
end
private
def on_promise(parser, stream)
stream.on(:headers) do |h|
k, _ = h.first
if k == ":method"
__on_promise_request(parser, stream, h)
else
__on_promise_response(parser, stream, h)
end
end
end
def __on_promise_request(parser, stream, h)
log(1, "#{stream.id}: ") do
h.map { |k, v| "-> HEADER: #{k}: #{v}" }.join("\n")
end
headers = @options.headers_class.new(h)
path = headers[":path"]
authority = headers[":authority"]
request = parser.pending.find { |r| r.authority == authority && r.path == path }
if request
request.headers = headers
@promise_headers[stream] = request
else
stream.refuse
end
end
def __on_promise_response(parser, stream, h)
log(1, "#{stream.id}(promise): ") do
h.map { |k, v| "<- HEADER: #{k}: #{v}" }.join("\n")
end
request = @promise_headers.delete(stream)
return unless request
_, status = h.shift
headers = @options.headers_class.new(h)
response = @options.response_class.new(request, status, "2.0", headers, @options)
request.response = response
request.transition(:done)
parser.streams[request] = stream
stream.on(:data) do |data|
log(1, "#{stream.id}(promise): ") { "<- DATA: #{data.bytesize} bytes..." }
log(2, "#{stream.id}(promise): ") { "<- #{data.inspect}" }
request.response << data
end
stream.on(:close) do |error|
if request.expects?
return handle(request, stream)
end
response = request.response || ErrorResponse.new(error, retries)
on_response(request, response)
log(2, "#{stream.id}(promise): ") { "closing stream" }
parser.streams.delete(request)
end
end
end
end
register_plugin(:push_promise, PushPromise)
end
end

View File

@ -188,8 +188,7 @@ module HTTPX
end
end
when :done
return unless @state == :body ||
@state == :headers
return if @state == :expect
end
@state = nextstate
nil

View File

@ -49,11 +49,11 @@ class ClientTest < Minitest::Test
end
def options
@default_options
@options
end
def response(*args)
@default_options.response_class.new(*args, @default_options)
@options.response_class.new(*args, @options)
end
end
self::OptionsClassMethods = Module.new do

View File

@ -16,6 +16,7 @@ class HTTP2Test < HTTPTest
include Plugins::FollowRedirects
include Plugins::Cookies
include Plugins::Compression
include Plugins::PushPromise
private

View File

@ -10,14 +10,18 @@ module ResponseHelpers
%w(header param).each do |meth|
class_eval <<-DEFINE, __FILE__, __LINE__ + 1
def verify_#{meth}(#{meth}s, key, expect)
assert #{meth}s.key?(key), "#{meth}s don't contain the given key (" + key + ")"
assert #{meth}s.key?(key), "#{meth}s don't contain the given key (\#{key})"
value = #{meth}s[key]
if value.respond_to?(:start_with?)
assert value.start_with?(expect), "#{meth} assertion failed: " + key + "=" + value + " (expected: " + expect + ")"
assert value.start_with?(expect), "#{meth} assertion failed: \#{key}=\#{value} (expected: \#{expect}})"
else
assert value == expect, "#{meth} assertion failed: " + key + "=" + value.to_s + " (expected: " + expect.to_s + ")"
assert value == expect, "#{meth} assertion failed: \#{key}=\#{value.to_s} (expected: \#{expect.to_s})"
end
end
def verify_no_#{meth}(#{meth}s, key)
assert !#{meth}s.key?(key), "#{meth}s contains the given key (" + key + "=\#{#{meth}s[key]})"
end
DEFINE
end

View File

@ -5,10 +5,10 @@ module Requests
module Proxy
# https://www.sslproxies.org
PROXIES = %W[
185.82.212.95:8080
137.74.168.174:8080
]
def test_plugin_proxy_anonymous
def test_plugin_http_proxy
client = HTTPX.plugin(:proxy).with_proxy(uri: http_proxy_uri)
uri = build_uri("/get")
response = client.get(uri)
@ -47,15 +47,15 @@ module Requests
end
def socks4_proxy_uri
"socks4://119.28.107.60:1080"
"socks4://138.201.6.100:8080"
end
def socks4a_proxy_uri
"socks4a://119.28.107.60:1080"
"socks4a://138.201.6.100:8080"
end
def socks5_proxy_uri
"socks5://118.201.230.192:58303"
"socks5://99.194.30.192:47997"
end
end
end

View File

@ -0,0 +1,39 @@
# frozen_string_literal: true
module Requests
module Plugins
module PushPromise
def test_plugin_push_promise_get
client = HTTPX.plugin(:push_promise)
html, css = client.get(push_html_uri, push_css_uri)
verify_status(html.status, 200)
verify_status(css.status, 200)
verify_header(css.headers, "x-http2-push", "1")
end
def test_plugin_push_promise_concurrent
client = HTTPX.plugin(:push_promise)
.with(max_concurrent_requests: 100)
html, css = client.get(push_html_uri, push_css_uri)
verify_status(html.status, 200)
verify_status(css.status, 200)
verify_no_header(css.headers, "x-http2-push")
end
private
def push_origin
"https://nghttp2.org"
end
def push_html_uri
"#{push_origin}/"
end
def push_css_uri
"#{push_origin}/stylesheets/screen.css"
end
end
end
end