store response in requests, index stream by request; this allow pauses; improved body buffering (not sending empty chunk), allow body drain to pause when buffer is full

This commit is contained in:
HoneyryderChuck 2017-12-12 02:25:59 +00:00
parent 24821d1e8a
commit 2b581bbefa

View File

@ -13,6 +13,7 @@ module HTTPX
@retries = options.max_retries
@pending = []
@streams = {}
@drains = {}
@buffer = buffer
end
@ -33,30 +34,39 @@ module HTTPX
@pending << request
return
end
stream = @connection.new_stream
stream.on(:close) do |error|
response = @streams.delete(stream.id) || ErrorResponse.new(error, retries)
emit(:response, request, response)
unless stream = @streams[request]
stream = @connection.new_stream
stream.on(:close) do |error|
response = request.response || ErrorResponse.new(error, retries)
emit(:response, request, response)
send(@pending.shift) unless @pending.empty?
@streams.delete(request)
send(@pending.shift) unless @pending.empty?
end
# stream.on(:half_close)
# stream.on(:altsvc)
stream.on(:headers) do |h|
_, status = h.shift
headers = @options.headers_class.new(h)
response = @options.response_class.new(request, status, headers, @options)
request.response = response
@streams[request] = stream
end
stream.on(:data) do |data|
request.response << data
end
end
# stream.on(:half_close)
# stream.on(:altsvc)
stream.on(:headers) do |h|
_, status = h.shift
headers = @options.headers_class.new(h)
response = @options.response_class.new(request, status, headers, @options)
@streams[stream.id] = response
catch(:buffer_full) do
request.transition(:headers)
join_headers(stream, request) if request.state == :headers
request.transition(:body)
join_body(stream, request) if request.state == :body
request.transition(:done)
end
stream.on(:data) do |data|
@streams[stream.id] << data
end
join_headers(stream, request)
join_body(stream, request)
end
def reenqueue!
requests = @streams.values
requests = @streams.keys
@streams.clear
init_connection
requests.each do |request|
@ -84,15 +94,20 @@ module HTTPX
headers[":path"] = request.path
headers[":authority"] = request.authority
headers = headers.merge(request.headers)
stream.headers(headers, end_stream: !request.body)
stream.headers(headers, end_stream: request.empty?)
end
def join_body(stream, request)
return unless request.body
request.body.each do |chunk|
stream.data(chunk, end_stream: false)
chunk = @drains.delete(request) || request.drain_body
while chunk
next_chunk = request.drain_body
stream.data(chunk, end_stream: !next_chunk)
chunk = next_chunk
if chunk && @buffer.full?
@drains[request] = chunk
throw(:buffer_full)
end
end
stream.data("", end_stream: true)
end
######