Merge branch 'unixsock' into 'master'

Unixsock

Closes #24

See merge request honeyryderchuck/httpx!20
This commit is contained in:
HoneyryderChuck 2018-06-07 11:34:27 +00:00
commit daa843504c
9 changed files with 393 additions and 255 deletions

View File

@ -42,14 +42,15 @@ module HTTPX
class << self
def by(uri, options)
io = case uri.scheme
when "http"
IO.registry("tcp").new(uri.host, uri.port, options)
when "https"
IO.registry("ssl").new(uri.host, uri.port, options)
else
raise Error, "#{uri}: #{uri.scheme}: unrecognized channel"
type = options.transport || begin
case uri.scheme
when "http" then "tcp"
when "https" then "ssl"
else
raise Error, "#{uri}: #{uri.scheme}: unrecognized channel"
end
end
io = IO.registry(type).new(uri, options)
new(io, options)
end
end

View File

@ -1,255 +1,15 @@
# frozen_string_literal: true
require "resolv"
require "socket"
require "openssl"
require "ipaddr"
require "httpx/io/tcp"
require "httpx/io/ssl"
require "httpx/io/unix"
module HTTPX
class TCP
include Loggable
attr_reader :ip, :port
def initialize(hostname, port, options)
@state = :idle
@hostname = hostname
@options = Options.new(options)
@fallback_protocol = @options.fallback_protocol
@port = port
if @options.io
@io = case @options.io
when Hash
@ip = Resolv.getaddress(@hostname)
@options.io[@ip] || @options.io["#{@ip}:#{@port}"]
else
@ip = hostname
@options.io
end
unless @io.nil?
@keep_open = true
@state = :connected
end
else
@ip = Resolv.getaddress(@hostname)
end
@io ||= build_socket
end
def scheme
"http"
end
def to_io
@io.to_io
end
def protocol
@fallback_protocol
end
def connect
return unless closed?
begin
if @io.closed?
transition(:idle)
@io = build_socket
end
@io.connect_nonblock(Socket.sockaddr_in(@port, @ip))
rescue Errno::EISCONN
end
transition(:connected)
rescue Errno::EINPROGRESS,
Errno::EALREADY,
::IO::WaitReadable
end
if RUBY_VERSION < "2.3"
def read(size, buffer)
@io.read_nonblock(size, buffer)
buffer.bytesize
rescue ::IO::WaitReadable
0
rescue EOFError
nil
end
def write(buffer)
siz = @io.write_nonblock(buffer)
buffer.slice!(0, siz)
siz
rescue ::IO::WaitWritable
0
rescue EOFError
nil
end
else
def read(size, buffer)
ret = @io.read_nonblock(size, buffer, exception: false)
return 0 if ret == :wait_readable
return if ret.nil?
buffer.bytesize
end
def write(buffer)
siz = @io.write_nonblock(buffer, exception: false)
return 0 if siz == :wait_writable
return if siz.nil?
buffer.slice!(0, siz)
siz
end
end
def close
return if @keep_open || closed?
begin
@io.close
ensure
transition(:closed)
end
end
def connected?
@state == :connected
end
def closed?
@state == :idle || @state == :closed
end
def inspect
id = @io.closed? ? "closed" : @io.fileno
"#<TCP(fd: #{id}): #{@ip}:#{@port} (state: #{@state})>"
end
private
def build_socket
addr = IPAddr.new(@ip)
Socket.new(addr.family, :STREAM, 0)
end
def transition(nextstate)
case nextstate
# when :idle
when :connected
return unless @state == :idle
when :closed
return unless @state == :connected
end
do_transition(nextstate)
end
def do_transition(nextstate)
log(level: 1, label: "#{inspect}: ") { nextstate.to_s }
@state = nextstate
end
end
class SSL < TCP
TLS_OPTIONS = if OpenSSL::SSL::SSLContext.instance_methods.include?(:alpn_protocols)
{ alpn_protocols: %w[h2 http/1.1] }
else
{}
end
def initialize(_, _, options)
@ctx = OpenSSL::SSL::SSLContext.new
ctx_options = TLS_OPTIONS.merge(options.ssl)
@ctx.set_params(ctx_options) unless ctx_options.empty?
super
@state = :negotiated if @keep_open
end
def scheme
"https"
end
def protocol
@io.alpn_protocol || super
rescue StandardError
super
end
def close
super
# allow reconnections
# connect only works if initial @io is a socket
@io = @io.io if @io.respond_to?(:io)
@negotiated = false
end
def connected?
@state == :negotiated
end
def connect
super
if @keep_open
@state = :negotiated
return
end
return if @state == :negotiated ||
@state != :connected
unless @io.is_a?(OpenSSL::SSL::SSLSocket)
@io = OpenSSL::SSL::SSLSocket.new(@io, @ctx)
@io.hostname = @hostname
@io.sync_close = true
end
# TODO: this might block it all
@io.connect_nonblock
transition(:negotiated)
rescue ::IO::WaitReadable,
::IO::WaitWritable
end
if RUBY_VERSION < "2.3"
def read(*)
super
rescue ::IO::WaitWritable
0
end
def write(*)
super
rescue ::IO::WaitReadable
0
end
else
if OpenSSL::VERSION < "2.0.6"
def read(size, buffer)
@io.read_nonblock(size, buffer)
buffer.bytesize
rescue ::IO::WaitReadable,
::IO::WaitWritable
0
rescue EOFError
nil
end
end
end
def inspect
id = @io.closed? ? "closed" : @io.to_io.fileno
"#<SSL(fd: #{id}): #{@ip}:#{@port} state: #{@state}>"
end
private
def transition(nextstate)
case nextstate
when :negotiated
return unless @state == :connected
when :closed
return unless @state == :negotiated ||
@state == :connected
end
do_transition(nextstate)
end
end
module IO
extend Registry
register "tcp", TCP
register "ssl", SSL
register "unix", HTTPX::UNIX
end
end

