allow to return early responses just with headers; added a bufferable body, which means, when body is requests, it fetches it (if not available yet); for this, the selector must be exposed to all sub-levels; the response body by default buffers first/returns later for #to_s, and buffers and yields chunks on #each

This commit is contained in:
HoneyryderChuck 2017-12-06 17:20:03 +00:00
parent 6f133bb726
commit ee59c71274
8 changed files with 200 additions and 29 deletions

View File

@ -15,12 +15,12 @@ module HTTPX
BUFFER_SIZE = 1 << 16
class << self
def by(uri, options, &blk)
def by(selector, uri, options, &blk)
io = case uri.scheme
when "http"
TCP.new(uri, options)
TCP.new(selector, uri, options)
when "https"
SSL.new(uri, options)
SSL.new(selector, uri, options)
else
raise Error, "#{uri.scheme}: unrecognized channel"
end
@ -115,7 +115,7 @@ module HTTPX
def set_processor
return @processor if defined?(@processor)
@processor = PROTOCOLS[@io.protocol].new(@write_buffer, @options)
@processor = PROTOCOLS[@io.protocol].new(@io.selector, @write_buffer, @options)
@processor.on(:response, &@on_response)
@processor.on(:close) { throw(:close, self) }
while (request, args = @pending.shift)

View File

@ -8,8 +8,9 @@ module HTTPX
CRLF = "\r\n"
def initialize(buffer, options)
def initialize(selector, buffer, options)
@options = Options.new(options)
@selector = selector
@max_concurrent_requests = @options.max_concurrent_requests
@parser = HTTP::Parser.new(self)
@parser.header_value_type = :arrays
@ -58,8 +59,10 @@ module HTTPX
def on_headers_complete(h)
log { "headers received" }
response = Response.new(@parser.status_code, h)
response = Response.new(@selector, @parser.status_code, h)
@responses << response
request = @requests[@responses.size - 1]
emit(:response, request, response)
log { response.headers.each.map { |f, v| "-> #{f}: #{v}" }.join("\n") }
end
@ -72,7 +75,6 @@ module HTTPX
log { "parsing complete" }
request = @requests.shift
response = @responses.shift
emit(:response, request, response)
reset
send(@pending.shift) unless @pending.empty?

View File

@ -5,8 +5,9 @@ module HTTPX
class Channel::HTTP2
include Callbacks
def initialize(buffer, options)
def initialize(selector, buffer, options)
@options = Options.new(options)
@selector = selector
@max_concurrent_requests = @options.max_concurrent_requests
init_connection
@retries = options.max_retries
@ -20,7 +21,7 @@ module HTTPX
end
def empty?
@streams.empty?
@connection.state == :closed || @streams.empty?
end
def <<(data)
@ -34,9 +35,10 @@ module HTTPX
end
stream = @connection.new_stream
stream.on(:close) do |error|
response = @streams.delete(stream.id) ||
ErrorResponse.new(error, retries)
emit(:response, request, response)
unless @streams.delete(stream.id)
response = ErrorResponse.new(error, retries)
emit(:response, request, response)
end
send(@pending.shift) unless @pending.empty?
end
@ -44,7 +46,9 @@ module HTTPX
# stream.on(:altsvc)
stream.on(:headers) do |headers|
_, status = headers.shift
@streams[stream.id] = Response.new(status, headers)
response = Response.new(@selector, status, headers)
@streams[stream.id] = response
emit(:response, request, response)
end
stream.on(:data) do |data|
@streams[stream.id] << data
@ -105,8 +109,8 @@ module HTTPX
@max_concurrent_requests = [@max_concurrent_requests, @connection.remote_settings[:settings_max_concurrent_streams]].min
end
def on_close
return unless @server.state == :closed && @server.active_stream_count.zero?
def on_close(*)
return unless @connection.state == :closed && @connection.active_stream_count.zero?
log { "connection closed" }
emit(:close)
end

View File

@ -22,7 +22,7 @@ module HTTPX
# guarantee ordered responses
loop do
request = requests.shift
@connection.process_events until response = @connection.response(request)
@connection.next_tick until response = @connection.response(request)
responses << response

View File

@ -16,6 +16,10 @@ module HTTPX
@responses = {}
end
def running?
!@channels.empty?
end
# opens a channel to the IP reachable through +uri+.
# Many hostnames are reachable through the same IP, so we try to
# maximize pipelining by opening as few channels as possible.
@ -28,7 +32,7 @@ module HTTPX
uri.port == channel.remote_port &&
uri.scheme == channel.uri.scheme
end || begin
channel = Channel.by(uri, @options) do |request, response|
channel = Channel.by(self, uri, @options) do |request, response|
@responses[request] = response
end
@ -62,7 +66,7 @@ module HTTPX
end
end
def process_events(timeout: @timeout.timeout)
def next_tick(timeout: @timeout.timeout)
@selector.select(timeout) do |monitor|
if task = monitor.value
channel = catch(:close) { task.call }

View File

