Merge branch 'brotli' into 'master'

Brotli

Closes #16

See merge request honeyryderchuck/httpx!13
This commit is contained in:
HoneyryderChuck 2018-01-13 23:20:19 +00:00
commit 0126ea95dd
20 changed files with 239 additions and 195 deletions

View File

@ -6,6 +6,7 @@ gemspec
gem "rake" gem "rake"
platform :mri do platform :mri do
gem "brotli", require: false
gem "pry-byebug", require: false gem "pry-byebug", require: false
end end
# gem "guard-rspec", :require => false # gem "guard-rspec", :require => false

View File

@ -12,8 +12,8 @@ module HTTPX
branch(**options).request(verb, uri) branch(**options).request(verb, uri)
end end
def timeout(klass, **options) def timeout(**args)
branch(timeout: Timeout.by(klass, **options)) branch(timeout: args)
end end
def headers(headers) def headers(headers)

View File

@ -64,6 +64,7 @@ module HTTPX
@read_buffer = "".b @read_buffer = "".b
@write_buffer = Buffer.new(BUFFER_SIZE) @write_buffer = Buffer.new(BUFFER_SIZE)
@pending = [] @pending = []
@state = :idle
end end
def match?(uri) def match?(uri)
@ -75,18 +76,17 @@ module HTTPX
end end
def to_io def to_io
connect case @state
when :idle
transition(:open)
when :open
end
@io.to_io @io.to_io
end end
def close(hard=false) def close(hard=false)
if pr = @parser pr = @parser
pr.close transition(:closed)
@parser = nil
end
@io.close
@read_buffer.clear
@write_buffer.clear
return true if hard return true if hard
unless pr && pr.empty? unless pr && pr.empty?
connect connect
@ -106,7 +106,7 @@ module HTTPX
end end
def call def call
return if closed? return if @state == :closed
catch(:called) do catch(:called) do
dread dread
dwrite dwrite
@ -122,11 +122,6 @@ module HTTPX
private private
def connect
@io.connect
send_pending
end
def dread(wsize = @window_size) def dread(wsize = @window_size)
loop do loop do
siz = @io.read(wsize, @read_buffer) siz = @io.read(wsize, @read_buffer)
@ -148,7 +143,6 @@ module HTTPX
end end
def send_pending def send_pending
return if @io.closed?
while !@write_buffer.full? && (req_args = @pending.shift) while !@write_buffer.full? && (req_args = @pending.shift)
request, args = req_args request, args = req_args
parser.send(request, **args) parser.send(request, **args)
@ -165,5 +159,27 @@ module HTTPX
parser.on(:close) { throw(:close, self) } parser.on(:close) { throw(:close, self) }
parser parser
end end
def transition(nextstate)
case nextstate
when :idle
when :open
return if @state == :closed
@io.connect
return if @io.closed?
send_pending
when :closed
return if @state == :idle
if pr = @parser
pr.close
@parser = nil
end
@io.close
@read_buffer.clear
@write_buffer.clear
end
@state = nextstate
end
end end
end end

View File

@ -99,12 +99,20 @@ module HTTPX
# guarantee ordered responses # guarantee ordered responses
loop do loop do
request = requests.shift begin
request = requests.first
@connection.next_tick until response = fetch_response(request) @connection.next_tick until response = fetch_response(request)
responses << response responses << response
requests.shift
break if requests.empty? break if requests.empty?
rescue TimeoutError => e
while requests.shift
responses << ErrorResponse.new(e.message, 0)
end
break
end
end end
requests.size == 1 ? responses.first : responses requests.size == 1 ? responses.first : responses
end end

View File

