Merge branch 'issue-119' into 'master'

new option: origin

See merge request honeyryderchuck/httpx!131
This commit is contained in:
HoneyryderChuck 2021-05-22 14:23:05 +00:00
commit 2c3280b6d0
46 changed files with 1162 additions and 145 deletions

View File

@ -37,6 +37,11 @@ group :test do
gem "brotli"
gem "ed25519"
gem "net-ssh-gateway"
if RUBY_VERSION >= "2.3"
gem "grpc"
gem "logging"
end
end
platform :mri_21 do

View File

@ -26,7 +26,7 @@ From now on, both headers and the responnse payload will also appear, so expecte
## Bugfixes
* HTTP/2 and HTTP/1.1 exhausted connections now get properly migrated into a new connection;
* HTTP/2 421 responses will now correctly migrate the connection and pendign requests to HTTP/1.1 (a hanging loop was being caused);
* HTTP/2 421 responses will now correctly migrate the connection and pending requests to HTTP/1.1 (a hanging loop was being caused);
* HTTP/2 connection failed with a GOAWAY settings timeout will now return error responses (instead of hanging indefinitely);
* Non-IP proxy name-resolving errors will now move on to the next available proxy in the list (instead of hanging indefinitely);
* Non-IP DNS resolve errors for `native` and `https` variants will now return the appropriate error response (instead of hanging indefinitely);

View File

@ -41,7 +41,7 @@ The `:transport_options` are therefore deprecated, and will be moved in a major
## Improvements
Some internal improvements that allow certainn plugins not to "leak" globally, such as the `:compression` plugin, which used to enable compression for all the `httpx` sessions from the same process. It doesn't anymore.
Some internal improvements that allow certain plugins not to "leak" globally, such as the `:compression` plugin, which used to enable compression for all the `httpx` sessions from the same process. It doesn't anymore.
Using exceptionless nonblocking connect calls in the supported rubies.
@ -55,4 +55,4 @@ When passing open IO objects for origins (the `:io` option), `httpx` was still t
Fixed usage of `:io` option when passed an "authority/io" hash.
Fixing some issues around trying to connnect to the next available IPAddress when the previous one was unreachable or ETIMEDOUT.
Fixing some issues around trying to connnect to the next available IPAddress when the previous one was unreachable or ETIMEDOUT.

View File

@ -2,4 +2,4 @@
## Bugfixes
Rescue `Errno::EALREADY` on calls to `connect_nonblock(exception: false)` (there are exceptionns after all...).
Rescue `Errno::EALREADY` on calls to `connect_nonblock(exception: false)` (there are exceptions after all...).

View File

@ -30,6 +30,6 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.add_runtime_dependency "http-2-next", ">= 0.1.2"
gem.add_runtime_dependency "http-2-next", ">= 0.4.1"
gem.add_runtime_dependency "timers"
end

View File

@ -24,6 +24,10 @@ module HTTPX
protected
def callbacks_for?(type)
@callbacks.key?(type) && !@callbacks[type].empty?
end
def callbacks(type = nil)
return @callbacks unless type

View File

@ -443,6 +443,7 @@ module HTTPX
emit(:misdirected, request)
else
response = ErrorResponse.new(request, ex, @options)
request.response = response
request.emit(:response, response)
end
end
@ -542,7 +543,9 @@ module HTTPX
def handle_error(error)
parser.handle_error(error) if @parser && parser.respond_to?(:handle_error)
while (request = @pending.shift)
request.emit(:response, ErrorResponse.new(request, error, @options))
response = ErrorResponse.new(request, error, @options)
request.response = response
request.emit(:response, response)
end
end

View File

