enabled pipelining; allowed for reconnections in HTTP/1 which require to close the connection (which should be the exception)

This commit is contained in:
HoneyryderChuck 2017-11-29 02:17:45 +00:00
parent c0b1e12130
commit 149cfb602d
6 changed files with 106 additions and 61 deletions

View File

@ -12,48 +12,60 @@ module HTTPX
@parser.header_value_type = :arrays
@buffer = buffer
@version = version
@requests = []
@responses = []
end
def reset
@request = nil
@response = nil
@parser.reset!
end
alias :close :reset
def empty?
@requests.empty?
end
def <<(data)
@parser << data
end
def send(request)
@request = request
@requests << request
join_headers(request)
join_body(request)
end
def reenqueue!
requests = @requests.dup
@requests.clear
requests.each do |request|
send(request)
end
end
def on_message_begin
log { "parsing begins" }
end
def on_headers_complete(h)
log { "headers received" }
@response = Response.new(@parser.status_code, h)
log { @response.headers.each.map { |f, v| "-> #{f}: #{v}" }.join("\n") }
response = Response.new(@parser.status_code, h)
@responses << response
log { response.headers.each.map { |f, v| "-> #{f}: #{v}" }.join("\n") }
end
def on_body(chunk)
log { "-> #{chunk.inspect}" }
@response << chunk
@responses.last << chunk
end
def on_message_complete
log { "parsing complete" }
emit(:response, @request, @response)
response = @response
request = @requests.shift
response = @responses.shift
emit(:response, request, response)
reset
if response.headers["connection"] == "close"
throw(:close)
end
emit(:close) if response.headers["connection"] == "close"
end
private

View File

@ -6,12 +6,7 @@ module HTTPX
include Callbacks
def initialize(buffer)
@connection = HTTP2::Client.new
@connection.on(:frame, &method(:on_frame))
@connection.on(:frame_sent, &method(:on_frame_sent))
@connection.on(:frame_received, &method(:on_frame_received))
@connection.on(:promise, &method(:on_promise))
@connection.on(:altsvc, &method(:on_altsvc))
init_connection
@streams = {}
@buffer = buffer
end
@ -20,6 +15,10 @@ module HTTPX
@connection.goaway
end
def empty?
@streams.empty?
end
def <<(data)
@connection << data
end
@ -44,8 +43,26 @@ module HTTPX
join_body(stream, request)
end
def reenqueue!
requests = @streams.values
@streams.clear
init_connection
requests.each do |request|
send(request)
end
end
private
def init_connection
@connection = HTTP2::Client.new
@connection.on(:frame, &method(:on_frame))
@connection.on(:frame_sent, &method(:on_frame_sent))
@connection.on(:frame_received, &method(:on_frame_received))
@connection.on(:promise, &method(:on_promise))
@connection.on(:altsvc, &method(:on_altsvc))
end
def join_headers(stream, request)
headers = {}
headers[":scheme"] = request.scheme

View File

@ -5,9 +5,31 @@ require "openssl"
module HTTPX::Channel
class SSL < TCP
def initialize(uri, options)
@timeout = options.timeout
@options = HTTPX::Options.new(options)
def protocol
@io.alpn_protocol
end
if OpenSSL::VERSION < "2.0.6"
# OpenSSL < 2.0.6 has a leak in the buffer destination data.
# It has been fixed as of 2.0.6: https://github.com/ruby/openssl/pull/153
def dread(size = BUFFER_SIZE)
begin
loop do
@io.read_nonblock(size, @read_buffer)
@processor << @read_buffer
end
rescue IO::WaitReadable
return
rescue EOFError
# EOF
throw(:close, self)
end
end
end
private
def connect
ssl = @options.ssl
ctx = OpenSSL::SSL::SSLContext.new
ctx.set_params(ssl)
@ -25,38 +47,6 @@ module HTTPX::Channel
end
end
def protocol
@io.alpn_protocol
end
def send(request, &block)
if @processor.nil?
@processor = PROTOCOLS[protocol].new(@write_buffer)
@processor.on(:response, &block)
end
@processor.send(request)
end
if OpenSSL::VERSION < "2.0.6"
# OpenSSL < 2.0.6 has a leak in the buffer destination data.
# It has been fixed as of 2.0.6: https://github.com/ruby/openssl/pull/153
def dread(size = BUFFER_SIZE)
begin
loop do
@io.read_nonblock(size, @read_buffer)
@processor << @read_buffer
end
rescue IO::WaitReadable
# wait read/write
rescue EOFError
# EOF
throw(:close, self)
end
end
end
private
def perform_io
yield
rescue IO::WaitReadable, IO::WaitWritable

View File

@ -19,15 +19,17 @@ module HTTPX::Channel
def_delegator :@io, :to_io
def initialize(uri, options)
@closed = false
@uri = uri
@options = HTTPX::Options.new(options)
@timeout = options.timeout
@timeout.connect do
@io = TCPSocket.new(uri.host, uri.port)
end
@read_buffer = +""
@write_buffer = +""
@protocol = "http/1.1"
connect
end
def protocol
"http/1.1"
end
def remote_ip
@ -44,9 +46,23 @@ module HTTPX::Channel
end
end
def closed?
@closed
end
def close
@processor.close if @processor
if processor = @processor
processor.close
@processor = nil
end
@io.close
@closed = true
unless processor.empty?
connect
@processor = processor
@processor.reenqueue!
@closed = false
end
end
def empty?
@ -57,6 +73,7 @@ module HTTPX::Channel
if @processor.nil?
@processor = PROTOCOLS[protocol].new(@write_buffer)
@processor.on(:response, &block)
@processor.on(:close) { throw(:close, self) }
end
@processor.send(request)
end
@ -69,7 +86,7 @@ module HTTPX::Channel
@processor << @read_buffer
end
rescue IO::WaitReadable
# wait read/write
return
rescue EOFError
# EOF
throw(:close, self)
@ -84,7 +101,7 @@ module HTTPX::Channel
@write_buffer.slice!(0, siz)
end
rescue IO::WaitWritable
# wait read/write
return
rescue EOFError
# EOF
throw(:close, self)
@ -123,6 +140,14 @@ module HTTPX::Channel
private
def connect
@timeout.connect do
@io = TCPSocket.new(uri.host, uri.port)
end
@read_buffer.clear
@write_buffer.clear
end
def set_remote_info
_, @remote_port, _,@remote_ip = @io.peeraddr
end

View File

@ -25,6 +25,7 @@ module HTTPX
@connection.process_events until response = @connection.response(request)
responses << response
break if requests.empty?
end
requests.size == 1 ? responses.first : responses

View File

@ -45,7 +45,7 @@ module HTTPX
@responses.delete(request)
end
def process_events(timeout: @timeout.timeout)
def process_events(timeout: @timeout.timeout)
rmonitors = @channels
wmonitors = rmonitors.reject(&:empty?)
readers, writers = IO.select(rmonitors, wmonitors, nil, timeout)
@ -62,8 +62,8 @@ module HTTPX
def close(channel = nil)
if channel
@channels.delete(channel)
channel.close
@channels.delete(channel) if channel.closed?
else
while ch = @channels.shift
ch.close