@ -44,7 +44,7 @@ module HTTPX
:ssl => { alpn_protocols: %w[h2 http/1.1] }, :ssl => { alpn_protocols: %w[h2 http/1.1] },
:http2_settings => { settings_enable_push: 0 }, :http2_settings => { settings_enable_push: 0 },
:fallback_protocol => "http/1.1", :fallback_protocol => "http/1.1",
:timeout => Timeout.by(:per_operation), :timeout => Timeout.new,
:headers => {}, :headers => {},
:max_concurrent_requests => MAX_CONCURRENT_REQUESTS, :max_concurrent_requests => MAX_CONCURRENT_REQUESTS,
:max_retries => MAX_RETRIES, :max_retries => MAX_RETRIES,
@ -65,8 +65,8 @@ module HTTPX
self.headers.merge(headers) self.headers.merge(headers)
end end
def_option(:timeout) do |type, opts| def_option(:timeout) do |opts|
self.timeout = Timeout.by(type, opts) self.timeout = Timeout.new(opts)
end end
def_option(:max_concurrent_requests) do |num| def_option(:max_concurrent_requests) do |num|
@ -98,7 +98,7 @@ module HTTPX
merged = h1.merge(h2) do |k, v1, v2| merged = h1.merge(h2) do |k, v1, v2|
case k case k
when :headers, :ssl, :http2_settings when :headers, :ssl, :http2_settings, :timeout
v1.merge(v2) v1.merge(v2)
else else
v2 v2

View File

@ -0,0 +1,40 @@
# frozen_string_literal: true
module HTTPX
module Plugins
module Compression
module Brotli
def self.load_dependencies(klass, *)
klass.plugin(:compression)
require "brotli"
end
def self.configure(*)
Transcoder.register "br", BrotliTranscoder
end
module ResponseBodyMethods
def write(chunk)
chunk = decompress(chunk)
super(chunk)
end
end
module BrotliTranscoder
module_function
def encode(payload)
::Brotli.deflate(payload)
end
def decode(io)
::Brotli.inflate(io)
end
end
end
end
register_plugin :"compression/brotli", Compression::Brotli
end
end

View File

@ -5,7 +5,7 @@ module HTTPX
module Compression module Compression
module GZIP module GZIP
def self_load_dependencies(*) def self.load_dependencies(*)
require "zlib" require "zlib"
end end

View File

@ -84,25 +84,15 @@ module HTTPX
def initialize(io, parameters, options, &blk) def initialize(io, parameters, options, &blk)
super(io, options, &blk) super(io, options, &blk)
@parameters = parameters @parameters = parameters
@state = :idle
end end
def match?(*) def match?(*)
true true
end end
def send_pending def to_io
return if @pending.empty? transition(:connecting) if @state == :idle
case @state super
when :open
# normal flow after connection
return super
when :connecting
# do NOT enqueue requests if proxy is connecting
return
when :idle
proxy_connect
end
end end
end end

View File

@ -17,7 +17,6 @@ module HTTPX
# and therefore, will share the connection. # and therefore, will share the connection.
# #
if req.uri.scheme == "https" if req.uri.scheme == "https"
transition(:connecting)
connect_request = ConnectRequest.new(req.uri) connect_request = ConnectRequest.new(req.uri)
if @parameters.authenticated? if @parameters.authenticated?
connect_request.headers["proxy-authentication"] = "Basic #{@parameters.token_authentication}" connect_request.headers["proxy-authentication"] = "Basic #{@parameters.token_authentication}"
@ -25,18 +24,20 @@ module HTTPX
parser.send(connect_request) parser.send(connect_request)
else else
transition(:open) transition(:open)
send_pending
end end
end end
def transition(nextstate) def transition(nextstate)
case nextstate case nextstate
when :idle
when :connecting when :connecting
return unless @state == :idle return unless @state == :idle
@io.connect
return if @io.closed?
@parser = ConnectProxyParser.new(@write_buffer, @options.merge(max_concurrent_requests: 1)) @parser = ConnectProxyParser.new(@write_buffer, @options.merge(max_concurrent_requests: 1))
@parser.once(:response, &method(:on_connect)) @parser.once(:response, &method(:on_connect))
@parser.on(:close) { throw(:close, self) } @parser.on(:close) { throw(:close, self) }
proxy_connect
return if @state == :open
when :open when :open
case @state case @state
when :connecting when :connecting
@ -46,19 +47,17 @@ module HTTPX
@parser = ProxyParser.new(@write_buffer, @options) @parser = ProxyParser.new(@write_buffer, @options)
@parser.inherit_callbacks(self) @parser.inherit_callbacks(self)
@parser.on(:close) { throw(:close, self) } @parser.on(:close) { throw(:close, self) }
else
return
end end
end end
@state = nextstate super
end end
def on_connect(request, response) def on_connect(request, response)
if response.status == 200 if response.status == 200
transition(:open)
req, _ = @pending.first req, _ = @pending.first
request_uri = req.uri request_uri = req.uri
@io = ProxySSL.new(@io, request_uri, @options) @io = ProxySSL.new(@io, request_uri, @options)
transition(:open)
throw(:called) throw(:called)
else else
pending = @parser.pending pending = @parser.pending