@ -272,23 +272,18 @@ module HTTPX
join_headers(request) if request.state == :headers
request.transition(:body)
join_body(request) if request.state == :body
request.transition(:trailers)
# HTTP/1.1 trailers should only work for chunked encoding
join_trailers(request) if request.body.chunked? && request.state == :trailers
request.transition(:done)
end
end
def join_headers(request)
buffer = +""
buffer << "#{request.verb.to_s.upcase} #{headline_uri(request)} HTTP/#{@version.join(".")}" << CRLF
log(color: :yellow) { "<- HEADLINE: #{buffer.chomp.inspect}" }
@buffer << buffer
buffer.clear
@buffer << "#{request.verb.to_s.upcase} #{headline_uri(request)} HTTP/#{@version.join(".")}" << CRLF
log(color: :yellow) { "<- HEADLINE: #{@buffer.to_s.chomp.inspect}" }
set_protocol_headers(request)
request.headers.each do |field, value|
buffer << "#{capitalized(field)}: #{value}" << CRLF
log(color: :yellow) { "<- HEADER: #{buffer.chomp}" }
@buffer << buffer
buffer.clear
end
join_headers2(request.headers)
log { "<- " }
@buffer << CRLF
end
@ -302,6 +297,26 @@ module HTTPX
@buffer << chunk
throw(:buffer_full, request) if @buffer.full?
end
raise request.drain_error if request.drain_error
end
def join_trailers(request)
return unless request.trailers? && request.callbacks_for?(:trailers)
join_headers2(request.trailers)
log { "<- " }
@buffer << CRLF
end
def join_headers2(headers)
buffer = "".b
headers.each do |field, value|
buffer << "#{capitalized(field)}: #{value}" << CRLF
log(color: :yellow) { "<- HEADER: #{buffer.chomp}" }
@buffer << buffer
buffer.clear
end
end
UPCASED = {

View File

@ -38,7 +38,11 @@ module HTTPX
# waiting for WINDOW_UPDATE frames
return :r if @buffer.full?
return :w if @connection.state == :closed
if @connection.state == :closed
return unless @handshake_completed
return :w
end
unless (@connection.state == :connected && @handshake_completed)
return @buffer.empty? ? :r : :rw
@ -146,6 +150,8 @@ module HTTPX
join_headers(stream, request) if request.state == :headers
request.transition(:body)
join_body(stream, request) if request.state == :body
request.transition(:trailers)
join_trailers(stream, request) if request.state == :trailers && !request.body.empty?
request.transition(:done)
end
end
@ -175,6 +181,7 @@ module HTTPX
public :reset
def handle_stream(stream, request)
request.on(:refuse, &method(:on_stream_refuse).curry(3)[stream, request])
stream.on(:close, &method(:on_stream_close).curry(3)[stream, request])
stream.on(:half_close) do
log(level: 2) { "#{stream.id}: waiting for response..." }
@ -199,6 +206,18 @@ module HTTPX
stream.headers(request.headers.each, end_stream: request.empty?)
end
def join_trailers(stream, request)
unless request.trailers?
stream.data("", end_stream: true) if request.callbacks_for?(:trailers)
return
end
log(level: 1, color: :yellow) do
request.trailers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n")
end
stream.headers(request.trailers.each, end_stream: true)
end
def join_body(stream, request)
return if request.empty?
@ -207,13 +226,15 @@ module HTTPX
next_chunk = request.drain_body
log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." }
log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" }
stream.data(chunk, end_stream: !next_chunk)
if next_chunk && @buffer.full?
stream.data(chunk, end_stream: !(next_chunk || request.trailers? || request.callbacks_for?(:trailers)))
if next_chunk && (@buffer.full? || request.body.unbounded_body?)
@drains[request] = next_chunk
throw(:buffer_full)
end
chunk = next_chunk
end
on_stream_refuse(stream, request, request.drain_error) if request.drain_error
end
######
@ -251,6 +272,11 @@ module HTTPX
request.response << data
end
def on_stream_refuse(stream, request, error)
stream.close
on_stream_close(stream, request, error)
end
def on_stream_close(stream, request, error)
log(level: 2) { "#{stream.id}: closing stream" }
@drains.delete(request)

View File

@ -88,6 +88,10 @@ module HTTPX
end
end
def_option(:origin, <<-OUT)
URI(value)
OUT
def_option(:headers, <<-OUT)
if self.headers
self.headers.merge(value)

View File

@ -32,9 +32,8 @@ module HTTPX
class << self
attr_reader :credentials, :region
def load_dependencies(klass)
def load_dependencies(_klass)
require "aws-sdk-core"
klass.plugin(:aws_sigv4)
client = Class.new(Seahorse::Client::Base) do
@identifier = :httpx
@ -50,6 +49,10 @@ module HTTPX
@region = client.config[:region]
end
def configure(klass)
klass.plugin(:aws_sigv4)
end
def extra_options(options)
options.merge(max_concurrent_requests: 1)
end

View File

@ -146,7 +146,7 @@ module HTTPX
def_option(:sigv4_signer, <<-OUT)
value.is_a?(#{Signer}) ? value : #{Signer}.new(value)
OUT
end.new.merge(options)
end.new(options)
end
def load_dependencies(klass)

View File

@ -8,9 +8,14 @@ module HTTPX
# https://gitlab.com/honeyryderchuck/httpx/wikis/Authentication#basic-authentication
#
module BasicAuthentication
def self.load_dependencies(klass)
require "base64"
klass.plugin(:authentication)
class << self
def load_dependencies(_klass)
require "base64"
end
def configure(klass)
klass.plugin(:authentication)
end
end
module InstanceMethods

View File

@ -66,7 +66,7 @@ module HTTPX
@body = Encoder.new(@body, options.encodings.registry(encoding).deflater)
end
@headers["content-length"] = @body.bytesize unless chunked?
@headers["content-length"] = @body.bytesize unless unbounded_body?
end
end

View File

@ -5,12 +5,12 @@ module HTTPX
module Compression
module Brotli
class << self
def load_dependencies(klass)
klass.plugin(:compression)
def load_dependencies(_klass)
require "brotli"
end
def configure(klass)
klass.plugin(:compression)
klass.default_options.encodings.register "br", self
end
end
@ -18,12 +18,13 @@ module HTTPX
module Deflater
module_function
def deflate(raw, buffer, chunk_size:)
def deflate(raw, buffer = "".b, chunk_size: 16_384)
while (chunk = raw.read(chunk_size))
compressed = ::Brotli.deflate(chunk)
buffer << compressed
yield compressed if block_given?
end
buffer
end
end

View File

@ -4,20 +4,20 @@ module HTTPX
module Plugins
module Compression
module Deflate
def self.load_dependencies(klass)
def self.load_dependencies(_klass)
require "stringio"
require "zlib"
klass.plugin(:"compression/gzip")
end
def self.configure(klass)
klass.plugin(:"compression/gzip")
klass.default_options.encodings.register "deflate", self
end
module Deflater
module_function
def deflate(raw, buffer, chunk_size:)
def deflate(raw, buffer = "".b, chunk_size: 16_384)
deflater = Zlib::Deflate.new
while (chunk = raw.read(chunk_size))
compressed = deflater.deflate(chunk)
@ -27,6 +27,7 @@ module HTTPX
last = deflater.finish
buffer << last
yield last if block_given?
buffer
ensure
deflater.close if deflater
end

View File

@ -19,7 +19,7 @@ module HTTPX
@compressed_chunk = "".b
end
def deflate(raw, buffer, chunk_size:)
def deflate(raw, buffer = "".b, chunk_size: 16_384)
gzip = Zlib::GzipWriter.new(self)
begin
@ -38,6 +38,7 @@ module HTTPX
buffer << compressed
yield compressed if block_given?
buffer
end
private

247
lib/httpx/plugins/grpc.rb Normal file
View File

@ -0,0 +1,247 @@
# frozen_string_literal: true
module HTTPX
GRPCError = Class.new(Error) do
attr_reader :status, :details, :metadata
def initialize(status, details, metadata)
@status = status
@details = details
@metadata = metadata
super("GRPC error, code=#{status}, details=#{details}, metadata=#{metadata}")
end
end
module Plugins
#
# This plugin adds DSL to build GRPC interfaces.
#
# https://gitlab.com/honeyryderchuck/httpx/wikis/GRPC
#
module GRPC
unless String.method_defined?(:underscore)
module StringExtensions
refine String do
def underscore
s = dup # Avoid mutating the argument, as it might be frozen.
s.gsub!(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
s.gsub!(/([a-z\d])([A-Z])/, '\1_\2')
s.tr!("-", "_")
s.downcase!
s
end
end
end
using StringExtensions
end
DEADLINE = 60
MARSHAL_METHOD = :encode
UNMARSHAL_METHOD = :decode
HEADERS = {
"content-type" => "application/grpc",
"te" => "trailers",
"accept" => "application/grpc",
# metadata fits here
# ex "foo-bin" => base64("bar")
}.freeze
class << self
def load_dependencies(*)
require "stringio"
require "google/protobuf"
require "httpx/plugins/grpc/message"
require "httpx/plugins/grpc/call"
end
def configure(klass)
klass.plugin(:persistent)
klass.plugin(:compression)
klass.plugin(:stream)
end
def extra_options(options)
Class.new(options.class) do
def_option(:grpc_service, <<-OUT)
String(value)
OUT
def_option(:grpc_compression, <<-OUT)
case value
when true, false
value
else
value.to_s
end
OUT
def_option(:grpc_rpcs, <<-OUT)
Hash[value]
OUT
def_option(:grpc_deadline, <<-OUT)
raise Error, ":grpc_deadline must be positive" unless value.positive?
value
OUT
def_option(:call_credentials, <<-OUT)
raise Error, ":call_credentials must respond to #call" unless value.respond_to?(:call)
value
OUT
end.new(options).merge(
fallback_protocol: "h2",
http2_settings: { wait_for_handshake: false },
grpc_rpcs: {}.freeze,
grpc_compression: false,
grpc_deadline: DEADLINE
)
end
end
module ResponseMethods
attr_reader :trailing_metadata
def merge_headers(trailers)
@trailing_metadata = Hash[trailers]
super
end
def encoders
@options.encodings
end
end
module InstanceMethods
def with_channel_credentials(ca_path, key = nil, cert = nil, **ssl_opts)
ssl_params = {
**ssl_opts,
ca_file: ca_path,
}
if key
key = File.read(key) if File.file?(key)
ssl_params[:key] = OpenSSL::PKey.read(key)
end
if cert
cert = File.read(cert) if File.file?(cert)
ssl_params[:cert] = OpenSSL::X509::Certificate.new(cert)
end
with(ssl: ssl_params)
end
def rpc(rpc_name, input, output, **opts)
rpc_name = rpc_name.to_s
raise Error, "rpc #{rpc_name} already defined" if @options.grpc_rpcs.key?(rpc_name)
rpc_opts = {
deadline: @options.grpc_deadline,
}.merge(opts)
with(grpc_rpcs: @options.grpc_rpcs.merge(
rpc_name.underscore => [rpc_name, input, output, rpc_opts]
).freeze)
end
def build_stub(origin, service: nil, compression: false)
scheme = @options.ssl.empty? ? "http" : "https"
origin = URI.parse("#{scheme}://#{origin}")
with(origin: origin, grpc_service: service, grpc_compression: compression)
end
def execute(rpc_method, input,
deadline: DEADLINE,
metadata: nil,
**opts)
grpc_request = build_grpc_request(rpc_method, input, deadline: deadline, metadata: metadata, **opts)
response = request(grpc_request, **opts)
response.raise_for_status
GRPC::Call.new(response, opts)
end
private
def rpc_execute(rpc_name, input, **opts)
rpc_name, input_enc, output_enc, rpc_opts = @options.grpc_rpcs[rpc_name.to_s] || raise(Error, "#{rpc_name}: undefined service")
exec_opts = rpc_opts.merge(opts)
marshal_method ||= exec_opts.delete(:marshal_method) || MARSHAL_METHOD
unmarshal_method ||= exec_opts.delete(:unmarshal_method) || UNMARSHAL_METHOD
messages = if input.respond_to?(:each)
Enumerator.new do |y|
input.each do |message|
y << input_enc.__send__(marshal_method, message)
end
end
else
input_enc.marshal(input)
end
call = execute(rpc_name, messages, **exec_opts)
call.decoder = output_enc.method(unmarshal_method)
call
end
def build_grpc_request(rpc_method, input, deadline:, metadata: nil, **)
uri = @options.origin.dup
rpc_method = "/#{rpc_method}" unless rpc_method.start_with?("/")
rpc_method = "/#{@options.grpc_service}#{rpc_method}" if @options.grpc_service
uri.path = rpc_method
headers = HEADERS.merge(
"grpc-accept-encoding" => ["identity", *@options.encodings.registry.keys]
)
unless deadline == Float::INFINITY
# convert to milliseconds
deadline = (deadline * 1000.0).to_i
headers["grpc-timeout"] = "#{deadline}m"
end
headers = headers.merge(metadata) if metadata
# prepare compressor
deflater = nil
compression = @options.grpc_compression == true ? "gzip" : @options.grpc_compression
if compression
headers["grpc-encoding"] = compression
deflater = @options.encodings.registry(compression).deflater
end
headers.merge!(@options.call_credentials.call) if @options.call_credentials
body = if input.respond_to?(:each)
Enumerator.new do |y|
input.each do |message|
y << Message.encode(message, deflater: deflater)
end
end
else
Message.encode(input, deflater: deflater)
end
build_request(:post, uri, headers: headers, body: body)
end
def respond_to_missing?(meth, *, &blk)
@options.grpc_rpcs.key?(meth.to_s) || super
end
def method_missing(meth, *args, **kwargs, &blk)
return rpc_execute(meth, *args, **kwargs, &blk) if @options.grpc_rpcs.key?(meth.to_s)
super
end
end
end
register_plugin :grpc, GRPC
end
end

View File

@ -0,0 +1,62 @@
# frozen_string_literal: true
module HTTPX
module Plugins
module GRPC
# Encapsulates call information
class Call
attr_writer :decoder
def initialize(response, options)
@response = response
@options = options
@decoder = ->(z) { z }
end
def inspect
"#GRPC::Call(#{grpc_response})"
end
def to_s
grpc_response.to_s
end
def metadata
response.headers
end
def trailing_metadata
return unless @response.body.closed?
@response.trailing_metadata
end
private
def grpc_response
return @grpc_response if defined?(@grpc_response)
@grpc_response = if @response.respond_to?(:each)
Enumerator.new do |y|
Message.stream(@response).each do |message|
y << @decoder.call(message)
end
end
else
@decoder.call(Message.unary(@response))
end
end
def respond_to_missing?(meth, *args, &blk)
grpc_response.respond_to?(meth, *args) || super
end
def method_missing(meth, *args, &blk)
return grpc_response.__send__(meth, *args, &blk) if grpc_response.respond_to?(meth)
super
end
end
end
end
end

View File

@ -0,0 +1,85 @@
# frozen_string_literal: true
module HTTPX
module Plugins
module GRPC
# Encoding module for GRPC responses
#
# Can encode and decode grpc messages.
module Message
module_function
# decodes a unary grpc response
def unary(response)
verify_status(response)
decode(response.to_s, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders)
end
# lazy decodes a grpc stream response
def stream(response, &block)
return enum_for(__method__, response) unless block_given?
response.each do |frame|
decode(frame, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders, &block)
end
verify_status(response)
end
# encodes a single grpc message
def encode(bytes, deflater:)
if deflater
compressed_flag = 1
bytes = deflater.deflate(StringIO.new(bytes))
else
compressed_flag = 0
end
"".b << [compressed_flag, bytes.bytesize].pack("CL>") << bytes.to_s
end
# decodes a single grpc message
def decode(message, encodings:, encoders:)
until message.empty?
compressed, size = message.unpack("CL>")
data = message.byteslice(5..size + 5 - 1)
if compressed == 1
encodings.reverse_each do |algo|
inflater = encoders.registry(algo).inflater(size)
data = inflater.inflate(data)
size = data.bytesize
end
end
return data unless block_given?
yield data
message = message.byteslice(5 + size..-1)
end
end
def cancel(request)
request.emit(:refuse, :client_cancellation)
end
# interprets the grpc call trailing metadata, and raises an
# exception in case of error code
def verify_status(response)
# return standard errors if need be
response.raise_for_status
status = Integer(response.headers["grpc-status"])
message = response.headers["grpc-message"]
return if status.zero?
response.close
raise GRPCError.new(status, message, response.trailing_metadata)
end
end
end
end
end

View File

@ -15,7 +15,7 @@ module HTTPX
class << self
RATE_LIMIT_CODES = [429, 503].freeze
def load_dependencies(klass)
def configure(klass)
klass.plugin(:retries,
retry_change_requests: true,
retry_on: method(:retry_on_rate_limited_response),

View File

@ -1,6 +1,93 @@
# frozen_string_literal: true
module HTTPX
class StreamResponse
def initialize(request, session, connections)
@request = request
@session = session
@connections = connections
@options = @request.options
end
def each(&block)
return enum_for(__method__) unless block_given?
raise Error, "response already streamed" if @response
@request.stream = self
begin
@on_chunk = block
if @request.response
# if we've already started collecting the payload, yield it first
# before proceeding
body = @request.response.body
body.each do |chunk|
on_chunk(chunk)
end
end
response.raise_for_status
response.close
ensure
@on_chunk = nil
end
end
def each_line
return enum_for(__method__) unless block_given?
line = +""
each do |chunk|
line << chunk
while (idx = line.index("\n"))
yield line.byteslice(0..idx - 1)
line = line.byteslice(idx + 1..-1)
end
end
end
# This is a ghost method. It's to be used ONLY internally, when processing streams
def on_chunk(chunk)
raise NoMethodError unless @on_chunk
@on_chunk.call(chunk)
end
# :nocov:
def inspect
"#<StreamResponse:#{object_id}>"
end
# :nocov:
def to_s
response.to_s
end
private
def response
@session.__send__(:receive_requests, [@request], @connections, @options) until @request.response
@request.response
end
def respond_to_missing?(meth, *args)
response.respond_to?(meth, *args) || super
end
def method_missing(meth, *args, &block)
return super unless response.respond_to?(meth)
response.__send__(meth, *args, &block)
end
end
module Plugins
#
# This plugin adds support for stream response (text/event-stream).
@ -15,10 +102,13 @@ module HTTPX
return super(*args, **options) unless stream
requests = args.first.is_a?(Request) ? args : build_requests(*args, options)
raise Error, "only 1 response at a time is supported for streaming requests" unless requests.size == 1
StreamResponse.new(requests.first, self)
request = requests.first
connections = _send_requests(requests, request.options)
StreamResponse.new(request, self, connections)
end
end
@ -53,78 +143,10 @@ module HTTPX
end
end
class StreamResponse
def initialize(request, session)
@request = request
@session = session
@options = @request.options
end
def each(&block)
return enum_for(__method__) unless block_given?
raise Error, "response already streamed" if @response
@request.stream = self
begin
@on_chunk = block
response.raise_for_status
response.close
ensure
@on_chunk = nil
end
end
def each_line
return enum_for(__method__) unless block_given?
line = +""
each do |chunk|
line << chunk
while (idx = line.index("\n"))
yield line.byteslice(0..idx - 1)
line = line.byteslice(idx + 1..-1)
end
end
end
# This is a ghost method. It's to be used ONLY internally, when processing streams
def on_chunk(chunk)
raise NoMethodError unless @on_chunk
@on_chunk.call(chunk)
end
# :nocov:
def inspect
"#<StreamResponse:#{object_id}>"
end
# :nocov:
def to_s
response.to_s
end
private
def response
@response ||= @session.__send__(:send_requests, @request, @options).first
end
def respond_to_missing?(*args)
@options.response_class.respond_to?(*args) || super
end
def method_missing(meth, *args, &block)
return super unless @options.response_class.public_method_defined?(meth)
response.__send__(meth, *args, &block)
end
def self.const_missing(const_name)
super unless const_name == :StreamResponse
warn "DEPRECATION WARNING: the class #{self}::StreamResponse is deprecated. Use HTTPX::StreamResponse instead."
HTTPX::StreamResponse
end
end
register_plugin :stream, Stream

View File

@ -35,14 +35,22 @@ module HTTPX
attr_reader :verb, :uri, :headers, :body, :state, :options, :response
# Exception raised during enumerable body writes
attr_reader :drain_error
def_delegator :@body, :empty?
def_delegator :@body, :chunk!
def initialize(verb, uri, options = {})
@verb = verb.to_s.downcase.to_sym
@uri = Utils.uri(uri)
@options = Options.new(options)
@uri = Utils.uri(uri)
if @uri.relative?
raise(Error, "invalid URI: #{@uri}") unless @options.origin
@uri = @options.origin.merge(@uri)
end
raise(Error, "unknown method: #{verb}") unless METHODS.include?(@verb)
@ -54,6 +62,14 @@ module HTTPX
@state = :idle
end
def trailers?
defined?(@trailers)
end
def trailers
@trailers ||= @options.headers_class.new
end
def interests
return :r if @state == :done || @state == :expect
@ -124,6 +140,9 @@ module HTTPX
chunk.dup
rescue StopIteration
nil
rescue StandardError => e
@drain_error = e
nil
end
# :nocov:
@ -200,7 +219,9 @@ module HTTPX
end
def unbounded_body?
chunked? || @body.bytesize == Float::INFINITY
return @unbounded_body if defined?(@unbounded_body)
@unbounded_body = (chunked? || @body.bytesize == Float::INFINITY)
end
def chunked?
@ -253,6 +274,8 @@ module HTTPX
nextstate = :expect
end
end
when :trailers
return unless @state == :body
when :done
return if @state == :expect
end

View File

@ -94,6 +94,10 @@ module HTTPX
@state = :idle
end
def closed?
@state == :closed
end
def write(chunk)
return if @state == :closed

View File

@ -175,12 +175,18 @@ module HTTPX
end
def send_requests(*requests, options)
connections = []
request_options = @options.merge(options)
connections = _send_requests(requests, request_options)
receive_requests(requests, connections, request_options)
end
def _send_requests(requests, options)
connections = []
requests.each do |request|
error = catch(:resolve_error) do
connection = find_connection(request, connections, request_options)
connection = find_connection(request, connections, options)
connection.send(request)
end
next unless error.is_a?(ResolveError)
@ -188,13 +194,17 @@ module HTTPX
request.emit(:response, ErrorResponse.new(request, error, options))
end
connections
end
def receive_requests(requests, connections, options)
responses = []
begin
# guarantee ordered responses
loop do
request = requests.first
pool.next_tick until (response = fetch_response(request, connections, request_options))
pool.next_tick until (response = fetch_response(request, connections, options))
responses << response
requests.shift
@ -208,7 +218,7 @@ module HTTPX
# opportunity to traverse the requests, hence we're returning only a fraction of the errors
# we were supposed to. This effectively fetches the existing responses and return them.
while (request = requests.shift)
responses << fetch_response(request, connections, request_options)
responses << fetch_response(request, connections, options)
end
break
end

View File

@ -20,7 +20,7 @@ module HTTPX::Transcoder
@raw.each do |chunk|
yield "#{chunk.bytesize.to_s(16)}#{CRLF}#{chunk}#{CRLF}"
end
yield "0#{CRLF}#{CRLF}"
yield "0#{CRLF}"
end
def respond_to_missing?(meth, *args)

View File

@ -9,6 +9,7 @@ module HTTPX
def only: (Symbol) { (*untyped) -> void } -> void
def emit: (Symbol, *untyped) -> void
def callbacks_for?: (Symbol) -> bool
def callbacks: () -> Hash[Symbol, Array[_Callable]]
| (Symbol) -> Array[_Callable]
end

View File

@ -60,6 +60,10 @@ module HTTPX
def join_headers: (Request request) -> void
def join_trailers: (Request request) -> void
def join_headers2: (Headers headers) -> void
def join_body: (Request request) -> void
def capitalized: (String field) -> String

View File

@ -53,6 +53,8 @@ module HTTPX
def join_headers: (HTTP2Next::Stream stream, Request request) -> void
def join_trailers: (HTTP2Next::Stream stream, Request request) -> void
def join_body: (HTTP2Next::Stream stream, Request request) -> void
def on_stream_headers: (HTTP2Next::Stream stream, Request request, Array[[String, String]] headers) -> void
@ -61,7 +63,7 @@ module HTTPX
def on_stream_data: (HTTP2Next::Stream stream, Request request, string data) -> void
def on_stream_close: (HTTP2Next::Stream stream, Request request, Symbol? error) -> void
def on_stream_close: (HTTP2Next::Stream stream, Request request, (Symbol | StandardError)? error) -> void
def on_frame: (string bytes) -> void

View File

@ -11,6 +11,10 @@ module HTTPX
def self.new: (options) -> instance
| () -> instance
# headers
attr_reader uri: URI?
def uri=: (uri) -> void
# headers
attr_reader headers: Headers?
def headers=: (headers) -> void

View File

@ -7,6 +7,8 @@ module HTTPX
def self.load_dependencies: (singleton(Session)) -> void
def self.configure: (singleton(Session)) -> void
def self.extra_options: (Options) -> (Options)
module InstanceMethods

View File

@ -3,6 +3,8 @@ module HTTPX
module BasicAuthentication
def self.load_dependencies: (singleton(Session)) -> void
def self.configure: (singleton(Session)) -> void
module InstanceMethods
def basic_authentication: (string user, string password) -> instance
end

View File

@ -6,8 +6,8 @@ module HTTPX
type deflatable = _Reader | _ToS
interface _Deflater
def deflate: (deflatable, _Writer, chunk_size: Integer) -> void
| (deflatable, _Writer, chunk_size: Integer) { (String) -> void } -> void
def deflate: (deflatable, ?_Writer, ?chunk_size: Integer) -> _ToS
| (deflatable, ?_Writer, ?chunk_size: Integer) { (String) -> void } -> _ToS
end
interface _Inflater

View File

@ -1,4 +1,21 @@
module HTTPX
class StreamResponse
include _ToS
def each: () { (String) -> void } -> void
| () -> Enumerable[String]
def each_line: () { (String) -> void } -> void
| () -> Enumerable[String]
def on_chunk: (string) -> void
private
def response: () -> response
def initialize: (Request, Session, Array[Connection]) -> untyped
end
module Plugins
module Stream
module InstanceMethods
@ -16,22 +33,6 @@ module HTTPX
def stream: () -> StreamResponse?
end
class StreamResponse
include _ToS
def each: () { (String) -> void } -> void
| () -> Enumerable[String]
def each_line: () { (String) -> void } -> void
| () -> Enumerable[String]
def on_chunk: (string) -> void
private
def response: () -> response
def initialize: (Request, Session) -> untyped
end
end
type sessionStream = Session & Plugins::Stream::InstanceMethods

View File

@ -11,7 +11,8 @@ module HTTPX
attr_reader body: Body
attr_reader state: Symbol
attr_reader options: Options
attr_reader response: Response?
attr_reader response: response?
attr_reader drain_error: StandardError?
def initialize: (verb | String, uri, ?options?) -> untyped
@ -21,7 +22,7 @@ module HTTPX
def scheme: () -> ("http" | "https")
def response=: (Response response) -> void
def response=: (response) -> void
def path: () -> String
@ -39,6 +40,10 @@ module HTTPX
def expects?: () -> boolish
def trailers: () -> Headers
def trailers?: () -> boolish
class Body
def initialize: (Headers, Options) -> untyped
def each: () { (String) -> void } -> void

View File

@ -47,6 +47,7 @@ module HTTPX
def empty?: () -> bool
def copy_to: (_ToPath | _Writer destination) -> void
def close: () -> void
def closed?: () -> bool
private

View File

@ -45,5 +45,9 @@ module HTTPX
def build_connection: (URI, Options) -> Connection
def send_requests: (*Request, options) -> Array[response]
def _send_requests: (Array[Request], options) -> Array[Connection]
def receive_requests: (Array[Request], Array[Connection], options) -> Array[response]
end
end

View File

@ -33,6 +33,6 @@ class ErrorResponseTest < Minitest::Test
private
def request_mock
Request.new(:get, "/")
Request.new(:get, "http://example.com/")
end
end

View File

@ -29,6 +29,7 @@ class HTTPTest < Minitest::Test
include Plugins::Stream
include Plugins::AWSAuthentication
include Plugins::Upgrade
include Plugins::GRPC unless RUBY_ENGINE == "jruby" || RUBY_VERSION < "2.3"
def test_verbose_log
log = StringIO.new

View File

@ -30,6 +30,7 @@ class HTTPSTest < Minitest::Test
include Plugins::Stream
include Plugins::AWSAuthentication
include Plugins::Upgrade
include Plugins::GRPC unless RUBY_ENGINE == "jruby" || RUBY_VERSION < "2.3"
def test_connection_coalescing
coalesced_origin = "https://#{ENV["HTTPBIN_COALESCING_HOST"]}"
@ -108,6 +109,30 @@ class HTTPSTest < Minitest::Test
verify_error_response(response, /settings_timeout/)
end
end
def test_http2_request_trailers
uri = build_uri("/post")
HTTPX.wrap do |http|
total_time = start_time = nil
trailered = false
request = http.build_request(:post, uri, body: %w[this is chunked])
request.on(:headers) do |_written_request|
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
request.on(:trailers) do |written_request|
total_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
written_request.trailers["x-time-spent"] = total_time
trailered = true
end
response = http.request(request)
verify_status(response, 200)
body = json_body(response)
# verify_header(body["headers"], "x-time-spent", total_time.to_s)
assert body.key?("data")
assert trailered, "trailer callback wasn't called"
end
end
end
def test_ssl_wrong_hostname

View File

@ -6,14 +6,14 @@ class RequestTest < Minitest::Test
include HTTPX
def test_request_unsupported_body
ex = assert_raises(HTTPX::Error) { Request.new(:post, "/", body: Object.new) }
ex = assert_raises(HTTPX::Error) { Request.new(:post, "http://example.com/", body: Object.new) }
assert ex.message =~ /cannot determine size of body/
end
def test_request_verb
r1 = Request.new(:get, "/")
r1 = Request.new(:get, "http://example.com/")
assert r1.verb == :get, "unexpected verb (#{r1.verb})"
r2 = Request.new("GET", "/")
r2 = Request.new("GET", "http://example.com/")
assert r2.verb == :get, "unexpected verb (#{r1.verb})"
end
@ -57,21 +57,21 @@ class RequestTest < Minitest::Test
end
def test_request_body_raw
req = Request.new(:post, "/", body: "bang")
req = Request.new(:post, "http://example.com/", body: "bang")
assert !req.body.empty?, "body should exist"
assert req.headers["content-type"] == "application/octet-stream", "content type is wrong"
assert req.headers["content-length"] == "4", "content length is wrong"
end
def test_request_body_form
req = Request.new(:post, "/", form: { "foo" => "bar" })
req = Request.new(:post, "http://example.com/", form: { "foo" => "bar" })
assert !req.body.empty?, "body should exist"
assert req.headers["content-type"] == "application/x-www-form-urlencoded", "content type is wrong"
assert req.headers["content-length"] == "7", "content length is wrong"
end
def test_request_body_json
req = Request.new(:post, "/", json: { "foo" => "bar" })
req = Request.new(:post, "http://example.com/", json: { "foo" => "bar" })
assert !req.body.empty?, "body should exist"
assert req.headers["content-type"] == "application/json; charset=utf-8", "content type is wrong"
assert req.headers["content-length"] == "13", "content length is wrong"

View File

@ -27,7 +27,7 @@ class SessionTest < Minitest::Test
assert session.options.respond_to?(:foo), "options methods weren't added"
assert session.options.foo == "options-foo", "option method is unexpected"
request = session.options.request_class.new(:get, "/", session.options)
request = session.options.request_class.new(:get, "http://example.com/", session.options)
assert request.respond_to?(:foo), "request methods haven't been added"
assert request.foo == "request-foo", "request method is unexpected"
assert request.headers.respond_to?(:foo), "headers methods haven't been added"

View File

@ -0,0 +1,203 @@
# frozen_string_literal: true
begin
require "grpc"
require "logging"
# A test message
class EchoMsg
attr_reader :msg
def initialize(msg: "")
@msg = msg
end
def self.marshal(o)
o.msg
end
def self.unmarshal(msg)
EchoMsg.new(msg: msg)
end
end
# a test service that checks the cert of its peer
class TestService
include GRPC::GenericService
rpc :an_rpc, EchoMsg, EchoMsg
rpc :a_cancellable_rpc, EchoMsg, EchoMsg
rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
def check_peer_cert(call)
# error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}"
# fail(error_msg) unless call.peer_cert == client_cert
end
def an_rpc(req, call)
check_peer_cert(call)
req
end
def a_cancellable_rpc(_req, call)
check_peer_cert(call)
raise GRPC::Cancelled, "dump"
end
def a_client_streaming_rpc(call)
check_peer_cert(call)
call.each_remote_read.each { |r| GRPC.logger.info(r) }
EchoMsg.new(msg: "client stream pong")
end
def a_server_streaming_rpc(_, call)
check_peer_cert(call)
call.send_initial_metadata
[EchoMsg.new(msg: "server stream pong"), EchoMsg.new(msg: "server stream pong")]
end
def a_bidi_rpc(requests, call)
check_peer_cert(call)
requests.each { |r| GRPC.logger.info(r) }
call.send_initial_metadata
[EchoMsg.new(msg: "bidi pong"), EchoMsg.new(msg: "bidi pong")]
end
end
if ENV.key?("HTTPX_DEBUG")
log_level = ENV["HTTPX_DEBUG"].to_i
log_level = log_level > 1 ? :debug : :info
module GRPC
extend Logging.globally
end
Logging.logger.root.appenders = Logging.appenders.stdout
Logging.logger.root.level = log_level
Logging.logger["GRPC"].level = log_level
Logging.logger["GRPC::ActiveCall"].level = log_level
Logging.logger["GRPC::BidiCall"].level = log_level
end
module GRPCHelpers
include ::GRPC::Core::StatusCodes
include ::GRPC::Core::TimeConsts
include ::GRPC::Core::CallOps
private
def teardown
super
if @grpc_server
@grpc_server.shutdown_and_notify(from_relative_time(2))
@grpc_server.close
@grpc_server_th.join if @grpc_server_th
end
return unless @rpc_server && !@rpc_server.stopped?
@rpc_server.stop
@rpc_server_th.join
end
def grpc_plugin
grpc = HTTPX.plugin(:grpc)
grpc = grpc.with_channel_credentials(*channel_credentials_paths, hostname: "foo.test.google.fr") if origin.start_with?("https")
grpc
end
def grpc_channel_uri(server_port)
scheme = URI(origin).scheme
"#{scheme}://localhost:#{server_port}"
end
def run_rpc(service, server_args: {})
@rpc_server = ::GRPC::RpcServer.new(server_args: server_args.merge("grpc.so_reuseport" => 0))
cred = origin.start_with?("https") ? server_credentials : :this_port_is_insecure
server_port = @rpc_server.add_http2_port("localhost:0", cred)
@rpc_server.handle(service)
@rpc_server_th = Thread.new { @rpc_server.run }
@rpc_server.wait_till_running
server_port
end
def run_request_response(resp, status, marshal: nil, server_args: {}, server_initial_md: {}, server_trailing_md: {})
@grpc_server = ::GRPC::Core::Server.new(server_args.merge("grpc.so_reuseport" => 0))
cred = origin.start_with?("https") ? server_credentials : :this_port_is_insecure
server_port = @grpc_server.add_http2_port("localhost:0", cred)
@grpc_server_th = wakey_thread do |notifier|
c = expect_server_to_be_invoked(notifier, metadata_to_send: server_initial_md, marshal: marshal)
begin
yield c
ensure
c.remote_send(resp)
c.send_status(status, status == OK ? "OK" : "NOK", true, metadata: server_trailing_md)
c.send(:set_input_stream_done)
c.send(:set_output_stream_done)
end
end
server_port
end
def wakey_thread(&blk)
n = ::GRPC::Notifier.new
t = Thread.new do
begin
blk.call(n)
rescue GRPC::Core::CallError
end
end
t.abort_on_exception = true
n.wait
t
end
def expect_server_to_be_invoked(notifier, metadata_to_send: nil, marshal: nil)
@grpc_server.start
notifier.notify(nil)
recvd_rpc = @grpc_server.request_call
recvd_call = recvd_rpc.call
recvd_call.metadata = recvd_rpc.metadata
recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
::GRPC::ActiveCall.new(recvd_call, marshal, marshal, INFINITE_FUTURE, metadata_received: true)
end
def server_credentials
creds = ["ca.pem", "server1.key", "server1.pem"]
.map { |path| File.join(grpc_testdata_path, path) }
.map(&File.method(:read))
GRPC::Core::ServerCredentials.new(
creds[0],
[{ private_key: creds[1], cert_chain: creds[2] }],
true
) # force client auth
end
def channel_credentials_paths
["ca.pem", "client.key", "client.pem"]
.map { |path| File.join(grpc_testdata_path, path) }
# .map(&File.method(:read))
# GRPC::Core::ChannelCredentials.new(*creds)
end
def grpc_testdata_path
grpc_path = Gem::Specification.find_by_path("grpc").full_gem_path
File.join(grpc_path, "src", "ruby", "spec", "testdata")
end
end
rescue LoadError
module GRPCHelpers
end
end

View File

@ -3,6 +3,8 @@
require "time"
module Requests
using HTTPX::URIExtensions
module Get
def test_http_get
uri = build_uri("/get")
@ -11,6 +13,13 @@ module Requests
verify_body_length(response)
end
def test_http_get_option_origin
uri = URI(build_uri("/get"))
response = HTTPX.with(origin: uri.origin).get(uri.path)
verify_status(response, 200)
verify_body_length(response)
end
def test_http_request
uri = build_uri("/get")
response = HTTPX.request(:get, uri)

View File

@ -0,0 +1,204 @@
# frozen_string_literal: true
module Requests
module Plugins
module GRPC
include GRPCHelpers
def test_plugin_grpc_unary_plain_bytestreams
no_marshal = proc { |x| x }
server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call|
assert call.remote_read == "a_request"
assert call.metadata["k1"] == "v1"
assert call.metadata["k2"] == "v2"
end
grpc = grpc_plugin
# build service
stub = grpc.build_stub("localhost:#{server_port}")
result = stub.execute("an_rpc_method", "a_request", metadata: { k1: "v1", k2: "v2" })
assert result.to_s == "a_reply"
end
def test_plugin_grpc_call_credentials
return unless origin.start_with?("https")
call_credentials = -> { { "k1" => "updated-k1" } }
no_marshal = proc { |x| x }
server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call|
assert call.remote_read == "a_request"
assert call.metadata["k1"] == "updated-k1"
assert call.metadata["k2"] == "v2"
end
grpc = grpc_plugin
# build service
stub = grpc.with_call_credentials(call_credentials).build_stub("localhost:#{server_port}")
result = stub.execute("an_rpc_method", "a_request", metadata: { k1: "v1", k2: "v2" })
assert result.to_s == "a_reply"
end
def test_plugin_grpc_compressed_request
no_marshal = proc { |x| x }
server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call|
# assert call.metadata["grpc-encoding"] == "gzip", "request wasn't compressed"
# TODO: find a way to test if request payload was indeed compressed
assert call.remote_read == "A" * 2000
end
grpc = grpc_plugin
# build service
stub = grpc.build_stub("localhost:#{server_port}", compression: "gzip")
result = stub.execute("an_rpc_method", "A" * 2000)
assert result.to_s == "a_reply"
end
def test_plugin_grpc_compressed_response
no_marshal = proc { |x| x }
server_port = run_request_response("A" * 2000, OK, marshal: no_marshal,
server_initial_md: { "grpc-internal-encoding-request" => "gzip" }) do |call|
assert call.remote_read == "a_request"
end
grpc = grpc_plugin
# build service
stub = grpc.build_stub("localhost:#{server_port}")
result = stub.execute("an_rpc_method", "a_request")
assert result.to_s == "A" * 2000
end
# Cancellation on error
def test_plugin_grpc_deadline_exceeded
no_marshal = proc { |x| x }
server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call|
sleep(3)
assert call.remote_read == "a_request"
end
grpc = grpc_plugin
# build service
stub = grpc.build_stub("localhost:#{server_port}")
error = assert_raises(HTTPX::GRPCError) { stub.execute("an_rpc_method", "a request", deadline: 2).to_s }
assert error.status == ::GRPC::Core::StatusCodes::DEADLINE_EXCEEDED
end
def test_plugin_grpc_cancellation_on_client_error
no_marshal = proc { |x| x }
input = Enumerator.new do |_y|
# y << "a_request"
raise "oh crap"
end
server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call|
# not supposed to arrive here
begin
call.remote_read
rescue StandardError
nil
end
end
grpc = grpc_plugin
# build service
stub = grpc.build_stub("localhost:#{server_port}")
error = assert_raises(HTTPX::Error) { stub.execute("an_rpc_method", input).to_s }
assert error.message =~ /oh crap/
end
def test_plugin_grpc_cancellation_on_server_error
server_port = run_rpc(TestService)
grpc = grpc_plugin
# build service
test_service_rpcs = grpc.rpc(:a_cancellable_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal)
test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService)
error = assert_raises(HTTPX::GRPCError) { test_service_stub.a_cancellable_rpc(EchoMsg.new(msg: "ping")).to_s }
assert error.status == 1
assert error.details == "dump"
end
def test_plugin_grpc_unary_protobuf
server_port = run_rpc(TestService)
grpc = grpc_plugin
# build service
test_service_rpcs = grpc.rpc(:an_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal)
test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService)
echo_response = test_service_stub.an_rpc(EchoMsg.new(msg: "ping"))
assert echo_response.msg == "ping"
assert echo_response.trailing_metadata["grpc-message"] == "OK"
end
def test_plugin_grpc_client_stream_protobuf
server_port = run_rpc(TestService)
grpc = grpc_plugin
# build service
test_service_rpcs = grpc.rpc(:a_client_streaming_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal)
test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService)
input = [EchoMsg.new(msg: "ping"), EchoMsg.new(msg: "ping")]
response = test_service_stub.a_client_streaming_rpc(input)
assert response.msg == "client stream pong"
assert response.trailing_metadata["grpc-message"] == "OK"
end
def test_plugin_grpc_server_stream_protobuf
server_port = run_rpc(TestService)
grpc = grpc_plugin
# build service
test_service_rpcs = grpc.rpc(:a_server_streaming_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal,
stream: true)
test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService)
streaming_response = test_service_stub.a_server_streaming_rpc(EchoMsg.new(msg: "ping"))
assert streaming_response.respond_to?(:each)
assert streaming_response.trailing_metadata.nil?
echo_responses = streaming_response.each.to_a
assert echo_responses.map(&:msg) == ["server stream pong", "server stream pong"]
assert streaming_response.trailing_metadata["grpc-message"] == "OK"
end
def test_plugin_grpc_bidi_stream_protobuf
server_port = run_rpc(TestService)
grpc = grpc_plugin
# build service
test_service_rpcs = grpc.rpc(:a_bidi_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal, stream: true)
test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService)
input = [EchoMsg.new(msg: "ping"), EchoMsg.new(msg: "ping")]
streaming_response = test_service_stub.a_bidi_rpc(input)
assert streaming_response.respond_to?(:each)
assert streaming_response.trailing_metadata.nil?
echo_responses = streaming_response.each.to_a
assert echo_responses.map(&:msg) == ["bidi pong", "bidi pong"]
assert streaming_response.trailing_metadata["grpc-message"] == "OK"
end
end
end
end

View File

@ -14,6 +14,34 @@ module Requests
# assert body["data"] == "thisisachunkedresponse",
# "unexpected body (#{body["data"]})"
end
define_method :"test_#{meth}_chunked_body_trailer" do
uri = build_uri("/#{meth}")
http = HTTPX.with_headers("transfer-encoding" => "chunked")
total_time = start_time = nil
trailered = false
request = http.build_request(meth, uri, headers: { "trailer" => "X-Time-Spent" }, body: %w[this is a chunked response])
request.on(:headers) do |_written_request|
start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
request.on(:trailers) do |written_request|
total_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time
written_request.trailers["x-time-spent"] = total_time
trailered = true
end
response = http.request(request)
verify_status(response, 200)
body = json_body(response)
verify_header(body["headers"], "Transfer-Encoding", "chunked")
# httpbin sadly doesn't receive trailers...
# verify_header(body["headers"], "X-Time-Spent", total_time.to_s)
assert body.key?("data")
assert trailered, "trailer callback wasn't called"
# assert body["data"] == "thisisachunkedresponse",
# "unexpected body (#{body["data"]})"
end
end
end
end