removed the tcp and ssl channels, added proper IO wrappers (which now enables the possibility of IO customization), and channel just got a bit cleaner

This commit is contained in:
HoneyryderChuck 2017-11-29 23:05:30 +00:00
parent bd67d3d745
commit 95d141b4d9
4 changed files with 278 additions and 257 deletions

View File

@ -1,21 +1,127 @@
# frozen_string_literal: true
module HTTPX::Channel
module_function
require "httpx/io"
def by(uri, options, &blk)
case uri.scheme
when "http"
TCP.new(uri, options, &blk)
when "https"
SSL.new(uri, options, &blk)
else
raise Error, "#{uri.scheme}: unrecognized channel"
module HTTPX
class Channel
require "httpx/channel/http2"
require "httpx/channel/http1"
PROTOCOLS = {
"h2" => HTTP2,
"http/1.1" => HTTP1
}
BUFFER_SIZE = 1 << 16
class << self
def by(uri, options, &blk)
io = case uri.scheme
when "http"
TCP.new(uri, options)
when "https"
SSL.new(uri, options)
else
raise Error, "#{uri.scheme}: unrecognized channel"
end
new(io, options, &blk)
end
end
def initialize(io, options, &on_response)
@io = io
@options = HTTPX::Options.new(options)
@read_buffer = +""
@write_buffer = +""
@pending = []
@on_response = on_response
end
def to_io
if @io.closed?
@io.connect
set_processor unless @io.closed?
end
@io.to_io
end
def uri
@io.uri
end
def remote_ip
@io.ip
end
def remote_port
@io.port
end
def close
if processor = @processor
processor.close
@processor = nil
end
@io.close
unless processor.empty?
@io.connect
@processor = processor
@processor.reenqueue!
end
end
def closed?
!@io || @io.closed?
end
def empty?
@write_buffer.empty?
end
def send(request)
if @processor
@processor.send(request)
else
@pending << request
end
end
def call
return if closed?
dread
dwrite
nil
end
private
def dread(size = BUFFER_SIZE)
loop do
siz = @io.read(size, @read_buffer)
throw(:close, self) unless siz
return if siz.zero?
@processor << @read_buffer
end
end
def dwrite
loop do
return if @write_buffer.empty?
siz = @io.write(@write_buffer)
throw(:close, self) unless siz
return if siz.zero?
end
end
def set_processor
return @processor if defined?(@processor)
@processor = PROTOCOLS[@io.protocol].new(@write_buffer)
@processor.on(:response, &@on_response)
@processor.on(:close) { throw(:close, self) }
while request = @pending.shift
@processor.send(request)
end
@processor
end
end
end
require "httpx/channel/http2"
require "httpx/channel/http1"
require "httpx/channel/tcp"
require "httpx/channel/ssl"

View File

@ -1,59 +0,0 @@
# frozen_string_literal: true
require "forwardable"
require "openssl"
module HTTPX::Channel
class SSL < TCP
def protocol
@io.alpn_protocol
end
if OpenSSL::VERSION < "2.0.6"
# OpenSSL < 2.0.6 has a leak in the buffer destination data.
# It has been fixed as of 2.0.6: https://github.com/ruby/openssl/pull/153
def dread(size = BUFFER_SIZE)
begin
loop do
@io.read_nonblock(size, @read_buffer)
@processor << @read_buffer
end
rescue IO::WaitReadable
return
rescue EOFError
# EOF
throw(:close, self)
end
end
end
private
def connect
ssl = @options.ssl
ctx = OpenSSL::SSL::SSLContext.new
ctx.set_params(ssl)
ctx.alpn_protocols = %w[h2 http/1.1] if ctx.respond_to?(:alpn_protocols=)
ctx.alpn_select_cb = lambda do |pr|
pr.first unless pr.nil? || pr.empty?
end if ctx.respond_to?(:alpn_select_cb=)
super
return if @closed
@io = OpenSSL::SSL::SSLSocket.new(@io, ctx)
@io.hostname = uri.host
@io.sync_close = true
@io.connect # TODO: non-block variant missing
rescue IO::WaitWritable
end
def perform_io
yield
rescue IO::WaitReadable, IO::WaitWritable
# wait read/write
rescue EOFError
# EOF
@io.close
end
end
end

View File