View File

@ -19,22 +19,21 @@ module HTTPX
def proxy_connect def proxy_connect
@parser = SocksParser.new(@write_buffer, @options) @parser = SocksParser.new(@write_buffer, @options)
@parser.once(:packet, &method(:on_packet)) @parser.once(:packet, &method(:on_packet))
transition(:connecting)
end end
def on_packet(packet) def on_packet(packet)
version, status, port, ip = packet.unpack("CCnN") version, status, port, ip = packet.unpack("CCnN")
if status == GRANTED if status == GRANTED
transition(:open)
req, _ = @pending.first req, _ = @pending.first
request_uri = req.uri request_uri = req.uri
if request_uri.scheme == "https" if request_uri.scheme == "https"
@io = ProxySSL.new(@io, request_uri, @options) @io = ProxySSL.new(@io, request_uri, @options)
end end
transition(:open)
throw(:called) throw(:called)
else else
pending = @parser.instance_variable_get(:@pending) response = ErrorResponse.new("socks error: #{status}", 0)
while req = pending.shift while (req, _ = @pending.shift)
@on_response.call(req, response) @on_response.call(req, response)
end end
end end
@ -42,18 +41,20 @@ module HTTPX
def transition(nextstate) def transition(nextstate)
case nextstate case nextstate
when :idle
when :connecting when :connecting
return unless @state == :idle return unless @state == :idle
@io.connect
return if @io.closed?
req, _ = @pending.first req, _ = @pending.first
request_uri = req.uri request_uri = req.uri
@write_buffer << Packet.connect(@parameters, request_uri) @write_buffer << Packet.connect(@parameters, request_uri)
proxy_connect
when :open when :open
return unless :connecting return unless @state == :connecting
@parser = nil @parser = nil
end end
log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" } log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" }
@state = nextstate super
end end
end end
Parameters.register("socks4", Socks4ProxyChannel) Parameters.register("socks4", Socks4ProxyChannel)
@ -73,6 +74,10 @@ module HTTPX
def consume(*) def consume(*)
end end
def empty?
true
end
def <<(packet) def <<(packet)
emit(:packet, packet) emit(:packet, packet)
end end

View File

