added http/1 channel

This commit is contained in:
HoneyryderChuck 2017-11-28 16:41:24 +00:00
parent f3255ff182
commit 70a2d7ad82
6 changed files with 140 additions and 19 deletions

View File

@ -3,12 +3,12 @@
module HTTPX::Channel
module_function
def by(uri, *options)
def by(uri, **options)
case uri.scheme
when "http"
TCP.new(uri, *options)
TCP.new(uri, **options)
when "https"
SSL.new(uri, *options)
SSL.new(uri, **options)
else
raise "#{uri.scheme}: unrecognized channel"
end
@ -16,5 +16,6 @@ module HTTPX::Channel
end
require "httpx/channel/http2"
require "httpx/channel/http1"
require "httpx/channel/tcp"
require "httpx/channel/ssl"

View File

@ -0,0 +1,95 @@
# frozen_string_literal: true
require "http_parser"
module HTTPX
class Channel::HTTP1
include Callbacks
CRLF = "\r\n"
def initialize(buffer, version: [1,1], **)
@parser = HTTP::Parser.new(self)
@parser.header_value_type = :arrays
@buffer = buffer
@version = version
end
def reset
@request = nil
@response = nil
@parser.reset!
end
alias :close :reset
def <<(data)
@parser << data
end
def send(request)
@request = request
join_headers(request)
join_body(request)
end
def on_message_begin
log { "parsing begins" }
end
def on_headers_complete(h)
log { "headers received" }
@response = Response.new(@parser.status_code, h)
log { @response.headers.each.map { |f, v| "-> #{f}: #{v}" }.join("\n") }
end
def on_body(chunk)
log { "-> #{chunk.inspect}" }
@response << chunk
end
def on_message_complete
log { "parsing complete" }
emit(:response, @request, @response)
response = @response
reset
if response.headers["connection"] == "close"
throw(:close)
end
end
private
def join_headers(request)
request.headers["host"] ||= request.authority
buffer = +""
buffer << "#{request.verb.to_s.upcase} #{request.path} HTTP/#{@version.join(".")}" << CRLF
log { "<- #{buffer.inspect}" }
@buffer << buffer
buffer.clear
request.headers.each do |field, value|
buffer << "#{capitalized(field)}: #{value}" << CRLF
log { "<- #{buffer.inspect}" }
@buffer << buffer
buffer.clear
end
@buffer << CRLF
end
def join_body(request)
return unless request.body
request.body.each do |chunk|
log { "<- #{chunk}" }
@buffer << chunk
end
end
def capitalized(field)
field.to_s.split("-").map(&:capitalize).join("-")
end
def log(&msg)
return unless $HTTPX_DEBUG
$stderr << (+"" << msg.call << "\n")
end
end
end

View File

@ -16,6 +16,10 @@ module HTTPX
@buffer = buffer
end
def close
@connection.goaway
end
def <<(data)
@connection << data
end
@ -36,26 +40,39 @@ module HTTPX
stream.on(:data) do |data|
@streams[stream] << data
end
headers = {}
headers[":scheme"] = request.scheme
headers[":method"] = request.verb.to_s.upcase
headers[":path"] = request.path
headers[":authority"] = request.authority
headers = headers.merge(request.headers)
join_headers
if body = request.body
headers["content-length"] = String(body.bytesize) if body.respond_to?(:bytesize)
# TODO: expect-continue
stream.data(headers, end_stream: false)
stream.data(body.to_s, end_stream: true)
stream.headers(headers, end_stream: false)
body.each do |chunk|
stream.data(chunk, end_stream: false)
end
stream.data("", end_stream: true)
else
stream.headers(headers, end_stream: true)
end
end
private
def join_headers(stream, request)
headers = {}
headers[":scheme"] = request.scheme
headers[":method"] = request.verb.to_s.upcase
headers[":path"] = request.path
headers[":authority"] = request.authority
headers = headers.merge(request.headers)
stream.headers(headers, end_stream: !request.body)
end
def join_body(stream, request)
return unless request.body
request.body.each do |chunk|
stream.data(chunk, end_stream: false)
end
stream.data("", end_stream: true)
end
######
# HTTP/2 Callbacks
######
@ -96,6 +113,7 @@ module HTTPX
end
def on_promise(stream)
# TODO: policy for handling promises
end
def log(&msg)

View File

@ -5,7 +5,7 @@ require "openssl"
module HTTPX::Channel
class SSL < TCP
def initialize(uri, ssl = {}, **)
def initialize(uri, ssl: {}, **)
ctx = OpenSSL::SSL::SSLContext.new
ctx.set_params(ssl)
ctx.alpn_protocols = %w[h2 http/1.1] if ctx.respond_to?(:alpn_protocols=)

View File

@ -4,7 +4,8 @@ require "forwardable"
module HTTPX::Channel
PROTOCOLS = {
"h2" => HTTP2
"h2" => HTTP2,
"http/1.1" => HTTP1
}
class TCP
@ -17,15 +18,16 @@ module HTTPX::Channel
def_delegator :@io, :to_io
def initialize(uri, *)
def initialize(uri, **)
@io = TCPSocket.new(uri.host, uri.port)
_, @remote_port, _,@remote_ip = @io.peeraddr
@read_buffer = +""
@write_buffer = +""
@protocol = "h2"
@protocol = "http/1.1"
end
def close
@processor.close if @processor
@io.close
end

View File

@ -2,6 +2,8 @@
module HTTPX
class Request
USER_AGENT = "httpx.rb/#{VERSION}"
attr_reader :verb, :uri, :headers, :body
def initialize(verb, uri, headers: {}, **options)
@ -9,6 +11,9 @@ module HTTPX
@uri = URI(uri)
@headers = Headers.new(headers)
@body = nil
@headers["user-agent"] ||= USER_AGENT
@headers["accept"] ||= "*/*"
end
def scheme