@ -1,183 +0,0 @@
# frozen_string_literal: true
require "ipaddr"
require "forwardable"
module HTTPX::Channel
PROTOCOLS = {
"h2" => HTTP2,
"http/1.1" => HTTP1
}
class TCP
extend Forwardable
include HTTPX::Callbacks
BUFFER_SIZE = 1 << 16
attr_reader :uri, :remote_ip, :remote_port
def to_io
if @closed
connect
set_processor unless @closed
end
@io.to_io
end
def initialize(uri, options, &on_response)
@closed = true
@uri = uri
@options = HTTPX::Options.new(options)
@read_buffer = +""
@write_buffer = +""
@pending = []
@on_response = on_response
set_remote_info
addr = IPAddr.new(@remote_ip)
@io = Socket.new(addr.family, :STREAM, 0)
end
def protocol
"http/1.1"
end
def closed?
@closed
end
def close
if processor = @processor
processor.close
@processor = nil
end
@io.close
@closed = true
unless processor.empty?
connect
@processor = processor
@processor.reenqueue!
end
end
def empty?
@write_buffer.empty?
end
def send(request)
if @processor
@processor.send(request)
else
@pending << request
end
end
def call
return if @closed
dread
dwrite
nil
end
if RUBY_VERSION < "2.3"
def dread(size = BUFFER_SIZE)
begin
loop do
@io.read_nonblock(size, @read_buffer)
@processor << @read_buffer
end
rescue IO::WaitReadable
return
rescue EOFError
# EOF
throw(:close, self)
end
end
def dwrite
begin
loop do
return if @write_buffer.empty?
siz = @io.write_nonblock(@write_buffer)
@write_buffer.slice!(0, siz)
end
rescue IO::WaitWritable
return
rescue EOFError
# EOF
throw(:close, self)
end
end
else
def dread(size = BUFFER_SIZE)
loop do
buf = @io.read_nonblock(size, @read_buffer, exception: false)
case buf
when :wait_readable
return
when nil
throw(:close, self)
else
@processor << @read_buffer
end
end
end
def dwrite
loop do
return if @write_buffer.empty?
siz = @io.write_nonblock(@write_buffer, exception: false)
case siz
when :wait_writable
return
when nil
throw(:close, self)
else
@write_buffer.slice!(0, siz)
end
end
end
end
def connect
return unless @closed
begin
@io.connect_nonblock(Socket.sockaddr_in(@remote_port, @remote_ip))
rescue Errno::EISCONN
end
@options.timeout # force renovation
@read_buffer.clear
@write_buffer.clear
@closed = false
rescue Errno::EINPROGRESS,
Errno::EALREADY,
IO::WaitReadable
end
def set_processor
return @processor if defined?(@processor)
@processor = PROTOCOLS[protocol].new(@write_buffer)
@processor.on(:response, &@on_response)
@processor.on(:close) { throw(:close, self) }
while request = @pending.shift
@processor.send(request)
end
@processor
end
def set_remote_info
@remote_ip = TCPSocket.getaddress(@uri.host)
@remote_port = @uri.port
end
def perform_io
yield
rescue IO::WaitReadable, IO::WaitWritable
# wait read/write
rescue EOFError
# EOF
@io.close
end
end
end

157
lib/httpx/io.rb Normal file
View File

@ -0,0 +1,157 @@
# frozen_string_literal: true
require "socket"
require "openssl"
require "ipaddr"
module HTTPX
class TCP
attr_reader :ip, :port, :uri
def initialize(uri, **)
@connected = false
@uri = uri
@ip = TCPSocket.getaddress(@uri.host)
@port = @uri.port
addr = IPAddr.new(@ip)
@io = Socket.new(addr.family, :STREAM, 0)
end
def to_io
@io.to_io
end
def protocol
#"http/1.1"
"h2"
end
def connect
return if @connected
begin
@io.connect_nonblock(Socket.sockaddr_in(@port, @ip))
rescue Errno::EISCONN
end
@connected = true
rescue Errno::EINPROGRESS,
Errno::EALREADY,
IO::WaitReadable
end
if RUBY_VERSION < "2.3"
def read(size, buffer)
@io.read_nonblock(size, buffer)
buffer.bytesize
rescue IO::WaitReadable
0
rescue EOFError
nil
end
def write(buffer)
siz = @io.write_nonblock(buffer)
buffer.slice!(0, siz)
siz
rescue IO::WaitWritable
0
rescue EOFError
nil
end
else
def read(size, buffer)
ret = @io.read_nonblock(size, buffer, exception: false)
return 0 if ret == :wait_readable
return if ret.nil?
buffer.bytesize
end
def write(buffer)
siz = @io.write_nonblock(buffer, exception: false)
return 0 if siz == :wait_writable
return if siz.nil?
buffer.slice!(0, siz)
siz
end
end
def close
@io.close
ensure
@connected = false
end
def closed?
!@connected
end
end
class SSL < TCP
def initialize(uri, options)
@negotiated = false
@ctx = OpenSSL::SSL::SSLContext.new
@ctx.set_params(options.ssl)
@ctx.alpn_protocols = %w[h2 http/1.1] if @ctx.respond_to?(:alpn_protocols=)
@ctx.alpn_select_cb = lambda do |pr|
pr.first unless pr.nil? || pr.empty?
end if @ctx.respond_to?(:alpn_select_cb=)
super
end
def protocol
@io.alpn_protocol
rescue
super
end
def close
super
# allow reconnections
# connect only works if initial @io is a socket
@io = @io.io
@negotiated = false
end
def connect
super
return if not @connected
return if @negotiated
@io = OpenSSL::SSL::SSLSocket.new(@io, @ctx)
@io.hostname = @uri.host
@io.sync_close = true
@io.connect
@negotiated = true
end
if RUBY_VERSION < "2.3"
def read(*)
super
rescue IO::WaitWritable
0
end
def write(*)
super
rescue IO::WaitReadable
0
end
else
if OpenSSL::VERSION < "2.0.6"
def read(size, buffer)
@io.read_nonblock(size, buffer)
buffer.bytesize
rescue IO::WaitReadable, IO::WaitWritable
0
rescue EOFError
nil
end
end
end
def closed?
super || !@negotiated
end
end
end