@ -28,7 +28,7 @@ module HTTPX
def on_packet(packet) def on_packet(packet)
case @state case @state
when :negotiating when :connecting
version, method = packet.unpack("CC") version, method = packet.unpack("CC")
check_version(version) check_version(version)
case method case method
@ -36,54 +36,63 @@ module HTTPX
transition(:authenticating) transition(:authenticating)
return return
when NONE when NONE
raise Error, "no supported authorization methods" on_error_response("no supported authorization methods")
else else
transition(:connecting) transition(:negotiating)
end end
when :authenticating when :authenticating
version, status = packet.unpack("CC") version, status = packet.unpack("CC")
check_version(version) check_version(version)
raise Error, "could not authorize" if status != SUCCESS return transition(:negotiating) if status == SUCCESS
transition(:connecting) on_error_response("socks authentication error: #{status}")
when :connecting when :negotiating
version, reply, = packet.unpack("CC") version, reply, = packet.unpack("CC")
check_version(version) check_version(version)
raise Error, "Illegal response type" unless reply == SUCCESS return on_error_response("socks5 negotiation error: #{reply}") unless reply == SUCCESS
transition(:open)
req, _ = @pending.first req, _ = @pending.first
request_uri = req.uri request_uri = req.uri
if request_uri.scheme == "https" if request_uri.scheme == "https"
@io = ProxySSL.new(@io, request_uri, @options) @io = ProxySSL.new(@io, request_uri, @options)
end end
transition(:open)
throw(:called) throw(:called)
end end
end end
def transition(nextstate) def transition(nextstate)
case nextstate case nextstate
when :idle
when :negotiating
return unless @state == :idle
@write_buffer << Packet.negotiate(@parameters)
when :authenticating
return unless @state == :negotiating
@write_buffer << Packet.authenticate(@parameters)
when :connecting when :connecting
return unless @state == :negotiating || @state == :authenticating return unless @state == :idle
@io.connect
return if @io.closed?
@write_buffer << Packet.negotiate(@parameters)
proxy_connect
when :authenticating
return unless @state == :connecting
@write_buffer << Packet.authenticate(@parameters)
when :negotiating
return unless @state == :connecting || @state == :authenticating
req, _ = @pending.first req, _ = @pending.first
request_uri = req.uri request_uri = req.uri
@write_buffer << Packet.connect(request_uri) @write_buffer << Packet.connect(request_uri)
when :open when :open
return unless :connecting return unless @state == :negotiating
@parser = nil @parser = nil
end end
log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" } log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" }
@state = nextstate super
end end
def check_version(version) def check_version(version)
raise Error, "invalid SOCKS version (#{version})" if version != 5 raise Error, "invalid SOCKS version (#{version})" if version != 5
end end
def on_error_response(error)
response = ErrorResponse.new(error, 0)
while (req, _ = @pending.shift)
@on_response.call(req, response)
end
end
end end
Parameters.register("socks5", Socks5ProxyChannel) Parameters.register("socks5", Socks5ProxyChannel)
@ -101,6 +110,10 @@ module HTTPX
def consume(*) def consume(*)
end end
def empty?
true
end
def <<(packet) def <<(packet)
emit(:packet, packet) emit(:packet, packet)
end end

View File

@ -1,29 +1,70 @@
# frozen_string_literal: true # frozen_string_literal: true
require "timeout"
module HTTPX module HTTPX
module Timeout class Timeout
class << self LOOP_TIMEOUT = 5
def by(type, **opts)
case type def self.new(opts = {})
when :null return opts if opts.is_a?(Timeout)
Null.new(opts) super
when :per_operation end
PerOperation.new(opts)
when :global def initialize(loop_timeout: 5, total_timeout: nil)
Global.new(opts) @loop_timeout = loop_timeout
when Null, Global, PerOperation @total_timeout = total_timeout
type.new(opts) reset_counter
when Hash # default way end
PerOperation.new(type)
def timeout
@loop_timeout || @total_timeout
ensure
log_time
end
def ==(other)
if other.is_a?(Timeout)
@loop_timeout == other.instance_variable_get(:@loop_timeout) &&
@total_timeout == other.instance_variable_get(:@total_timeout)
else else
raise "#{type}: unrecognized timeout option" super
end end
end end
def merge(other)
case other
when Hash
timeout = Timeout.new(other)
merge(timeout)
when Timeout
loop_timeout = other.instance_variable_get(:@loop_timeout) || @loop_timeout
total_timeout = other.instance_variable_get(:@total_timeout) || @total_timeout
Timeout.new(loop_timeout: loop_timeout, total_timeout: total_timeout)
else
raise ArgumentError, "can't merge with #{other.class}"
end
end
private
def reset_counter
@time_left = @total_timeout
end
def reset_timer
@started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def log_time
return unless @time_left
return reset_timer unless @started
@time_left -= (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started)
if @time_left <= 0
raise TimeoutError, "Timed out after #{@total_timeout} seconds"
end
reset_timer
end end
end end
end end
require "httpx/timeout/null"
require "httpx/timeout/per_operation"
require "httpx/timeout/global"

View File

@ -1,50 +0,0 @@
# frozen_string_literal: true
require "timeout"
module HTTPX::Timeout
class Global < PerOperation
TOTAL_TIMEOUT = 15
attr_reader :total_timeout
def initialize(total_timeout: TOTAL_TIMEOUT)
@total_timeout = total_timeout
reset_counter
@running = false
end
def ==(other)
other.is_a?(Global) &&
@total_timeout == other.total_timeout
end
def timeout
unless @running
reset_timer
@running = true
end
log_time
@time_left
end
private
def reset_counter
@time_left = @total_timeout
end
def reset_timer
@started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def log_time
@time_left -= (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started)
if @time_left <= 0
raise HTTPX::TimeoutError, "Timed out after using the allocated #{@total_timeout} seconds"
end
reset_timer
end
end
end

