mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-12 00:01:58 -04:00
setting stream callbacks in the same method
This commit is contained in:
parent
25925f95e6
commit
bb6bc00280
@ -37,37 +37,7 @@ module HTTPX
|
||||
end
|
||||
unless stream = @streams[request]
|
||||
stream = @connection.new_stream
|
||||
stream.on(:close) do |error|
|
||||
if request.expects?
|
||||
return handle(request, stream)
|
||||
end
|
||||
response = request.response || ErrorResponse.new(error, retries)
|
||||
emit(:response, request, response)
|
||||
log(2, "#{stream.id}: ") { "closing stream" }
|
||||
|
||||
|
||||
@streams.delete(request)
|
||||
send(@pending.shift) unless @pending.empty?
|
||||
end
|
||||
stream.on(:half_close) do
|
||||
log(2, "#{stream.id}: ") { "waiting for response..." }
|
||||
end
|
||||
# stream.on(:altsvc)
|
||||
stream.on(:headers) do |h|
|
||||
log(stream.id) do
|
||||
h.map { |k, v| "<- HEADER: #{k}: #{v}" }.join("\n")
|
||||
end
|
||||
_, status = h.shift
|
||||
headers = @options.headers_class.new(h)
|
||||
response = @options.response_class.new(request, status, "2.0", headers, @options)
|
||||
request.response = response
|
||||
@streams[request] = stream
|
||||
end
|
||||
stream.on(:data) do |data|
|
||||
log(1, "#{stream.id}: ") { "<- DATA: #{data.bytesize} bytes..." }
|
||||
log(2, "#{stream.id}: ") { "<- #{data.inspect}" }
|
||||
request.response << data
|
||||
end
|
||||
handle_stream(stream, request)
|
||||
@streams[request] = stream
|
||||
end
|
||||
handle(request, stream)
|
||||
@ -119,6 +89,40 @@ module HTTPX
|
||||
@connection.on(:goaway, &method(:on_close))
|
||||
end
|
||||
|
||||
def handle_stream(stream, request)
|
||||
stream.on(:close) do |error|
|
||||
if request.expects?
|
||||
return handle(request, stream)
|
||||
end
|
||||
response = request.response || ErrorResponse.new(error, retries)
|
||||
emit(:response, request, response)
|
||||
log(2, "#{stream.id}: ") { "closing stream" }
|
||||
|
||||
|
||||
@streams.delete(request)
|
||||
send(@pending.shift) unless @pending.empty?
|
||||
end
|
||||
stream.on(:half_close) do
|
||||
log(2, "#{stream.id}: ") { "waiting for response..." }
|
||||
end
|
||||
# stream.on(:altsvc)
|
||||
stream.on(:headers) do |h|
|
||||
log(stream.id) do
|
||||
h.map { |k, v| "<- HEADER: #{k}: #{v}" }.join("\n")
|
||||
end
|
||||
_, status = h.shift
|
||||
headers = @options.headers_class.new(h)
|
||||
response = @options.response_class.new(request, status, "2.0", headers, @options)
|
||||
request.response = response
|
||||
@streams[request] = stream
|
||||
end
|
||||
stream.on(:data) do |data|
|
||||
log(1, "#{stream.id}: ") { "<- DATA: #{data.bytesize} bytes..." }
|
||||
log(2, "#{stream.id}: ") { "<- #{data.inspect}" }
|
||||
request.response << data
|
||||
end
|
||||
end
|
||||
|
||||
def join_headers(stream, request)
|
||||
set_request_headers(request)
|
||||
headers = {}
|
||||
|
@ -18,7 +18,7 @@ module HTTPX
|
||||
upgrade_request.headers["upgrade"] = "h2c"
|
||||
upgrade_request.headers.add("connection", "upgrade")
|
||||
upgrade_request.headers.add("connection", "http2-settings")
|
||||
upgrade_request.headers["http2-settings"] = FrameBuilder.settings_value(@default_options.http2_settings)
|
||||
upgrade_request.headers["http2-settings"] = HTTP2::Client.settings_header(@default_options.http2_settings)
|
||||
# TODO: validate!
|
||||
upgrade_response = __send_reqs(*upgrade_request).first
|
||||
|
||||
@ -58,40 +58,8 @@ module HTTPX
|
||||
@connection.send_connection_preface
|
||||
# skip checks, it is assumed that this is the first
|
||||
# request in the connection
|
||||
stream = @connection.new_stream
|
||||
# Stream 1 is implicitly "half-closed" from the client toward the server (see Section 5.1)
|
||||
stream.__send__(:event, :half_closed_local)
|
||||
stream.on(:close) do |error|
|
||||
if request.expects?
|
||||
return handle(request, stream)
|
||||
end
|
||||
response = request.response || ErrorResponse.new(error, retries)
|
||||
emit(:response, request, response)
|
||||
log(2, "#{stream.id}: ") { "closing stream" }
|
||||
|
||||
|
||||
@streams.delete(request)
|
||||
send(@pending.shift) unless @pending.empty?
|
||||
end
|
||||
stream.on(:half_close) do
|
||||
log(2, "#{stream.id}: ") { "waiting for response..." }
|
||||
end
|
||||
# stream.on(:altsvc)
|
||||
stream.on(:headers) do |h|
|
||||
log(stream.id) do
|
||||
h.map { |k, v| "<- HEADER: #{k}: #{v}" }.join("\n")
|
||||
end
|
||||
_, status = h.shift
|
||||
headers = @options.headers_class.new(h)
|
||||
response = @options.response_class.new(request, status, "2.0", headers, @options)
|
||||
request.response = response
|
||||
@streams[request] = stream
|
||||
end
|
||||
stream.on(:data) do |data|
|
||||
log(1, "#{stream.id}: ") { "<- DATA: #{data.bytesize} bytes..." }
|
||||
log(2, "#{stream.id}: ") { "<- #{data.inspect}" }
|
||||
request.response << data
|
||||
end
|
||||
stream = @connection.upgrade
|
||||
handle_stream(stream, request)
|
||||
@streams[request] = stream
|
||||
end
|
||||
end
|
||||
|
Loading…
x
Reference in New Issue
Block a user