Streaming requests for Net::HTTP (#604)

This commit is contained in:
Martin Mauch 2017-08-30 09:12:01 +02:00 committed by Mattia
parent b09c6db315
commit ff94676369
23 changed files with 246 additions and 10 deletions

View File

@ -102,6 +102,19 @@ conn.get do |req|
req.options.timeout = 5 # open/read timeout in seconds
req.options.open_timeout = 2 # connection open timeout in seconds
end
## Streaming responses ##
streamed = [] # A buffer to store the streamed data
conn.get('/nigiri/sake.json') do |req|
# Set a callback which will receive tuples of chunk Strings
# and the sum of characters received so far
req.options.on_data = Proc.new do |chunk, overall_received_bytes|
puts "Received #{overall_received_bytes} characters"
streamed << chunk
end
end
streamed.join
```
And you can inject arbitrary data into the request using the `context` option:

View File

@ -14,7 +14,7 @@ require 'forwardable'
# conn.get '/'
#
module Faraday
VERSION = "0.13.0"
VERSION = "0.13.1"
class << self
# Public: Gets or sets the root path that Faraday is being loaded from.

View File

@ -140,6 +140,10 @@ module Faraday
def perform_single_request(env)
req = create_request(env)
req.setup_request(env[:method], request_config(env)).callback { |client|
if env[:request].stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
env[:request].on_data.call(client.response, client.response.bytesize)
end
status = client.response_header.status
reason = client.response_header.http_reason
save_response(env, status, client.response, nil, reason) do |resp_headers|

View File

@ -26,6 +26,11 @@ module Faraday
# Queue requests for parallel execution.
if env[:parallel_manager]
env[:parallel_manager].add(request, http_method, request_config(env)) do |resp|
if (req = env[:request]).stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
req.on_data.call(resp.response, resp.response.bytesize)
end
save_response(env, resp.response_header.status, resp.response) do |resp_headers|
resp.response_header.each do |name, value|
resp_headers[name.to_sym] = value
@ -54,6 +59,10 @@ module Faraday
raise client.error if client.error
if env[:request].stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
env[:request].on_data.call(client.response, client.response.bytesize)
end
status = client.response_header.status
reason = client.response_header.http_reason
save_response(env, status, client.response, nil, reason) do |resp_headers|

View File

@ -52,6 +52,10 @@ module Faraday
:headers => env[:request_headers],
:body => read_body(env)
if req.stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
req.on_data.call(resp.body, resp.body.bytesize)
end
save_response(env, resp.status.to_i, resp.body, resp.headers, resp.reason_phrase)
@app.call env

View File

@ -39,6 +39,10 @@ module Faraday
:body => env[:body],
:header => env[:request_headers]
if (req = env[:request]).stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
req.on_data.call(resp.body, resp.body.bytesize)
end
save_response env, resp.status, resp.body, resp.headers, resp.reason
@app.call env

View File

@ -73,9 +73,43 @@ module Faraday
end
def perform_request(http, env)
if env[:request].stream_response?
size = 0
yielded = false
http_response = perform_request_with_wrapped_block(http, env) do |chunk|
if chunk.bytesize > 0 || size > 0
yielded = true
size += chunk.bytesize
env[:request].on_data.call(chunk, size)
end
end
env[:request].on_data.call("", 0) unless yielded
# Net::HTTP returns something, but it's not meaningful according to the docs.
http_response.body = nil
http_response
else
http_response = perform_request_with_wrapped_block(http, env)
end
end
def perform_request_with_wrapped_block(http, env, &block)
if :get == env[:method] and !env[:body]
# prefer `get` to `request` because the former handles gzip (ruby 1.9)
http.get env[:url].request_uri, env[:request_headers]
request_via_get_method(http, env, &block)
else
request_via_request_method(http, env, &block)
end
end
def request_via_get_method(http, env, &block)
http.get env[:url].request_uri, env[:request_headers], &block
end
def request_via_request_method(http, env, &block)
if block_given?
http.request create_request(env) do |response|
response.read_body(&block)
end
else
http.request create_request(env)
end

View File

@ -30,6 +30,10 @@ module Faraday
raise Error::ConnectionFailed, $!
end
if (req = env[:request]).stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
req.on_data.call(response.body, response.body.bytesize)
end
# Remove the "HTTP/1.1 200", leaving just the reason phrase
reason_phrase = response.status_line.gsub(/^.* \d{3} /, '')

View File

@ -46,6 +46,11 @@ module Faraday
execute_request(env, rack_env)
end
if (req = env[:request]).stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
req.on_data.call(response.body, response.body.bytesize)
end
save_response(env, response.status, response.body, response.headers)
@app.call env
end

View File

@ -67,6 +67,11 @@ module Faraday
raise Error::ClientError, resp.curl_error_message
end
if env[:request].stream_response?
warn "Streaming downloads for #{self.class.name} are not yet implemented."
env[:request].on_data.call(resp.body, resp.body.bytesize)
end
save_response(env, resp.code, resp.body) do |response_headers|
response_headers.parse resp.headers
end

View File

@ -202,7 +202,8 @@ module Faraday
end
class RequestOptions < Options.new(:params_encoder, :proxy, :bind,
:timeout, :open_timeout, :boundary, :oauth, :context)
:timeout, :open_timeout, :boundary,
:oauth, :context, :on_data)
def []=(key, value)
if key && key.to_sym == :proxy
@ -211,6 +212,10 @@ module Faraday
super(key, value)
end
end
def stream_response?
on_data.is_a?(Proc)
end
end
class SSLOptions < Options.new(:verify, :ca_file, :ca_path, :verify_mode,

View File

@ -80,6 +80,7 @@ module Faraday
# :request - Hash of options for configuring the request.
# :timeout - open/read timeout Integer in seconds
# :open_timeout - read timeout Integer in seconds
# :on_data - Proc for streaming
# :proxy - Hash of proxy options
# :uri - Proxy Server URI
# :user - Proxy server username

View File

@ -5,7 +5,7 @@ module Adapters
def adapter() :default end
Integration.apply(self, :NonParallel) do
Integration.apply(self, :NonParallel, :Streaming) do
# default stack is not configured with Multipart
undef :test_POST_sends_files
end

View File

@ -5,7 +5,7 @@ module Adapters
def adapter() :em_http end
Integration.apply(self, :Parallel) do
Integration.apply(self, :Parallel, :NonStreaming, :ParallelNonStreaming) do
# https://github.com/eventmachine/eventmachine/pull/289
undef :test_timeout

View File

@ -6,7 +6,7 @@ module Adapters
def adapter() :em_synchrony end
unless jruby?
Integration.apply(self, :Parallel) do
Integration.apply(self, :Parallel, :NonStreaming, :ParallelNonStreaming) do
# https://github.com/eventmachine/eventmachine/pull/289
undef :test_timeout

View File

@ -5,7 +5,7 @@ module Adapters
def adapter() :excon end
Integration.apply(self, :NonParallel) do
Integration.apply(self, :NonParallel, :NonStreaming) do
# https://github.com/geemus/excon/issues/126 ?
undef :test_timeout if ssl_mode?

View File

@ -1,5 +1,6 @@
require 'forwardable'
require File.expand_path("../../helper", __FILE__)
require File.expand_path("../../shared", __FILE__)
Faraday.require_lib 'autoload'
module Adapters
@ -57,6 +58,81 @@ module Adapters
end
end
module ParallelNonStreaming
def test_callback_is_called_in_parallel_with_no_streaming_support
resp1, resp2 = nil, nil
streamed1, streamed2 = nil, nil
connection = create_connection
err = capture_warnings do
connection.in_parallel do
resp1, streamed1 = streaming_request(connection, :get, 'stream?a=1')
resp2, streamed2 = streaming_request(connection, :get, 'stream?b=2', :chunk_size => 16*1024)
assert connection.in_parallel?
assert_nil resp1.body
assert_nil resp2.body
assert_equal [], streamed1
assert_equal [], streamed2
end
end
assert !connection.in_parallel?
assert_match(/Streaming .+ not yet implemented/, err)
opts = {:streaming? => false, :chunk_size => 16*1024}
check_streaming_response(streamed1, opts.merge(:prefix => '{"a"=>"1"}'))
check_streaming_response(streamed2, opts.merge(:prefix => '{"b"=>"2"}'))
end
end
module Streaming
def test_GET_streaming
response, streamed = streaming_request(create_connection, :get, 'stream')
check_streaming_response(streamed, :chunk_size => 16*1024)
assert_equal "", response.body
end
def test_non_GET_streaming
response, streamed = streaming_request(create_connection, :post, 'stream')
check_streaming_response(streamed, :chunk_size => 16*1024)
assert_equal "", response.body
end
def test_GET_streaming_empty_response
_, streamed = streaming_request(create_connection, :get, 'empty_stream')
assert_equal [["", 0]], streamed
end
def test_non_GET_streaming_empty_response
_, streamed = streaming_request(create_connection, :post, 'empty_stream')
assert_equal [["", 0]], streamed
end
end
module NonStreaming
include Faraday::Shared
def test_GET_streaming
response, streamed = nil
err = capture_warnings do
response, streamed = streaming_request(create_connection, :get, 'stream')
end
assert_match(/Streaming .+ not yet implemented/, err)
check_streaming_response(streamed, :streaming? => false)
assert_equal big_string, response.body
end
def test_non_GET_streaming
response, streamed = nil
err = capture_warnings do
response, streamed = streaming_request(create_connection, :post, 'stream')
end
assert_match(/Streaming .+ not yet implemented/, err)
check_streaming_response(streamed, :streaming? => false)
assert_equal big_string, response.body
end
end
module Compression
def test_GET_handles_compression
res = get('echo_header', :name => 'accept-encoding')
@ -258,6 +334,44 @@ module Adapters
conn.builder.insert_before adapter_handler, Faraday::Response::RaiseError
end
end
def streaming_request(connection, method, path, options={})
streamed = []
response = connection.send(method, path) do |req|
req.options.on_data = Proc.new{|*args| streamed << args}
end
[response, streamed]
end
def check_streaming_response(streamed, options={})
opts = {
:prefix => '',
:streaming? => true
}.merge(options)
expected_response = opts[:prefix] + Faraday::Shared.big_string
chunks, sizes = streamed.transpose
# Check that the total size of the chunks (via the last size returned)
# is the same size as the expected_response
assert_equal sizes.last, expected_response.bytesize
start_index = 0
expected_chunks = []
chunks.each do |actual_chunk|
expected_chunk = expected_response[start_index..((start_index + actual_chunk.bytesize)-1)]
expected_chunks << expected_chunk
start_index += expected_chunk.bytesize
end
# it's easier to read a smaller portion, so we check that first
assert_equal expected_chunks[0][0..255], chunks[0][0..255]
[expected_chunks, chunks].transpose.each do |expected, actual|
assert_equal expected, actual
end
end
end
end
end

View File

@ -7,7 +7,7 @@ module Adapters
def adapter() :net_http end
behaviors = [:NonParallel, :Compression]
behaviors = [:NonParallel, :Compression, :Streaming]
Integration.apply(self, *behaviors)

View File

@ -6,7 +6,7 @@ module Adapters
def adapter() :patron end
unless jruby?
Integration.apply(self, :NonParallel) do
Integration.apply(self, :NonParallel, :NonStreaming) do
# https://github.com/toland/patron/issues/34
undef :test_PATCH_send_url_encoded_params

View File

@ -13,6 +13,7 @@ module Adapters
# no Integration.apply because this doesn't require a server as a separate process
include Integration::Common
include Integration::NonParallel
include Integration::NonStreaming
# Rack::MockResponse doesn't provide any way to access the reason phrase,
# so override the shared test from Common.

View File

@ -5,7 +5,7 @@ module Adapters
def adapter() :typhoeus end
Integration.apply(self, :Parallel) do
Integration.apply(self, :Parallel, :NonStreaming, :ParallelNonStreaming) do
# https://github.com/dbalatero/typhoeus/issues/75
undef :test_GET_with_body

View File

@ -1,4 +1,5 @@
require 'sinatra/base'
require_relative 'shared'
module Faraday
class LiveServer < Sinatra::Base
@ -18,6 +19,26 @@ class LiveServer < Sinatra::Base
end
end
[:get, :post].each do |method|
send(method, '/stream') do
content_type :txt
stream do |out|
out << request.GET.inspect if request.GET.any?
out << request.POST.inspect if request.POST.any?
out << Faraday::Shared.big_string
end
end
end
[:get, :post].each do |method|
send(method, '/empty_stream') do
content_type :txt
stream do |out|
out << ""
end
end
end
get '/echo_header' do
header = "HTTP_#{params[:name].tr('-', '_').upcase}"
request.env.fetch(header) { 'NONE' }

12
test/shared.rb Normal file
View File

@ -0,0 +1,12 @@
module Faraday
module Shared
def self.big_string
kb = 1024
(32..126).map{|i| i.chr}.cycle.take(50*kb).join
end
def big_string
Faraday::Shared.big_string
end
end
end