mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-10 00:01:27 -04:00
went back from returning partial responses, and only expose fully buffered responses to the user; the main issue being, the main API favours closing the connection after requests, and this makes the whole handling of partial responses confusing and error prone; this way, one can buffer the body, even to the filesystem, and maybe make this tunable in the future
This commit is contained in:
parent
7b8858ec56
commit
d8eb3bbe5f
@ -15,12 +15,12 @@ module HTTPX
|
||||
BUFFER_SIZE = 1 << 16
|
||||
|
||||
class << self
|
||||
def by(selector, uri, options, &blk)
|
||||
def by(uri, options, &blk)
|
||||
io = case uri.scheme
|
||||
when "http"
|
||||
TCP.new(selector, uri, options)
|
||||
TCP.new(uri, options)
|
||||
when "https"
|
||||
SSL.new(selector, uri, options)
|
||||
SSL.new(uri, options)
|
||||
else
|
||||
raise Error, "#{uri.scheme}: unrecognized channel"
|
||||
end
|
||||
@ -115,7 +115,7 @@ module HTTPX
|
||||
|
||||
def set_processor
|
||||
return @processor if defined?(@processor)
|
||||
@processor = PROTOCOLS[@io.protocol].new(@io.selector, @write_buffer, @options)
|
||||
@processor = PROTOCOLS[@io.protocol].new(@write_buffer, @options)
|
||||
@processor.on(:response, &@on_response)
|
||||
@processor.on(:close) { throw(:close, self) }
|
||||
while (request, args = @pending.shift)
|
||||
|
@ -8,9 +8,8 @@ module HTTPX
|
||||
|
||||
CRLF = "\r\n"
|
||||
|
||||
def initialize(selector, buffer, options)
|
||||
def initialize(buffer, options)
|
||||
@options = Options.new(options)
|
||||
@selector = selector
|
||||
@max_concurrent_requests = @options.max_concurrent_requests
|
||||
@parser = HTTP::Parser.new(self)
|
||||
@parser.header_value_type = :arrays
|
||||
@ -62,11 +61,13 @@ module HTTPX
|
||||
def on_headers_complete(h)
|
||||
log { "headers received" }
|
||||
headers = @options.headers_class.new(h)
|
||||
response = @options.response_class.new(@selector, @parser.status_code, headers)
|
||||
response = @options.response_class.new(@requests.last, @parser.status_code, headers, @options)
|
||||
@responses << response
|
||||
request = @requests[@responses.size - 1]
|
||||
emit(:response, request, response)
|
||||
log { response.headers.each.map { |f, v| "-> #{f}: #{v}" }.join("\n") }
|
||||
request = @requests.last
|
||||
# parser can't say if it's parsing GET or HEAD,
|
||||
# call the completeness callback manually
|
||||
on_message_complete if request.verb == :head
|
||||
end
|
||||
|
||||
def on_body(chunk)
|
||||
@ -80,6 +81,8 @@ module HTTPX
|
||||
response = @responses.shift
|
||||
reset
|
||||
|
||||
emit(:response, request, response)
|
||||
|
||||
send(@pending.shift) unless @pending.empty?
|
||||
return unless response.headers["connection"] == "close"
|
||||
log { "connection closed" }
|
||||
|
@ -6,9 +6,8 @@ module HTTPX
|
||||
class Channel::HTTP2
|
||||
include Callbacks
|
||||
|
||||
def initialize(selector, buffer, options)
|
||||
def initialize(buffer, options)
|
||||
@options = Options.new(options)
|
||||
@selector = selector
|
||||
@max_concurrent_requests = @options.max_concurrent_requests
|
||||
init_connection
|
||||
@retries = options.max_retries
|
||||
@ -36,10 +35,8 @@ module HTTPX
|
||||
end
|
||||
stream = @connection.new_stream
|
||||
stream.on(:close) do |error|
|
||||
unless @streams.delete(stream.id)
|
||||
response = ErrorResponse.new(error, retries)
|
||||
emit(:response, request, response)
|
||||
end
|
||||
response = @streams.delete(stream.id) || ErrorResponse.new(error, retries)
|
||||
emit(:response, request, response)
|
||||
|
||||
send(@pending.shift) unless @pending.empty?
|
||||
end
|
||||
@ -48,9 +45,8 @@ module HTTPX
|
||||
stream.on(:headers) do |h|
|
||||
_, status = h.shift
|
||||
headers = @options.headers_class.new(h)
|
||||
response = @options.response_class.new(@selector, status, headers)
|
||||
response = @options.response_class.new(request, status, headers, @options)
|
||||
@streams[stream.id] = response
|
||||
emit(:response, request, response)
|
||||
end
|
||||
stream.on(:data) do |data|
|
||||
@streams[stream.id] << data
|
||||
|
@ -81,7 +81,7 @@ module HTTPX
|
||||
uri.port == channel.remote_port &&
|
||||
uri.scheme == channel.uri.scheme
|
||||
end || begin
|
||||
channel = Channel.by(self, uri, @options) do |request, response|
|
||||
channel = Channel.by(uri, @options) do |request, response|
|
||||
@responses[request] = response
|
||||
end
|
||||
|
||||
|
@ -7,11 +7,10 @@ require "ipaddr"
|
||||
module HTTPX
|
||||
class TCP
|
||||
|
||||
attr_reader :ip, :port, :uri, :selector
|
||||
attr_reader :ip, :port, :uri
|
||||
|
||||
def initialize(selector, uri, **)
|
||||
def initialize(uri, **)
|
||||
@connected = false
|
||||
@selector = selector
|
||||
@uri = uri
|
||||
@ip = TCPSocket.getaddress(@uri.host)
|
||||
@port = @uri.port
|
||||
|
@ -9,8 +9,10 @@ module HTTPX
|
||||
attr_reader :status, :headers, :body
|
||||
|
||||
def_delegator :@body, :to_s
|
||||
def initialize(selector, status, headers)
|
||||
@selector = selector
|
||||
|
||||
def initialize(request, status, headers, **options)
|
||||
@options = Options.new(options)
|
||||
@request = request
|
||||
@status = Integer(status)
|
||||
@headers = @options.headers_class.new(headers)
|
||||
@body = Body.new(self)
|
||||
@ -50,13 +52,11 @@ module HTTPX
|
||||
@length += chunk.bytesize
|
||||
transition
|
||||
@buffer.write(chunk)
|
||||
@chunk_cb[chunk] if @chunk_cb
|
||||
end
|
||||
alias :<< :write
|
||||
|
||||
def each
|
||||
return enum_for(__method__) unless block_given?
|
||||
@chunk_cb = ->(e) { yield(e) }
|
||||
begin
|
||||
unless @state == :idle
|
||||
rewind
|
||||
@ -64,16 +64,15 @@ module HTTPX
|
||||
yield(*args)
|
||||
end
|
||||
end
|
||||
buffering!
|
||||
ensure
|
||||
@chunk_cb = nil
|
||||
close
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
def to_s
|
||||
buffering!
|
||||
@buffer.read
|
||||
rewind
|
||||
return @buffer.read if @buffer
|
||||
""
|
||||
ensure
|
||||
close
|
||||
end
|
||||
@ -97,28 +96,24 @@ module HTTPX
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
|
||||
def rewind
|
||||
return if @state == :idle
|
||||
@buffer.rewind
|
||||
end
|
||||
|
||||
def buffering!
|
||||
@selector.next_tick until buffered?
|
||||
rewind
|
||||
end
|
||||
|
||||
def buffered?
|
||||
if content_length = @headers["content-length"]
|
||||
content_length = Integer(content_length)
|
||||
@length >= content_length
|
||||
elsif @headers["transfer-encoding"] == "chunked"
|
||||
# dechunk
|
||||
raise "TODO: implement de-chunking"
|
||||
else
|
||||
!@selector.running?
|
||||
end
|
||||
end
|
||||
# def buffered?
|
||||
# return true if @response.bodyless?
|
||||
# if content_length = @headers["content-length"]
|
||||
# content_length = Integer(content_length)
|
||||
# @length >= content_length
|
||||
# elsif @headers["transfer-encoding"] == "chunked"
|
||||
# # dechunk
|
||||
# raise "TODO: implement de-chunking"
|
||||
# else
|
||||
# true
|
||||
# end
|
||||
# end
|
||||
|
||||
def transition
|
||||
case @state
|
||||
@ -150,6 +145,39 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
class ContentType
|
||||
MIME_TYPE_RE = %r{^([^/]+/[^;]+)(?:$|;)}
|
||||
CHARSET_RE = /;\s*charset=([^;]+)/i
|
||||
|
||||
attr_reader :mime_type, :charset
|
||||
|
||||
def initialize(mime_type, charset)
|
||||
@mime_type = mime_type
|
||||
@charset = charset
|
||||
end
|
||||
|
||||
class << self
|
||||
# Parse string and return ContentType struct
|
||||
def parse(str)
|
||||
new(mime_type(str), charset(str))
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# :nodoc:
|
||||
def mime_type(str)
|
||||
m = str.to_s[MIME_TYPE_RE, 1]
|
||||
m && m.strip.downcase
|
||||
end
|
||||
|
||||
# :nodoc:
|
||||
def charset(str)
|
||||
m = str.to_s[CHARSET_RE, 1]
|
||||
m && m.strip.delete('"')
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class ErrorResponse
|
||||
|
||||
attr_reader :error, :retries
|
||||
|
@ -6,9 +6,9 @@ class ResponseTest < Minitest::Test
|
||||
include HTTPX
|
||||
|
||||
def test_response_status
|
||||
r1 = Response.new(selector, 200, {})
|
||||
r1 = Response.new(request, 200, {})
|
||||
assert r1.status == 200, "unexpected status code (#{r1.status})"
|
||||
r2 = Response.new(selector, "200", {})
|
||||
r2 = Response.new(request, "200", {})
|
||||
assert r2.status == 200, "unexpected status code (#{r2.status})"
|
||||
end
|
||||
|
||||
@ -23,7 +23,7 @@ class ResponseTest < Minitest::Test
|
||||
end
|
||||
|
||||
def test_response_body_to_s
|
||||
body1 = Response::Body.new(selector, {})
|
||||
body1 = Response::Body.new(Response.new(request, 200, {}))
|
||||
assert body1.empty?, "body must be empty after initialization"
|
||||
body1 << "foo"
|
||||
assert body1 == "foo", "body must be updated"
|
||||
@ -31,43 +31,32 @@ class ResponseTest < Minitest::Test
|
||||
body1 << "bar"
|
||||
assert body1 == "foobar", "body must buffer subsequent chunks"
|
||||
|
||||
sel = Minitest::Mock.new
|
||||
body2 = Response::Body.new(sel, "content-length" => "6")
|
||||
sel.expect(:running?, true, [])
|
||||
sel.expect(:next_tick, nil) do
|
||||
body2 << "foobar"
|
||||
true
|
||||
end
|
||||
assert body2.empty?, "body must be empty after initialization"
|
||||
assert body2 == "foobar", "body must buffer before cast"
|
||||
body3 = Response::Body.new(Response.new(request("head"), 200, {}))
|
||||
assert body3.empty?, "body must be empty after initialization"
|
||||
assert body3 == "", "HEAD requets body must be empty"
|
||||
|
||||
end
|
||||
|
||||
def test_response_body_each
|
||||
body1 = Response::Body.new(selector, {})
|
||||
body1 = Response::Body.new(Response.new(request, 200, {}))
|
||||
body1 << "foo"
|
||||
assert body1.each.to_a == %w(foo), "must yield buffer"
|
||||
body1 << "foo"
|
||||
body1 << "bar"
|
||||
assert body1.each.to_a == %w(foobar), "must yield buffers"
|
||||
|
||||
sel = Minitest::Mock.new
|
||||
body2 = Response::Body.new(sel, "content-length" => "6")
|
||||
sel.expect(:running?, true, [])
|
||||
sel.expect(:next_tick, nil) do
|
||||
body2 << "foo"
|
||||
body2 << "bar"
|
||||
true
|
||||
end
|
||||
assert body2.each.to_a == %w(foo bar), "must yield buffer chunks"
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def selector
|
||||
Connection.new(Options.new)
|
||||
def request(verb=:get, uri="http://google.com")
|
||||
Request.new(verb, uri)
|
||||
end
|
||||
|
||||
def response(*args)
|
||||
Response.new(*args)
|
||||
end
|
||||
|
||||
def resource
|
||||
@resource ||= Response.new(selector, 200, {})
|
||||
@resource ||= Response.new(request, 200, {})
|
||||
end
|
||||
end
|
||||
|
Loading…
x
Reference in New Issue
Block a user