changed the resource structure

This commit is contained in:
HoneyryderChuck 2017-11-28 14:36:18 +00:00
parent 46a1223187
commit 85f08f0c7a
6 changed files with 69 additions and 65 deletions

View File

@ -3,7 +3,6 @@
require "httpx/version"
require "httpx/callbacks"
require "httpx/scheme"
require "httpx/connection"
require "httpx/headers"
require "httpx/request"

19
lib/httpx/channel.rb Normal file
View File

@ -0,0 +1,19 @@
# frozen_string_literal: true
module HTTPX::Channel
module_function
def by(uri)
case uri.scheme
when "http"
TCP.new(uri)
when "https"
TLS.new(uri)
else
raise "#{uri.scheme}: unrecognized channel"
end
end
end
require "httpx/channel/http2"
require "httpx/channel/tcp"

View File

@ -2,12 +2,10 @@
require "http/2"
module HTTPX
class Connection::HTTP2
class Channel::HTTP2
include Callbacks
attr_accessor :buffer
def initialize
def initialize(buffer)
@connection = HTTP2::Client.new
@connection.on(:frame, &method(:on_frame))
@connection.on(:frame_sent, &method(:on_frame_sent))
@ -15,10 +13,7 @@ module HTTPX
@connection.on(:promise, &method(:on_promise))
@connection.on(:altsvc, &method(:on_altsvc))
@streams = {}
end
def empty?
@buffer.empty?
@buffer = buffer
end
def <<(data)
@ -104,6 +99,7 @@ module HTTPX
end
def log(&msg)
return unless $HTTPX_DEBUG
$stderr << (+"connection (HTTP/2): " << msg.call << "\n")
end
end

View File

@ -2,21 +2,21 @@
require "forwardable"
module HTTPX
class Scheme::HTTP
module HTTPX::Channel
PROTOCOLS = {
"h2" => HTTP2
}
class TCP
extend Forwardable
include Callbacks
include HTTPX::Callbacks
BUFFER_SIZE = 1 << 16
attr_reader :processor
attr_reader :remote_ip, :remote_port
def_delegator :@io, :to_io
def_delegator :@processor, :empty?
def initialize(uri)
@io = TCPSocket.new(uri.host, uri.port)
_, @remote_port, _,@remote_ip = @io.peeraddr
@ -24,16 +24,23 @@ module HTTPX
@write_buffer = +""
end
def processor=(processor)
processor.buffer = @write_buffer
@processor = processor
def close
@io.close
end
def protocol
"h2"
end
def send(request)
def empty?
@write_buffer.empty?
end
def send(request, &block)
if @processor.nil?
@processor = PROTOCOLS[protocol].new(@write_buffer)
@processor.on(:response, &block)
end
@processor.send(request)
end
@ -48,7 +55,7 @@ module HTTPX
# wait read/write
rescue EOFError
# EOF
@io.close
throw(:close, self)
end
end
@ -63,7 +70,7 @@ module HTTPX
# wait read/write
rescue EOFError
# EOF
@io.close
throw(:close, self)
end
end
else
@ -74,8 +81,7 @@ module HTTPX
when :wait_readable
return
when nil
@io.close
return
throw(:close, self)
else
@processor << @read_buffer
end
@ -90,8 +96,7 @@ module HTTPX
when :wait_writable
return
when nil
@io.close
return
throw(:close, self)
else
@write_buffer.slice!(0, siz)
end

View File

@ -3,31 +3,32 @@
require "socket"
require "timeout"
require "httpx/channel"
module HTTPX
class Connection
require "httpx/connection/http2"
PROTOCOLS = {
"h2" => HTTP2
}
CONNECTION_TIMEOUT = 2
def initialize(**options)
@options = options
@connection_timeout = options.fetch(:connection_timeout, CONNECTION_TIMEOUT)
@channels = {}
@channels = []
@responses = {}
end
# opens a channel to the IP reachable through +uri+.
# Many hostnames are reachable through the same IP, so we try to
# maximize pipelining by opening as few channels as possible.
#
def bind(uri)
uri = URI(uri)
ip = TCPSocket.getaddress(uri.host)
return @channels.values.find do |io|
return @channels.find do |io|
ip == io.remote_ip && uri.port == io.remote_port
end || begin
scheme = Scheme.by(uri)
@channels[scheme.to_io] = scheme
channel = Channel.by(uri)
@channels << channel
channel
end
end
@ -35,14 +36,9 @@ module HTTPX
channel = bind(request.uri)
raise "no channel available" unless channel
channel.processor ||= begin
pr = PROTOCOLS[channel.protocol].new
pr.on(:response) do |request, response|
@responses[request] = response
end
pr
channel.send(request) do |request, response|
@responses[request] = response
end
channel.send(request)
end
def response(request)
@ -50,16 +46,23 @@ module HTTPX
end
def process_events(timeout: @connection_timeout)
rmonitors = @channels.values
rmonitors = @channels
wmonitors = rmonitors.reject(&:empty?)
readers, writers = IO.select(rmonitors, wmonitors, nil, timeout)
raise Timeout::Error, "timed out waiting for data" if readers.nil? && writers.nil?
readers.each do |reader|
reader.dread
channel = catch(:close) { reader.dread }
close(channel) if channel
end if readers
writers.each do |writer|
writer.dwrite
channel = catch(:close) { writer.dwrite }
close(channel) if channel
end if writers
end
def close(channel)
@channels.delete(channel)
@channel.close
end
end
end

View File

@ -1,18 +0,0 @@
# frozen_string_literal: true
module HTTPX::Scheme
module_function
def by(uri)
case uri.scheme
when "http"
HTTP.new(uri)
when "https"
HTTPS.new(uri)
else
raise "unrecognized Scheme"
end
end
end
require "httpx/scheme/http"