Add support for making a request and receiving the response as a stream. (#983)

This commit is contained in:
Dominic Charley-Roy 2021-06-24 10:24:11 -04:00 committed by GitHub
parent 28e6d19a90
commit 59eb8d06cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1045 additions and 725 deletions

View File

@ -6,6 +6,28 @@ module Stripe
module ClassMethods
def execute_resource_request(method, url,
params = {}, opts = {})
execute_resource_request_internal(
:execute_request, method, url, params, opts
)
end
def execute_resource_request_stream(method, url,
params = {}, opts = {},
&read_body_chunk_block)
execute_resource_request_internal(
:execute_request_stream,
method,
url,
params,
opts,
&read_body_chunk_block
)
end
private def execute_resource_request_internal(client_request_method_sym,
method, url,
params, opts,
&read_body_chunk_block)
params ||= {}
error_on_invalid_params(params)
@ -22,10 +44,12 @@ module Stripe
client = headers.delete(:client)
# Assume all remaining opts must be headers
resp, opts[:api_key] = client.execute_request(
resp, opts[:api_key] = client.send(
client_request_method_sym,
method, url,
api_base: api_base, api_key: api_key,
headers: headers, params: params
headers: headers, params: params,
&read_body_chunk_block
)
# Hash#select returns an array before 1.9
@ -89,6 +113,15 @@ module Stripe
self.class.execute_resource_request(method, url, params, opts)
end
protected def execute_resource_request_stream(method, url,
params = {}, opts = {},
&read_body_chunk_block)
opts = @opts.merge(Util.normalize_opts(opts))
self.class.execute_resource_request_stream(
method, url, params, opts, &read_body_chunk_block
)
end
# See notes on `alias` above.
alias request execute_resource_request
end

View File

@ -115,5 +115,13 @@ module Stripe
Util.convert_to_stripe_object(resp.data, opts)
end
end
protected def request_stream(method:, path:, params:, opts: {},
&read_body_chunk_block)
resp, = execute_resource_request_stream(
method, path, params, opts, &read_body_chunk_block
)
resp
end
end
end

View File

@ -66,7 +66,8 @@ module Stripe
# Executes an HTTP request to the given URI with the given method. Also
# allows a request body, headers, and query string to be specified.
def execute_request(method, uri, body: nil, headers: nil, query: nil)
def execute_request(method, uri, body: nil, headers: nil, query: nil,
&block)
# Perform some basic argument validation because it's easy to get
# confused between strings and hashes for things like body and query
# parameters.
@ -92,8 +93,22 @@ module Stripe
u.path
end
method_name = method.to_s.upcase
has_response_body = method_name != "HEAD"
request = Net::HTTPGenericRequest.new(
method_name,
(body ? true : false),
has_response_body,
path,
headers
)
@mutex.synchronize do
connection.send_request(method.to_s.upcase, path, body, headers)
# The block parameter is special here. If a block is provided, the block
# is invoked with the Net::HTTPResponse. However, the body will not have
# been read yet in the block, and can be streamed by calling
# HTTPResponse#read_body.
connection.request(request, body, &block)
end
end

View File

@ -213,62 +213,9 @@ module Stripe
def execute_request(method, path,
api_base: nil, api_key: nil, headers: {}, params: {})
raise ArgumentError, "method should be a symbol" \
unless method.is_a?(Symbol)
raise ArgumentError, "path should be a string" \
unless path.is_a?(String)
api_base ||= config.api_base
api_key ||= config.api_key
params = Util.objects_to_ids(params)
check_api_key!(api_key)
body_params = nil
query_params = nil
case method
when :get, :head, :delete
query_params = params
else
body_params = params
end
query_params, path = merge_query_params(query_params, path)
headers = request_headers(api_key, method)
.update(Util.normalize_headers(headers))
url = api_url(path, api_base)
# Merge given query parameters with any already encoded in the path.
query = query_params ? Util.encode_parameters(query_params) : nil
# Encoding body parameters is a little more complex because we may have
# to send a multipart-encoded body. `body_log` is produced separately as
# a log-friendly variant of the encoded form. File objects are displayed
# as such instead of as their file contents.
body, body_log =
body_params ? encode_body(body_params, headers) : [nil, nil]
# stores information on the request we're about to make so that we don't
# have to pass as many parameters around for logging.
context = RequestLogContext.new
context.account = headers["Stripe-Account"]
context.api_key = api_key
context.api_version = headers["Stripe-Version"]
context.body = body_log
context.idempotency_key = headers["Idempotency-Key"]
context.method = method
context.path = path
context.query = query
http_resp = execute_request_with_rescues(method, api_base, context) do
self.class
.default_connection_manager(config)
.execute_request(method, url,
body: body,
headers: headers,
query: query)
end
http_resp, api_key = execute_request_internal(
method, path, api_base, api_key, headers, params
)
begin
resp = StripeResponse.from_net_http(http_resp)
@ -284,6 +231,38 @@ module Stripe
[resp, api_key]
end
# Executes a request and returns the body as a stream instead of converting
# it to a StripeObject. This should be used for any request where we expect
# an arbitrary binary response.
#
# A `read_body_chunk` block can be passed, which will be called repeatedly
# with the body chunks read from the socket.
#
# If a block is passed, a StripeHeadersOnlyResponse is returned as the
# block is expected to do all the necessary body processing. If no block is
# passed, then a StripeStreamResponse is returned containing an IO stream
# with the response body.
def execute_request_stream(method, path,
api_base: nil, api_key: nil,
headers: {}, params: {},
&read_body_chunk_block)
unless block_given?
raise ArgumentError,
"execute_request_stream requires a read_body_chunk_block"
end
http_resp, api_key = execute_request_internal(
method, path, api_base, api_key, headers, params, &read_body_chunk_block
)
# When the read_body_chunk_block is given, we no longer have access to the
# response body at this point and so return a response object containing
# only the headers. This is because the body was consumed by the block.
resp = StripeHeadersOnlyResponse.from_net_http(http_resp)
[resp, api_key]
end
def store_last_response(object_id, resp)
return unless last_response_has_key?(object_id)
@ -451,6 +430,83 @@ module Stripe
pruned_contexts.count
end
private def execute_request_internal(method, path,
api_base, api_key, headers, params,
&read_body_chunk_block)
raise ArgumentError, "method should be a symbol" \
unless method.is_a?(Symbol)
raise ArgumentError, "path should be a string" \
unless path.is_a?(String)
api_base ||= config.api_base
api_key ||= config.api_key
params = Util.objects_to_ids(params)
check_api_key!(api_key)
body_params = nil
query_params = nil
case method
when :get, :head, :delete
query_params = params
else
body_params = params
end
query_params, path = merge_query_params(query_params, path)
headers = request_headers(api_key, method)
.update(Util.normalize_headers(headers))
url = api_url(path, api_base)
# Merge given query parameters with any already encoded in the path.
query = query_params ? Util.encode_parameters(query_params) : nil
# Encoding body parameters is a little more complex because we may have
# to send a multipart-encoded body. `body_log` is produced separately as
# a log-friendly variant of the encoded form. File objects are displayed
# as such instead of as their file contents.
body, body_log =
body_params ? encode_body(body_params, headers) : [nil, nil]
# stores information on the request we're about to make so that we don't
# have to pass as many parameters around for logging.
context = RequestLogContext.new
context.account = headers["Stripe-Account"]
context.api_key = api_key
context.api_version = headers["Stripe-Version"]
context.body = body_log
context.idempotency_key = headers["Idempotency-Key"]
context.method = method
context.path = path
context.query = query
# A block can be passed in to read the content directly from the response.
# We want to execute this block only when the response was actually
# successful. When it wasn't, we defer to the standard error handling as
# we have to read the body and parse the error JSON.
response_block =
if block_given?
lambda do |response|
unless should_handle_as_error(response.code.to_i)
response.read_body(&read_body_chunk_block)
end
end
end
http_resp = execute_request_with_rescues(method, api_base, context) do
self.class
.default_connection_manager(config)
.execute_request(method, url,
body: body,
headers: headers,
query: query,
&response_block)
end
[http_resp, api_key]
end
private def api_url(url = "", api_base = nil)
(api_base || config.api_base) + url
end
@ -490,6 +546,7 @@ module Stripe
# that's more condusive to logging.
flattened_params =
flattened_params.map { |k, v| [k, v.is_a?(String) ? v : v.to_s] }.to_h
else
body = Util.encode_parameters(body_params)
end
@ -503,6 +560,10 @@ module Stripe
[body, body_log]
end
private def should_handle_as_error(http_status)
http_status >= 400
end
private def execute_request_with_rescues(method, api_base, context)
num_retries = 0
@ -520,7 +581,9 @@ module Stripe
http_status = resp.code.to_i
context = context.dup_from_response_headers(resp)
handle_error_response(resp, context) if http_status >= 400
if should_handle_as_error(http_status)
handle_error_response(resp, context)
end
log_response(context, request_start, http_status, resp.body)
notify_request_end(context, request_duration, http_status,

View File

@ -1,63 +1,54 @@
# frozen_string_literal: true
module Stripe
# StripeResponse encapsulates some vitals of a response that came back from
# the Stripe API.
class StripeResponse
# Headers provides an access wrapper to an API response's header data. It
# mainly exists so that we don't need to expose the entire
# `Net::HTTPResponse` object while still getting some of its benefits like
# case-insensitive access to header names and flattening of header values.
class Headers
# Initializes a Headers object from a Net::HTTP::HTTPResponse object.
def self.from_net_http(resp)
new(resp.to_hash)
# Headers provides an access wrapper to an API response's header data. It
# mainly exists so that we don't need to expose the entire
# `Net::HTTPResponse` object while still getting some of its benefits like
# case-insensitive access to header names and flattening of header values.
class StripeResponseHeaders
# Initializes a Headers object from a Net::HTTP::HTTPResponse object.
def self.from_net_http(resp)
new(resp.to_hash)
end
# `hash` is expected to be a hash mapping header names to arrays of
# header values. This is the default format generated by calling
# `#to_hash` on a `Net::HTTPResponse` object because headers can be
# repeated multiple times. Using `#[]` will collapse values down to just
# the first.
def initialize(hash)
if !hash.is_a?(Hash) ||
!hash.keys.all? { |n| n.is_a?(String) } ||
!hash.values.all? { |a| a.is_a?(Array) } ||
!hash.values.all? { |a| a.all? { |v| v.is_a?(String) } }
raise ArgumentError,
"expect hash to be a map of string header names to arrays of " \
"header values"
end
# `hash` is expected to be a hash mapping header names to arrays of
# header values. This is the default format generated by calling
# `#to_hash` on a `Net::HTTPResponse` object because headers can be
# repeated multiple times. Using `#[]` will collapse values down to just
# the first.
def initialize(hash)
if !hash.is_a?(Hash) ||
!hash.keys.all? { |n| n.is_a?(String) } ||
!hash.values.all? { |a| a.is_a?(Array) } ||
!hash.values.all? { |a| a.all? { |v| v.is_a?(String) } }
raise ArgumentError,
"expect hash to be a map of string header names to arrays of " \
"header values"
end
@hash = {}
@hash = {}
# This shouldn't be strictly necessary because `Net::HTTPResponse` will
# produce a hash with all headers downcased, but do it anyway just in
# case an object of this class was constructed manually.
#
# Also has the effect of duplicating the hash, which is desirable for a
# little extra object safety.
hash.each do |k, v|
@hash[k.downcase] = v
end
end
def [](name)
values = @hash[name.downcase]
if values && values.count > 1
warn("Duplicate header values for `#{name}`; returning only first")
end
values ? values.first : nil
# This shouldn't be strictly necessary because `Net::HTTPResponse` will
# produce a hash with all headers downcased, but do it anyway just in
# case an object of this class was constructed manually.
#
# Also has the effect of duplicating the hash, which is desirable for a
# little extra object safety.
hash.each do |k, v|
@hash[k.downcase] = v
end
end
# The data contained by the HTTP body of the response deserialized from
# JSON.
attr_accessor :data
# The raw HTTP body of the response.
attr_accessor :http_body
def [](name)
values = @hash[name.downcase]
if values && values.count > 1
warn("Duplicate header values for `#{name}`; returning only first")
end
values ? values.first : nil
end
end
module StripeResponseBase
# A Hash of the HTTP headers of the response.
attr_accessor :http_headers
@ -67,15 +58,52 @@ module Stripe
# The Stripe request ID of the response.
attr_accessor :request_id
def self.populate_for_net_http(resp, http_resp)
resp.http_headers = StripeResponseHeaders.from_net_http(http_resp)
resp.http_status = http_resp.code.to_i
resp.request_id = http_resp["request-id"]
end
end
# StripeResponse encapsulates some vitals of a response that came back from
# the Stripe API.
class StripeResponse
include StripeResponseBase
# The data contained by the HTTP body of the response deserialized from
# JSON.
attr_accessor :data
# The raw HTTP body of the response.
attr_accessor :http_body
# Initializes a StripeResponse object from a Net::HTTP::HTTPResponse
# object.
def self.from_net_http(http_resp)
resp = StripeResponse.new
resp.data = JSON.parse(http_resp.body, symbolize_names: true)
resp.http_body = http_resp.body
resp.http_headers = Headers.from_net_http(http_resp)
resp.http_status = http_resp.code.to_i
resp.request_id = http_resp["request-id"]
StripeResponseBase.populate_for_net_http(resp, http_resp)
resp
end
end
# We have to alias StripeResponseHeaders to StripeResponse::Headers, as this
# class used to be embedded within StripeResponse and we want to be backwards
# compatible.
StripeResponse::Headers = StripeResponseHeaders
# StripeHeadersOnlyResponse includes only header-related vitals of the
# response. This is used for streaming requests where the response was read
# directly in a block and we explicitly don't want to store the body of the
# response in memory.
class StripeHeadersOnlyResponse
include StripeResponseBase
# Initializes a StripeHeadersOnlyResponse object from a
# Net::HTTP::HTTPResponse object.
def self.from_net_http(http_resp)
resp = StripeHeadersOnlyResponse.new
StripeResponseBase.populate_for_net_http(resp, http_resp)
resp
end
end

View File

@ -613,6 +613,58 @@ module Stripe
end
end
context "#request_stream" do
class StreamTestAPIResource < APIResource
OBJECT_NAME = "stream"
def read_stream(params = {}, opts = {}, &read_body_chunk_block)
request_stream(
method: :get,
path: resource_url + "/read",
params: params,
opts: opts,
&read_body_chunk_block
)
end
end
setup do
Util.instance_variable_set(
:@object_classes,
Stripe::ObjectTypes.object_names_to_classes.merge(
"stream" => StreamTestAPIResource
)
)
end
teardown do
Util.class.instance_variable_set(:@object_classes, Stripe::ObjectTypes.object_names_to_classes)
end
should "supports requesting with a block" do
stub_request(:get, "#{Stripe.api_base}/v1/streams/hi_123/read")
.with(query: { foo: "bar" }, headers: { "Stripe-Account" => "acct_hi" })
.to_return(body: "response body")
accumulated_body = +""
resp = StreamTestAPIResource.new(id: "hi_123").read_stream({ foo: "bar" }, stripe_account: "acct_hi") do |body_chunk|
accumulated_body << body_chunk
end
assert_instance_of Stripe::StripeHeadersOnlyResponse, resp
assert_equal "response body", accumulated_body
end
should "fail when requesting without a block" do
stub_request(:get, "#{Stripe.api_base}/v1/streams/hi_123/read")
.with(query: { foo: "bar" }, headers: { "Stripe-Account" => "acct_hi" })
.to_return(body: "response body")
assert_raises ArgumentError do
StreamTestAPIResource.new(id: "hi_123").read_stream({ foo: "bar" }, stripe_account: "acct_hi")
end
end
end
@@fixtures = {} # rubocop:disable Style/ClassVars
setup do
if @@fixtures.empty?

View File

@ -159,6 +159,27 @@ module Stripe
query: "query=bar")
end
should "make a request with a block" do
stub_request(:post, "#{Stripe.api_base}/path?query=bar")
.with(
body: "body=foo",
headers: { "Stripe-Account" => "bar" }
)
.to_return(body: "HTTP response body")
accumulated_body = +""
@manager.execute_request(:post, "#{Stripe.api_base}/path",
body: "body=foo",
headers: { "Stripe-Account" => "bar" },
query: "query=bar") do |res|
res.read_body do |body_chunk|
accumulated_body << body_chunk
end
end
assert_equal "HTTP response body", accumulated_body
end
should "perform basic argument validation" do
e = assert_raises ArgumentError do
@manager.execute_request("POST", "#{Stripe.api_base}/path")

File diff suppressed because it is too large Load Diff

View File

@ -4,12 +4,12 @@ require ::File.expand_path("../test_helper", __dir__)
module Stripe
class StripeResponseTest < Test::Unit::TestCase
context "Headers" do
context "StripeResponseHeaders" do
should "allow case-insensitive header access" do
headers = { "Request-Id" => "request-id" }
http_resp = create_net_http_resp(200, "", headers)
headers = StripeResponse::Headers.from_net_http(http_resp)
headers = StripeResponseHeaders.from_net_http(http_resp)
assert_equal "request-id", headers["request-id"]
assert_equal "request-id", headers["Request-Id"]
@ -17,26 +17,26 @@ module Stripe
end
should "initialize without error" do
StripeResponse::Headers.new({})
StripeResponse::Headers.new("Request-Id" => [])
StripeResponse::Headers.new("Request-Id" => ["request-id"])
StripeResponseHeaders.new({})
StripeResponseHeaders.new("Request-Id" => [])
StripeResponseHeaders.new("Request-Id" => ["request-id"])
end
should "initialize with error on a malformed hash" do
assert_raises(ArgumentError) do
StripeResponse::Headers.new(nil)
StripeResponseHeaders.new(nil)
end
assert_raises(ArgumentError) do
StripeResponse::Headers.new(1 => [])
StripeResponseHeaders.new(1 => [])
end
assert_raises(ArgumentError) do
StripeResponse::Headers.new("Request-Id" => 1)
StripeResponseHeaders.new("Request-Id" => 1)
end
assert_raises(ArgumentError) do
StripeResponse::Headers.new("Request-Id" => [1])
StripeResponseHeaders.new("Request-Id" => [1])
end
end
@ -44,7 +44,7 @@ module Stripe
old_stderr = $stderr
$stderr = StringIO.new
begin
headers = StripeResponse::Headers.new("Duplicated" => %w[a b])
headers = StripeResponseHeaders.new("Duplicated" => %w[a b])
assert_equal "a", headers["Duplicated"]
assert_equal "Duplicate header values for `Duplicated`; returning only first",
$stderr.string.rstrip
@ -54,20 +54,46 @@ module Stripe
end
end
context ".from_net_http" do
should "converts to StripeResponse" do
code = 200
body = '{"foo": "bar"}'
headers = { "Request-Id" => "request-id" }
http_resp = create_net_http_resp(code, body, headers)
[StripeResponse, StripeHeadersOnlyResponse].each do |response_class|
context "StripeResponseBase mixin for #{response_class}" do
context ".from_net_http" do
should "populate the base fields" do
code = 200
body = '{"foo": "bar"}'
headers = { "Request-Id" => "request-id" }
http_resp = create_net_http_resp(code, body, headers)
resp = StripeResponse.from_net_http(http_resp)
resp = response_class.from_net_http(http_resp)
assert_equal JSON.parse(body, symbolize_names: true), resp.data
assert_equal body, resp.http_body
assert_equal "request-id", resp.http_headers["Request-ID"]
assert_equal code, resp.http_status
assert_equal "request-id", resp.request_id
assert_equal "request-id", resp.http_headers["Request-ID"]
assert_equal code, resp.http_status
assert_equal "request-id", resp.request_id
end
end
end
end
context "#StripeResponse" do
context ".from_net_http" do
should "converts to StripeResponse" do
code = 200
body = '{"foo": "bar"}'
http_resp = create_net_http_resp(code, body, {})
resp = StripeResponse.from_net_http(http_resp)
assert_instance_of StripeResponse, resp
assert_equal JSON.parse(body, symbolize_names: true), resp.data
assert_equal body, resp.http_body
end
end
context "Headers backwards compatibility" do
should "alias StripeResponseHeaders" do
headers = StripeResponse::Headers.new("Request-Id" => ["request-id"])
assert_instance_of StripeResponseHeaders, headers
end
end
end