107
lib/httpx/io/ssl.rb Normal file
View File

@ -0,0 +1,107 @@
# frozen_string_literal: true
require "openssl"
module HTTPX
class SSL < TCP
TLS_OPTIONS = if OpenSSL::SSL::SSLContext.instance_methods.include?(:alpn_protocols)
{ alpn_protocols: %w[h2 http/1.1] }
else
{}
end
def initialize(_, options)
@ctx = OpenSSL::SSL::SSLContext.new
ctx_options = TLS_OPTIONS.merge(options.ssl)
@ctx.set_params(ctx_options) unless ctx_options.empty?
super
@state = :negotiated if @keep_open
end
def scheme
"https"
end
def protocol
@io.alpn_protocol || super
rescue StandardError
super
end
def close
super
# allow reconnections
# connect only works if initial @io is a socket
@io = @io.io if @io.respond_to?(:io)
@negotiated = false
end
def connected?
@state == :negotiated
end
def connect
super
if @keep_open
@state = :negotiated
return
end
return if @state == :negotiated ||
@state != :connected
unless @io.is_a?(OpenSSL::SSL::SSLSocket)
@io = OpenSSL::SSL::SSLSocket.new(@io, @ctx)
@io.hostname = @hostname
@io.sync_close = true
end
# TODO: this might block it all
@io.connect_nonblock
transition(:negotiated)
rescue ::IO::WaitReadable,
::IO::WaitWritable
end
if RUBY_VERSION < "2.3"
def read(*)
super
rescue ::IO::WaitWritable
0
end
def write(*)
super
rescue ::IO::WaitReadable
0
end
else
if OpenSSL::VERSION < "2.0.6"
def read(size, buffer)
@io.read_nonblock(size, buffer)
buffer.bytesize
rescue ::IO::WaitReadable,
::IO::WaitWritable
0
rescue EOFError
nil
end
end
end
def inspect
id = @io.closed? ? "closed" : @io.to_io.fileno
"#<SSL(fd: #{id}): #{@ip}:#{@port} state: #{@state}>"
end
private
def transition(nextstate)
case nextstate
when :negotiated
return unless @state == :connected
when :closed
return unless @state == :negotiated ||
@state == :connected
end
do_transition(nextstate)
end
end
end