View File

@ -1,16 +0,0 @@
# frozen_string_literal: true
module HTTPX::Timeout
class Null
def initialize(**)
end
def ==(other)
other.is_a?(Null)
end
def timeout
nil
end
end
end

View File

@ -1,32 +0,0 @@
# frozen_string_literal: true
require "timeout"
module HTTPX::Timeout
class PerOperation < Null
OPERATION_TIMEOUT = 5
CONNECT_TIMEOUT = 5
attr_reader :connect_timeout, :operation_timeout
def initialize(connect: CONNECT_TIMEOUT,
operation: OPERATION_TIMEOUT)
@connect_timeout = connect
@operation_timeout = operation
@timeout = @connect_timeout
end
def timeout
timeout = @timeout
@timeout = @operation_timeout
timeout
end
def ==(other)
other.is_a?(PerOperation) &&
@connect_timeout == other.connect_timeout &&
@operation_timeout == other.operation_timeout
end
end
end

View File

@ -12,6 +12,7 @@ class HTTP1Test < HTTPTest
include Headers include Headers
include ResponseBody include ResponseBody
include IO include IO
include Timeouts
include Plugins::Proxy include Plugins::Proxy
include Plugins::Authentication include Plugins::Authentication

View File

@ -10,6 +10,7 @@ class HTTP2Test < HTTPTest
include Headers include Headers
include ResponseBody include ResponseBody
include IO include IO
include Timeouts
include Plugins::Proxy include Plugins::Proxy
include Plugins::Authentication include Plugins::Authentication

View File

@ -53,7 +53,6 @@ class OptionsSpec < Minitest::Test
:ssl => {:foo => "bar"}, :ssl => {:foo => "bar"},
) )
assert foo.merge(bar).to_hash == { assert foo.merge(bar).to_hash == {
:io => ENV.key?("HTTPX_DEBUG") ? $stderr : nil, :io => ENV.key?("HTTPX_DEBUG") ? $stderr : nil,
:debug => nil, :debug => nil,
@ -65,7 +64,7 @@ class OptionsSpec < Minitest::Test
:window_size => 16_384, :window_size => 16_384,
:body_threshold_size => 114_688, :body_threshold_size => 114_688,
:form => {:bar => "bar"}, :form => {:bar => "bar"},
:timeout => Timeout::PerOperation.new, :timeout => Timeout.new,
:ssl => {:foo => "bar", :alpn_protocols => %w[h2 http/1.1] }, :ssl => {:foo => "bar", :alpn_protocols => %w[h2 http/1.1] },
:http2_settings => { :settings_enable_push => 0 }, :http2_settings => { :settings_enable_push => 0 },
:fallback_protocol => "http/1.1", :fallback_protocol => "http/1.1",

View File

@ -59,6 +59,13 @@ module Requests
assert body["data"].bytesize < 8012, "body hasn't been compressed" assert body["data"].bytesize < 8012, "body hasn't been compressed"
end end
def test_plugin_compression_brotli
client = HTTPX.plugin(:"compression/brotli")
response = client.get("http://httpbin.org/brotli")
verify_status(response.status, 200)
body = json_body(response)
assert body["brotli"], "response should be deflated"
end
end end
end end
end end

View File

@ -0,0 +1,21 @@
# frozen_string_literal: true
module Requests
module Timeouts
# def test_http_timeouts_loop_timeout
# uri = build_uri("/delay/2")
# client = HTTPX.timeout(loop_timeout: 1)
# response = client.get(uri)
# assert response.is_a?(HTTPX::ErrorResponse), "response should have failed"
# assert response.error =~ /timed out while waiting/, "response should have timed out"
# end
def test_http_timeouts_total_timeout
uri = build_uri("/delay/3")
client = HTTPX.timeout(loop_timeout: 1, total_timeout: 2)
response = client.get(uri)
assert response.is_a?(HTTPX::ErrorResponse), "response should have failed"
assert response.error =~ /timed out after 2 seconds/i, "response should have timed out"
end
end
end