@ -7,10 +7,11 @@ require "ipaddr"
module HTTPX
class TCP
attr_reader :ip, :port, :uri
attr_reader :ip, :port, :uri, :selector
def initialize(uri, **)
def initialize(selector, uri, **)
@connected = false
@selector = selector
@uri = uri
@ip = TCPSocket.getaddress(@uri.host)
@port = @uri.port
@ -23,8 +24,7 @@ module HTTPX
end
def protocol
#"http/1.1"
"h2"
"http/1.1"
end
def connect
@ -88,7 +88,7 @@ module HTTPX
end
class SSL < TCP
def initialize(uri, options)
def initialize(_, _, options)
@negotiated = false
@ctx = OpenSSL::SSL::SSLContext.new
@ctx.set_params(options.ssl)

View File

@ -9,13 +9,130 @@ module HTTPX
attr_reader :status, :headers, :body
def_delegator :@body, :to_s
def initialize(status, headers)
def initialize(selector, status, headers)
@selector = selector
@status = Integer(status)
@headers = Headers.new(headers)
@body = Body.new(@selector, @headers)
end
def <<(data)
(@body ||= +"") << data
@body << data
end
class Body
MAX_THRESHOLD_SIZE = 1024 * (80 + 32) # 112 Kbytes
def initialize(selector, headers, threshold_size: MAX_THRESHOLD_SIZE)
@selector = selector
@headers = headers
@threshold_size = threshold_size
@length = 0
@buffer = nil
@state = :idle
end
def write(chunk)
@length += chunk.bytesize
transition
@buffer.write(chunk)
@chunk_cb[chunk] if @chunk_cb
end
alias :<< :write
def each
return enum_for(__method__) unless block_given?
@chunk_cb = ->(e) { yield(e) }
begin
unless @state == :idle
rewind
@buffer.each do |*args|
yield(*args)
end
end
buffering!
ensure
@chunk_cb = nil
close
end
end
def to_s
buffering!
@buffer.read
ensure
close
end
def empty?
@length.zero?
end
# closes/cleans the buffer, resets everything
def close
return if @state == :idle
@buffer.close
@buffer.unlink if @buffer.respond_to?(:unlink)
@buffer = nil
@length = 0
@state = :idle
end
def ==(other)
to_s == other.to_s
end
private
def rewind
return if @state == :idle
@buffer.rewind
end
def buffering!
@selector.next_tick until buffered?
rewind
end
def buffered?
if content_length = @headers["content-length"]
content_length = Integer(content_length)
@length >= content_length
elsif @headers["transfer-encoding"] == "chunked"
# dechunk
raise "TODO: implement de-chunking"
else
!@selector.running?
end
end
def transition
case @state
when :idle
if @length > @threshold_size
@state = :buffer
@buffer = Tempfile.new("httpx", encoding: Encoding::BINARY, mode: File::RDWR)
else
@state = :memory
@buffer = StringIO.new("".b, File::RDWR)
end
when :memory
if @length > @threshold_size
aux = @buffer
@buffer = Tempfile.new("palanca", encoding: Encoding::BINARY, mode: File::RDWR)
aux.rewind
IO.copy_stream(aux, @buffer)
# TODO: remove this if/when minor ruby is 2.3
# (this looks like a bug from older versions)
@buffer.pos = aux.pos #######################
#############################################
aux.close
@state = :buffer
end
end
return unless %i[memory buffer].include?(@state)
end
end
end
@ -34,4 +151,5 @@ module HTTPX
@retries.positive?
end
end
end

View File

@ -6,9 +6,9 @@ class ResponseTest < Minitest::Test
include HTTPX
def test_response_status
r1 = Response.new(200, {})
r1 = Response.new(selector, 200, {})
assert r1.status == 200, "unexpected status code (#{r1.status})"
r2 = Response.new("200", {})
r2 = Response.new(selector, "200", {})
assert r2.status == 200, "unexpected status code (#{r2.status})"
end
@ -17,14 +17,57 @@ class ResponseTest < Minitest::Test
end
def test_response_body_concat
assert resource.body.nil?, "body should be nil after init"
assert resource.body.empty?, "body should be empty after init"
resource << "data"
assert resource.body == "data", "body should have been updated"
end
def test_response_body_to_s
body1 = Response::Body.new(selector, {})
assert body1.empty?, "body must be empty after initialization"
body1 << "foo"
assert body1 == "foo", "body must be updated"
body1 << "foo"
body1 << "bar"
assert body1 == "foobar", "body must buffer subsequent chunks"
sel = Minitest::Mock.new
body2 = Response::Body.new(sel, "content-length" => "6")
sel.expect(:running?, true, [])
sel.expect(:next_tick, nil) do
body2 << "foobar"
true
end
assert body2.empty?, "body must be empty after initialization"
assert body2 == "foobar", "body must buffer before cast"
end
def test_response_body_each
body1 = Response::Body.new(selector, {})
body1 << "foo"
assert body1.each.to_a == %w(foo), "must yield buffer"
body1 << "foo"
body1 << "bar"
assert body1.each.to_a == %w(foobar), "must yield buffers"
sel = Minitest::Mock.new
body2 = Response::Body.new(sel, "content-length" => "6")
sel.expect(:running?, true, [])
sel.expect(:next_tick, nil) do
body2 << "foo"
body2 << "bar"
true
end
assert body2.each.to_a == %w(foo bar), "must yield buffer chunks"
end
private
def selector
Connection.new(Options.new)
end
def resource
@resource ||= Response.new(200, {})
@resource ||= Response.new(selector, 200, {})
end
end