148
lib/httpx/io/tcp.rb Normal file
View File

@ -0,0 +1,148 @@
# frozen_string_literal: true
require "resolv"
require "ipaddr"
module HTTPX
class TCP
include Loggable
attr_reader :ip, :port
alias_method :host, :ip
def initialize(uri, options)
@state = :idle
@hostname = uri.host
@options = Options.new(options)
@fallback_protocol = @options.fallback_protocol
@port = uri.port
if @options.io
@io = case @options.io
when Hash
@ip = Resolv.getaddress(@hostname)
@options.io[@ip] || @options.io["#{@ip}:#{@port}"]
else
@ip = @hostname
@options.io
end
unless @io.nil?
@keep_open = true
@state = :connected
end
else
@ip = Resolv.getaddress(@hostname)
end
@io ||= build_socket
end
def scheme
"http"
end
def to_io
@io.to_io
end
def protocol
@fallback_protocol
end
def connect
return unless closed?
begin
if @io.closed?
transition(:idle)
@io = build_socket
end
@io.connect_nonblock(Socket.sockaddr_in(@port, @ip))
rescue Errno::EISCONN
end
transition(:connected)
rescue Errno::EINPROGRESS,
Errno::EALREADY,
::IO::WaitReadable
end
if RUBY_VERSION < "2.3"
def read(size, buffer)
@io.read_nonblock(size, buffer)
buffer.bytesize
rescue ::IO::WaitReadable
0
rescue EOFError
nil
end
def write(buffer)
siz = @io.write_nonblock(buffer)
buffer.slice!(0, siz)
siz
rescue ::IO::WaitWritable
0
rescue EOFError
nil
end
else
def read(size, buffer)
ret = @io.read_nonblock(size, buffer, exception: false)
return 0 if ret == :wait_readable
return if ret.nil?
buffer.bytesize
end
def write(buffer)
siz = @io.write_nonblock(buffer, exception: false)
return 0 if siz == :wait_writable
return if siz.nil?
buffer.slice!(0, siz)
siz
end
end
def close
return if @keep_open || closed?
begin
@io.close
ensure
transition(:closed)
end
end
def connected?
@state == :connected
end
def closed?
@state == :idle || @state == :closed
end
def inspect
id = @io.closed? ? "closed" : @io.fileno
"#<TCP(fd: #{id}): #{@ip}:#{@port} (state: #{@state})>"
end
private
def build_socket
addr = IPAddr.new(@ip)
Socket.new(addr.family, :STREAM, 0)
end
def transition(nextstate)
case nextstate
# when :idle
when :connected
return unless @state == :idle
when :closed
return unless @state == :connected
end
do_transition(nextstate)
end
def do_transition(nextstate)
log(level: 1, label: "#{inspect}: ") { nextstate.to_s }
@state = nextstate
end
end
end

56
lib/httpx/io/unix.rb Normal file
View File

@ -0,0 +1,56 @@
require "forwardable"
module HTTPX
class UNIX < TCP
extend Forwardable
def_delegator :@uri, :port, :scheme
def initialize(uri, options)
@uri = uri
@state = :idle
@options = Options.new(options)
@path = @options.transport_options[:path]
@fallback_protocol = @options.fallback_protocol
if @options.io
@io = case @options.io
when Hash
@options.io[@path]
else
@options.io
end
unless @io.nil?
@keep_open = true
@state = :connected
end
end
@io ||= build_socket
end
def hostname
@uri.host
end
def connect
return unless closed?
begin
if @io.closed?
transition(:idle)
@io = build_socket
end
@io.connect_nonblock(Socket.sockaddr_un(@path))
rescue Errno::EISCONN
end
transition(:connected)
rescue Errno::EINPROGRESS,
Errno::EALREADY,
::IO::WaitReadable
end
private
def build_socket
Socket.new(Socket::PF_UNIX, :STREAM, 0)
end
end
end

