diff --git a/lib/httpx/channel.rb b/lib/httpx/channel.rb index 4af85a7a..73471de0 100644 --- a/lib/httpx/channel.rb +++ b/lib/httpx/channel.rb @@ -15,12 +15,12 @@ module HTTPX BUFFER_SIZE = 1 << 16 class << self - def by(selector, uri, options, &blk) + def by(uri, options, &blk) io = case uri.scheme when "http" - TCP.new(selector, uri, options) + TCP.new(uri, options) when "https" - SSL.new(selector, uri, options) + SSL.new(uri, options) else raise Error, "#{uri.scheme}: unrecognized channel" end @@ -115,7 +115,7 @@ module HTTPX def set_processor return @processor if defined?(@processor) - @processor = PROTOCOLS[@io.protocol].new(@io.selector, @write_buffer, @options) + @processor = PROTOCOLS[@io.protocol].new(@write_buffer, @options) @processor.on(:response, &@on_response) @processor.on(:close) { throw(:close, self) } while (request, args = @pending.shift) diff --git a/lib/httpx/channel/http1.rb b/lib/httpx/channel/http1.rb index a3740b3b..9a503a83 100644 --- a/lib/httpx/channel/http1.rb +++ b/lib/httpx/channel/http1.rb @@ -8,9 +8,8 @@ module HTTPX CRLF = "\r\n" - def initialize(selector, buffer, options) + def initialize(buffer, options) @options = Options.new(options) - @selector = selector @max_concurrent_requests = @options.max_concurrent_requests @parser = HTTP::Parser.new(self) @parser.header_value_type = :arrays @@ -62,11 +61,13 @@ module HTTPX def on_headers_complete(h) log { "headers received" } headers = @options.headers_class.new(h) - response = @options.response_class.new(@selector, @parser.status_code, headers) + response = @options.response_class.new(@requests.last, @parser.status_code, headers, @options) @responses << response - request = @requests[@responses.size - 1] - emit(:response, request, response) log { response.headers.each.map { |f, v| "-> #{f}: #{v}" }.join("\n") } + request = @requests.last + # parser can't say if it's parsing GET or HEAD, + # call the completeness callback manually + on_message_complete if request.verb == :head end def on_body(chunk) @@ -80,6 +81,8 @@ module HTTPX response = @responses.shift reset + emit(:response, request, response) + send(@pending.shift) unless @pending.empty? return unless response.headers["connection"] == "close" log { "connection closed" } diff --git a/lib/httpx/channel/http2.rb b/lib/httpx/channel/http2.rb index 65057f6c..04f7f192 100644 --- a/lib/httpx/channel/http2.rb +++ b/lib/httpx/channel/http2.rb @@ -6,9 +6,8 @@ module HTTPX class Channel::HTTP2 include Callbacks - def initialize(selector, buffer, options) + def initialize(buffer, options) @options = Options.new(options) - @selector = selector @max_concurrent_requests = @options.max_concurrent_requests init_connection @retries = options.max_retries @@ -36,10 +35,8 @@ module HTTPX end stream = @connection.new_stream stream.on(:close) do |error| - unless @streams.delete(stream.id) - response = ErrorResponse.new(error, retries) - emit(:response, request, response) - end + response = @streams.delete(stream.id) || ErrorResponse.new(error, retries) + emit(:response, request, response) send(@pending.shift) unless @pending.empty? end @@ -48,9 +45,8 @@ module HTTPX stream.on(:headers) do |h| _, status = h.shift headers = @options.headers_class.new(h) - response = @options.response_class.new(@selector, status, headers) + response = @options.response_class.new(request, status, headers, @options) @streams[stream.id] = response - emit(:response, request, response) end stream.on(:data) do |data| @streams[stream.id] << data diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index 45491d1d..2e748754 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -81,7 +81,7 @@ module HTTPX uri.port == channel.remote_port && uri.scheme == channel.uri.scheme end || begin - channel = Channel.by(self, uri, @options) do |request, response| + channel = Channel.by(uri, @options) do |request, response| @responses[request] = response end diff --git a/lib/httpx/io.rb b/lib/httpx/io.rb index 0fae2013..99bc01d9 100644 --- a/lib/httpx/io.rb +++ b/lib/httpx/io.rb @@ -7,11 +7,10 @@ require "ipaddr" module HTTPX class TCP - attr_reader :ip, :port, :uri, :selector + attr_reader :ip, :port, :uri - def initialize(selector, uri, **) + def initialize(uri, **) @connected = false - @selector = selector @uri = uri @ip = TCPSocket.getaddress(@uri.host) @port = @uri.port diff --git a/lib/httpx/response.rb b/lib/httpx/response.rb index f7b039cd..c664e87d 100644 --- a/lib/httpx/response.rb +++ b/lib/httpx/response.rb @@ -9,8 +9,10 @@ module HTTPX attr_reader :status, :headers, :body def_delegator :@body, :to_s - def initialize(selector, status, headers) - @selector = selector + + def initialize(request, status, headers, **options) + @options = Options.new(options) + @request = request @status = Integer(status) @headers = @options.headers_class.new(headers) @body = Body.new(self) @@ -50,13 +52,11 @@ module HTTPX @length += chunk.bytesize transition @buffer.write(chunk) - @chunk_cb[chunk] if @chunk_cb end alias :<< :write def each return enum_for(__method__) unless block_given? - @chunk_cb = ->(e) { yield(e) } begin unless @state == :idle rewind @@ -64,16 +64,15 @@ module HTTPX yield(*args) end end - buffering! ensure - @chunk_cb = nil close end end - + def to_s - buffering! - @buffer.read + rewind + return @buffer.read if @buffer + "" ensure close end @@ -97,28 +96,24 @@ module HTTPX end private - + def rewind return if @state == :idle @buffer.rewind end - def buffering! - @selector.next_tick until buffered? - rewind - end - - def buffered? - if content_length = @headers["content-length"] - content_length = Integer(content_length) - @length >= content_length - elsif @headers["transfer-encoding"] == "chunked" - # dechunk - raise "TODO: implement de-chunking" - else - !@selector.running? - end - end + # def buffered? + # return true if @response.bodyless? + # if content_length = @headers["content-length"] + # content_length = Integer(content_length) + # @length >= content_length + # elsif @headers["transfer-encoding"] == "chunked" + # # dechunk + # raise "TODO: implement de-chunking" + # else + # true + # end + # end def transition case @state @@ -150,6 +145,39 @@ module HTTPX end end + class ContentType + MIME_TYPE_RE = %r{^([^/]+/[^;]+)(?:$|;)} + CHARSET_RE = /;\s*charset=([^;]+)/i + + attr_reader :mime_type, :charset + + def initialize(mime_type, charset) + @mime_type = mime_type + @charset = charset + end + + class << self + # Parse string and return ContentType struct + def parse(str) + new(mime_type(str), charset(str)) + end + + private + + # :nodoc: + def mime_type(str) + m = str.to_s[MIME_TYPE_RE, 1] + m && m.strip.downcase + end + + # :nodoc: + def charset(str) + m = str.to_s[CHARSET_RE, 1] + m && m.strip.delete('"') + end + end + end + class ErrorResponse attr_reader :error, :retries diff --git a/test/response_test.rb b/test/response_test.rb index 4cecb1ab..97e4aa57 100644 --- a/test/response_test.rb +++ b/test/response_test.rb @@ -6,9 +6,9 @@ class ResponseTest < Minitest::Test include HTTPX def test_response_status - r1 = Response.new(selector, 200, {}) + r1 = Response.new(request, 200, {}) assert r1.status == 200, "unexpected status code (#{r1.status})" - r2 = Response.new(selector, "200", {}) + r2 = Response.new(request, "200", {}) assert r2.status == 200, "unexpected status code (#{r2.status})" end @@ -23,7 +23,7 @@ class ResponseTest < Minitest::Test end def test_response_body_to_s - body1 = Response::Body.new(selector, {}) + body1 = Response::Body.new(Response.new(request, 200, {})) assert body1.empty?, "body must be empty after initialization" body1 << "foo" assert body1 == "foo", "body must be updated" @@ -31,43 +31,32 @@ class ResponseTest < Minitest::Test body1 << "bar" assert body1 == "foobar", "body must buffer subsequent chunks" - sel = Minitest::Mock.new - body2 = Response::Body.new(sel, "content-length" => "6") - sel.expect(:running?, true, []) - sel.expect(:next_tick, nil) do - body2 << "foobar" - true - end - assert body2.empty?, "body must be empty after initialization" - assert body2 == "foobar", "body must buffer before cast" + body3 = Response::Body.new(Response.new(request("head"), 200, {})) + assert body3.empty?, "body must be empty after initialization" + assert body3 == "", "HEAD requets body must be empty" + end def test_response_body_each - body1 = Response::Body.new(selector, {}) + body1 = Response::Body.new(Response.new(request, 200, {})) body1 << "foo" assert body1.each.to_a == %w(foo), "must yield buffer" body1 << "foo" body1 << "bar" assert body1.each.to_a == %w(foobar), "must yield buffers" - - sel = Minitest::Mock.new - body2 = Response::Body.new(sel, "content-length" => "6") - sel.expect(:running?, true, []) - sel.expect(:next_tick, nil) do - body2 << "foo" - body2 << "bar" - true - end - assert body2.each.to_a == %w(foo bar), "must yield buffer chunks" end private - def selector - Connection.new(Options.new) + def request(verb=:get, uri="http://google.com") + Request.new(verb, uri) + end + + def response(*args) + Response.new(*args) end def resource - @resource ||= Response.new(selector, 200, {}) + @resource ||= Response.new(request, 200, {}) end end