View File

@ -53,6 +53,8 @@ module HTTPX
:headers_class => Class.new(Headers),
:request_body_class => Class.new(Request::Body),
:response_body_class => Class.new(Response::Body),
:transport => nil,
:transport_options => nil,
}
defaults.merge!(options)
@ -82,11 +84,17 @@ module HTTPX
self.body_threshold_size = Integer(num)
end
def_option(:transport) do |tr|
transport = tr.to_s
raise Error, "#{transport} is an unsupported transport type" unless IO.registry.keys.include?(transport)
self.transport = transport
end
%w[
params form json body
follow ssl http2_settings
request_class response_class headers_class request_body_class response_body_class
io fallback_protocol debug debug_level
io fallback_protocol debug debug_level transport_options
].each do |method_name|
def_option(method_name)
end

View File

@ -55,7 +55,7 @@ module HTTPX
parameters = Parameters.new(**proxy)
uri = parameters.uri
log { "proxy: #{uri}" }
io = TCP.new(uri.host, uri.port, @options)
io = TCP.new(uri, @options)
proxy_type = Parameters.registry(parameters.uri.scheme)
channel = proxy_type.new(io, parameters, @options.merge(options), &method(:on_response))
@connection.__send__(:register_channel, channel)
@ -113,7 +113,7 @@ module HTTPX
class ProxySSL < SSL
def initialize(tcp, request_uri, options)
@io = tcp.to_io
super(tcp.ip, tcp.port, options)
super(tcp, options)
@hostname = request_uri.host
@state = :connected
end

56
test/io/unix_test.rb Normal file
View File

@ -0,0 +1,56 @@
# frozen_string_literal: true
require "tempfile"
require_relative "../test_helper"
class UnixTest < Minitest::Test
include HTTPX
def test_unix_client
on_unix_server do |path|
client = Client.new(transport: "unix", transport_options: { path: path })
response = client.get("http://unix.com/ping")
assert response.status == 200, "unexpected code (#{response.status})"
assert response.to_s == "pong", "unexpected body (#{response})"
response.close
client.close
end
end
private
RESPONSE_HEADER = <<-HTTP.lines.map.map(&:chomp).join("\r\n") << ("\r\n" * 2)
HTTP/1.1 200 OK
Date: Mon, 27 Jul 2009 12:28:53 GMT
Content-Length: 4
Content-Type: text/plain
Connection: close
HTTP
def on_unix_server
mutex = Mutex.new
resource = ConditionVariable.new
path = File.join(Dir.tmpdir, "httpx-unix.sock")
server = UNIXServer.new(path)
begin
th = Thread.start do
mutex.synchronize do
resource.signal
end
socket = server.accept
socket.readpartial(4096) # drain the socket for the request
socket.write(RESPONSE_HEADER)
socket.write("pong")
socket.close
end
mutex.synchronize do
resource.wait(mutex)
end
yield server.path
ensure
server.close
File.unlink(path)
th.terminate
end
end
end

View File

@ -2,7 +2,7 @@
require_relative "test_helper"
class OptionsSpec < Minitest::Test
class OptionsTest < Minitest::Test
include HTTPX
def test_options_body
@ -75,6 +75,8 @@ class OptionsSpec < Minitest::Test
:headers_class => bar.headers_class,
:request_body_class => bar.request_body_class,
:response_body_class => bar.response_body_class,
:transport => nil,
:transport_options => nil,
}, "options haven't merged correctly"
end