diff --git a/Gemfile b/Gemfile index ec613d67..eb28ce36 100644 --- a/Gemfile +++ b/Gemfile @@ -37,6 +37,11 @@ group :test do gem "brotli" gem "ed25519" gem "net-ssh-gateway" + + if RUBY_VERSION >= "2.3" + gem "grpc" + gem "logging" + end end platform :mri_21 do diff --git a/doc/release_notes/0_10_1.md b/doc/release_notes/0_10_1.md index a7204ce4..f9add38f 100644 --- a/doc/release_notes/0_10_1.md +++ b/doc/release_notes/0_10_1.md @@ -26,7 +26,7 @@ From now on, both headers and the responnse payload will also appear, so expecte ## Bugfixes * HTTP/2 and HTTP/1.1 exhausted connections now get properly migrated into a new connection; -* HTTP/2 421 responses will now correctly migrate the connection and pendign requests to HTTP/1.1 (a hanging loop was being caused); +* HTTP/2 421 responses will now correctly migrate the connection and pending requests to HTTP/1.1 (a hanging loop was being caused); * HTTP/2 connection failed with a GOAWAY settings timeout will now return error responses (instead of hanging indefinitely); * Non-IP proxy name-resolving errors will now move on to the next available proxy in the list (instead of hanging indefinitely); * Non-IP DNS resolve errors for `native` and `https` variants will now return the appropriate error response (instead of hanging indefinitely); diff --git a/doc/release_notes/0_13_0.md b/doc/release_notes/0_13_0.md index e09f0bd2..5796bc65 100644 --- a/doc/release_notes/0_13_0.md +++ b/doc/release_notes/0_13_0.md @@ -41,7 +41,7 @@ The `:transport_options` are therefore deprecated, and will be moved in a major ## Improvements -Some internal improvements that allow certainn plugins not to "leak" globally, such as the `:compression` plugin, which used to enable compression for all the `httpx` sessions from the same process. It doesn't anymore. +Some internal improvements that allow certain plugins not to "leak" globally, such as the `:compression` plugin, which used to enable compression for all the `httpx` sessions from the same process. It doesn't anymore. Using exceptionless nonblocking connect calls in the supported rubies. @@ -55,4 +55,4 @@ When passing open IO objects for origins (the `:io` option), `httpx` was still t Fixed usage of `:io` option when passed an "authority/io" hash. -Fixing some issues around trying to connnect to the next available IPAddress when the previous one was unreachable or ETIMEDOUT. \ No newline at end of file +Fixing some issues around trying to connnect to the next available IPAddress when the previous one was unreachable or ETIMEDOUT. diff --git a/doc/release_notes/0_13_1.md b/doc/release_notes/0_13_1.md index d618a7c0..8a210bfe 100644 --- a/doc/release_notes/0_13_1.md +++ b/doc/release_notes/0_13_1.md @@ -2,4 +2,4 @@ ## Bugfixes -Rescue `Errno::EALREADY` on calls to `connect_nonblock(exception: false)` (there are exceptionns after all...). \ No newline at end of file +Rescue `Errno::EALREADY` on calls to `connect_nonblock(exception: false)` (there are exceptions after all...). \ No newline at end of file diff --git a/httpx.gemspec b/httpx.gemspec index 29ebb91d..ef9f23ff 100644 --- a/httpx.gemspec +++ b/httpx.gemspec @@ -30,6 +30,6 @@ Gem::Specification.new do |gem| gem.require_paths = ["lib"] - gem.add_runtime_dependency "http-2-next", ">= 0.1.2" + gem.add_runtime_dependency "http-2-next", ">= 0.4.1" gem.add_runtime_dependency "timers" end diff --git a/lib/httpx/callbacks.rb b/lib/httpx/callbacks.rb index d9355d44..59f54e5b 100644 --- a/lib/httpx/callbacks.rb +++ b/lib/httpx/callbacks.rb @@ -24,6 +24,10 @@ module HTTPX protected + def callbacks_for?(type) + @callbacks.key?(type) && !@callbacks[type].empty? + end + def callbacks(type = nil) return @callbacks unless type diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index 01a6d9e7..9e890deb 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -443,6 +443,7 @@ module HTTPX emit(:misdirected, request) else response = ErrorResponse.new(request, ex, @options) + request.response = response request.emit(:response, response) end end @@ -542,7 +543,9 @@ module HTTPX def handle_error(error) parser.handle_error(error) if @parser && parser.respond_to?(:handle_error) while (request = @pending.shift) - request.emit(:response, ErrorResponse.new(request, error, @options)) + response = ErrorResponse.new(request, error, @options) + request.response = response + request.emit(:response, response) end end diff --git a/lib/httpx/connection/http1.rb b/lib/httpx/connection/http1.rb index 0283c2cf..3d9721a3 100644 --- a/lib/httpx/connection/http1.rb +++ b/lib/httpx/connection/http1.rb @@ -272,23 +272,18 @@ module HTTPX join_headers(request) if request.state == :headers request.transition(:body) join_body(request) if request.state == :body + request.transition(:trailers) + # HTTP/1.1 trailers should only work for chunked encoding + join_trailers(request) if request.body.chunked? && request.state == :trailers request.transition(:done) end end def join_headers(request) - buffer = +"" - buffer << "#{request.verb.to_s.upcase} #{headline_uri(request)} HTTP/#{@version.join(".")}" << CRLF - log(color: :yellow) { "<- HEADLINE: #{buffer.chomp.inspect}" } - @buffer << buffer - buffer.clear + @buffer << "#{request.verb.to_s.upcase} #{headline_uri(request)} HTTP/#{@version.join(".")}" << CRLF + log(color: :yellow) { "<- HEADLINE: #{@buffer.to_s.chomp.inspect}" } set_protocol_headers(request) - request.headers.each do |field, value| - buffer << "#{capitalized(field)}: #{value}" << CRLF - log(color: :yellow) { "<- HEADER: #{buffer.chomp}" } - @buffer << buffer - buffer.clear - end + join_headers2(request.headers) log { "<- " } @buffer << CRLF end @@ -302,6 +297,26 @@ module HTTPX @buffer << chunk throw(:buffer_full, request) if @buffer.full? end + + raise request.drain_error if request.drain_error + end + + def join_trailers(request) + return unless request.trailers? && request.callbacks_for?(:trailers) + + join_headers2(request.trailers) + log { "<- " } + @buffer << CRLF + end + + def join_headers2(headers) + buffer = "".b + headers.each do |field, value| + buffer << "#{capitalized(field)}: #{value}" << CRLF + log(color: :yellow) { "<- HEADER: #{buffer.chomp}" } + @buffer << buffer + buffer.clear + end end UPCASED = { diff --git a/lib/httpx/connection/http2.rb b/lib/httpx/connection/http2.rb index f1ddf86e..2ca5c80d 100644 --- a/lib/httpx/connection/http2.rb +++ b/lib/httpx/connection/http2.rb @@ -38,7 +38,11 @@ module HTTPX # waiting for WINDOW_UPDATE frames return :r if @buffer.full? - return :w if @connection.state == :closed + if @connection.state == :closed + return unless @handshake_completed + + return :w + end unless (@connection.state == :connected && @handshake_completed) return @buffer.empty? ? :r : :rw @@ -146,6 +150,8 @@ module HTTPX join_headers(stream, request) if request.state == :headers request.transition(:body) join_body(stream, request) if request.state == :body + request.transition(:trailers) + join_trailers(stream, request) if request.state == :trailers && !request.body.empty? request.transition(:done) end end @@ -175,6 +181,7 @@ module HTTPX public :reset def handle_stream(stream, request) + request.on(:refuse, &method(:on_stream_refuse).curry(3)[stream, request]) stream.on(:close, &method(:on_stream_close).curry(3)[stream, request]) stream.on(:half_close) do log(level: 2) { "#{stream.id}: waiting for response..." } @@ -199,6 +206,18 @@ module HTTPX stream.headers(request.headers.each, end_stream: request.empty?) end + def join_trailers(stream, request) + unless request.trailers? + stream.data("", end_stream: true) if request.callbacks_for?(:trailers) + return + end + + log(level: 1, color: :yellow) do + request.trailers.each.map { |k, v| "#{stream.id}: -> HEADER: #{k}: #{v}" }.join("\n") + end + stream.headers(request.trailers.each, end_stream: true) + end + def join_body(stream, request) return if request.empty? @@ -207,13 +226,15 @@ module HTTPX next_chunk = request.drain_body log(level: 1, color: :green) { "#{stream.id}: -> DATA: #{chunk.bytesize} bytes..." } log(level: 2, color: :green) { "#{stream.id}: -> #{chunk.inspect}" } - stream.data(chunk, end_stream: !next_chunk) - if next_chunk && @buffer.full? + stream.data(chunk, end_stream: !(next_chunk || request.trailers? || request.callbacks_for?(:trailers))) + if next_chunk && (@buffer.full? || request.body.unbounded_body?) @drains[request] = next_chunk throw(:buffer_full) end chunk = next_chunk end + + on_stream_refuse(stream, request, request.drain_error) if request.drain_error end ###### @@ -251,6 +272,11 @@ module HTTPX request.response << data end + def on_stream_refuse(stream, request, error) + stream.close + on_stream_close(stream, request, error) + end + def on_stream_close(stream, request, error) log(level: 2) { "#{stream.id}: closing stream" } @drains.delete(request) diff --git a/lib/httpx/options.rb b/lib/httpx/options.rb index ece26f64..02b1c5a6 100644 --- a/lib/httpx/options.rb +++ b/lib/httpx/options.rb @@ -88,6 +88,10 @@ module HTTPX end end + def_option(:origin, <<-OUT) + URI(value) + OUT + def_option(:headers, <<-OUT) if self.headers self.headers.merge(value) diff --git a/lib/httpx/plugins/aws_sdk_authentication.rb b/lib/httpx/plugins/aws_sdk_authentication.rb index b3cd2ad2..48ed9064 100644 --- a/lib/httpx/plugins/aws_sdk_authentication.rb +++ b/lib/httpx/plugins/aws_sdk_authentication.rb @@ -32,9 +32,8 @@ module HTTPX class << self attr_reader :credentials, :region - def load_dependencies(klass) + def load_dependencies(_klass) require "aws-sdk-core" - klass.plugin(:aws_sigv4) client = Class.new(Seahorse::Client::Base) do @identifier = :httpx @@ -50,6 +49,10 @@ module HTTPX @region = client.config[:region] end + def configure(klass) + klass.plugin(:aws_sigv4) + end + def extra_options(options) options.merge(max_concurrent_requests: 1) end diff --git a/lib/httpx/plugins/aws_sigv4.rb b/lib/httpx/plugins/aws_sigv4.rb index 1894ad18..4adf0898 100644 --- a/lib/httpx/plugins/aws_sigv4.rb +++ b/lib/httpx/plugins/aws_sigv4.rb @@ -146,7 +146,7 @@ module HTTPX def_option(:sigv4_signer, <<-OUT) value.is_a?(#{Signer}) ? value : #{Signer}.new(value) OUT - end.new.merge(options) + end.new(options) end def load_dependencies(klass) diff --git a/lib/httpx/plugins/basic_authentication.rb b/lib/httpx/plugins/basic_authentication.rb index ec1409d8..dc68d679 100644 --- a/lib/httpx/plugins/basic_authentication.rb +++ b/lib/httpx/plugins/basic_authentication.rb @@ -8,9 +8,14 @@ module HTTPX # https://gitlab.com/honeyryderchuck/httpx/wikis/Authentication#basic-authentication # module BasicAuthentication - def self.load_dependencies(klass) - require "base64" - klass.plugin(:authentication) + class << self + def load_dependencies(_klass) + require "base64" + end + + def configure(klass) + klass.plugin(:authentication) + end end module InstanceMethods diff --git a/lib/httpx/plugins/compression.rb b/lib/httpx/plugins/compression.rb index 5929afe1..a538c3ed 100644 --- a/lib/httpx/plugins/compression.rb +++ b/lib/httpx/plugins/compression.rb @@ -66,7 +66,7 @@ module HTTPX @body = Encoder.new(@body, options.encodings.registry(encoding).deflater) end - @headers["content-length"] = @body.bytesize unless chunked? + @headers["content-length"] = @body.bytesize unless unbounded_body? end end diff --git a/lib/httpx/plugins/compression/brotli.rb b/lib/httpx/plugins/compression/brotli.rb index a27b23b2..dc37e378 100644 --- a/lib/httpx/plugins/compression/brotli.rb +++ b/lib/httpx/plugins/compression/brotli.rb @@ -5,12 +5,12 @@ module HTTPX module Compression module Brotli class << self - def load_dependencies(klass) - klass.plugin(:compression) + def load_dependencies(_klass) require "brotli" end def configure(klass) + klass.plugin(:compression) klass.default_options.encodings.register "br", self end end @@ -18,12 +18,13 @@ module HTTPX module Deflater module_function - def deflate(raw, buffer, chunk_size:) + def deflate(raw, buffer = "".b, chunk_size: 16_384) while (chunk = raw.read(chunk_size)) compressed = ::Brotli.deflate(chunk) buffer << compressed yield compressed if block_given? end + buffer end end diff --git a/lib/httpx/plugins/compression/deflate.rb b/lib/httpx/plugins/compression/deflate.rb index 7ccad027..ed45a5a1 100644 --- a/lib/httpx/plugins/compression/deflate.rb +++ b/lib/httpx/plugins/compression/deflate.rb @@ -4,20 +4,20 @@ module HTTPX module Plugins module Compression module Deflate - def self.load_dependencies(klass) + def self.load_dependencies(_klass) require "stringio" require "zlib" - klass.plugin(:"compression/gzip") end def self.configure(klass) + klass.plugin(:"compression/gzip") klass.default_options.encodings.register "deflate", self end module Deflater module_function - def deflate(raw, buffer, chunk_size:) + def deflate(raw, buffer = "".b, chunk_size: 16_384) deflater = Zlib::Deflate.new while (chunk = raw.read(chunk_size)) compressed = deflater.deflate(chunk) @@ -27,6 +27,7 @@ module HTTPX last = deflater.finish buffer << last yield last if block_given? + buffer ensure deflater.close if deflater end diff --git a/lib/httpx/plugins/compression/gzip.rb b/lib/httpx/plugins/compression/gzip.rb index e6fc5d31..56e06a37 100644 --- a/lib/httpx/plugins/compression/gzip.rb +++ b/lib/httpx/plugins/compression/gzip.rb @@ -19,7 +19,7 @@ module HTTPX @compressed_chunk = "".b end - def deflate(raw, buffer, chunk_size:) + def deflate(raw, buffer = "".b, chunk_size: 16_384) gzip = Zlib::GzipWriter.new(self) begin @@ -38,6 +38,7 @@ module HTTPX buffer << compressed yield compressed if block_given? + buffer end private diff --git a/lib/httpx/plugins/grpc.rb b/lib/httpx/plugins/grpc.rb new file mode 100644 index 00000000..b80c99fa --- /dev/null +++ b/lib/httpx/plugins/grpc.rb @@ -0,0 +1,247 @@ +# frozen_string_literal: true + +module HTTPX + GRPCError = Class.new(Error) do + attr_reader :status, :details, :metadata + + def initialize(status, details, metadata) + @status = status + @details = details + @metadata = metadata + super("GRPC error, code=#{status}, details=#{details}, metadata=#{metadata}") + end + end + + module Plugins + # + # This plugin adds DSL to build GRPC interfaces. + # + # https://gitlab.com/honeyryderchuck/httpx/wikis/GRPC + # + module GRPC + unless String.method_defined?(:underscore) + module StringExtensions + refine String do + def underscore + s = dup # Avoid mutating the argument, as it might be frozen. + s.gsub!(/([A-Z]+)([A-Z][a-z])/, '\1_\2') + s.gsub!(/([a-z\d])([A-Z])/, '\1_\2') + s.tr!("-", "_") + s.downcase! + s + end + end + end + using StringExtensions + end + + DEADLINE = 60 + MARSHAL_METHOD = :encode + UNMARSHAL_METHOD = :decode + HEADERS = { + "content-type" => "application/grpc", + "te" => "trailers", + "accept" => "application/grpc", + # metadata fits here + # ex "foo-bin" => base64("bar") + }.freeze + + class << self + def load_dependencies(*) + require "stringio" + require "google/protobuf" + require "httpx/plugins/grpc/message" + require "httpx/plugins/grpc/call" + end + + def configure(klass) + klass.plugin(:persistent) + klass.plugin(:compression) + klass.plugin(:stream) + end + + def extra_options(options) + Class.new(options.class) do + def_option(:grpc_service, <<-OUT) + String(value) + OUT + + def_option(:grpc_compression, <<-OUT) + case value + when true, false + value + else + value.to_s + end + OUT + + def_option(:grpc_rpcs, <<-OUT) + Hash[value] + OUT + + def_option(:grpc_deadline, <<-OUT) + raise Error, ":grpc_deadline must be positive" unless value.positive? + + value + OUT + + def_option(:call_credentials, <<-OUT) + raise Error, ":call_credentials must respond to #call" unless value.respond_to?(:call) + + value + OUT + end.new(options).merge( + fallback_protocol: "h2", + http2_settings: { wait_for_handshake: false }, + grpc_rpcs: {}.freeze, + grpc_compression: false, + grpc_deadline: DEADLINE + ) + end + end + + module ResponseMethods + attr_reader :trailing_metadata + + def merge_headers(trailers) + @trailing_metadata = Hash[trailers] + super + end + + def encoders + @options.encodings + end + end + + module InstanceMethods + def with_channel_credentials(ca_path, key = nil, cert = nil, **ssl_opts) + ssl_params = { + **ssl_opts, + ca_file: ca_path, + } + if key + key = File.read(key) if File.file?(key) + ssl_params[:key] = OpenSSL::PKey.read(key) + end + + if cert + cert = File.read(cert) if File.file?(cert) + ssl_params[:cert] = OpenSSL::X509::Certificate.new(cert) + end + + with(ssl: ssl_params) + end + + def rpc(rpc_name, input, output, **opts) + rpc_name = rpc_name.to_s + raise Error, "rpc #{rpc_name} already defined" if @options.grpc_rpcs.key?(rpc_name) + + rpc_opts = { + deadline: @options.grpc_deadline, + }.merge(opts) + + with(grpc_rpcs: @options.grpc_rpcs.merge( + rpc_name.underscore => [rpc_name, input, output, rpc_opts] + ).freeze) + end + + def build_stub(origin, service: nil, compression: false) + scheme = @options.ssl.empty? ? "http" : "https" + + origin = URI.parse("#{scheme}://#{origin}") + + with(origin: origin, grpc_service: service, grpc_compression: compression) + end + + def execute(rpc_method, input, + deadline: DEADLINE, + metadata: nil, + **opts) + grpc_request = build_grpc_request(rpc_method, input, deadline: deadline, metadata: metadata, **opts) + response = request(grpc_request, **opts) + response.raise_for_status + GRPC::Call.new(response, opts) + end + + private + + def rpc_execute(rpc_name, input, **opts) + rpc_name, input_enc, output_enc, rpc_opts = @options.grpc_rpcs[rpc_name.to_s] || raise(Error, "#{rpc_name}: undefined service") + + exec_opts = rpc_opts.merge(opts) + + marshal_method ||= exec_opts.delete(:marshal_method) || MARSHAL_METHOD + unmarshal_method ||= exec_opts.delete(:unmarshal_method) || UNMARSHAL_METHOD + + messages = if input.respond_to?(:each) + Enumerator.new do |y| + input.each do |message| + y << input_enc.__send__(marshal_method, message) + end + end + else + input_enc.marshal(input) + end + + call = execute(rpc_name, messages, **exec_opts) + + call.decoder = output_enc.method(unmarshal_method) + + call + end + + def build_grpc_request(rpc_method, input, deadline:, metadata: nil, **) + uri = @options.origin.dup + rpc_method = "/#{rpc_method}" unless rpc_method.start_with?("/") + rpc_method = "/#{@options.grpc_service}#{rpc_method}" if @options.grpc_service + uri.path = rpc_method + + headers = HEADERS.merge( + "grpc-accept-encoding" => ["identity", *@options.encodings.registry.keys] + ) + unless deadline == Float::INFINITY + # convert to milliseconds + deadline = (deadline * 1000.0).to_i + headers["grpc-timeout"] = "#{deadline}m" + end + + headers = headers.merge(metadata) if metadata + + # prepare compressor + deflater = nil + compression = @options.grpc_compression == true ? "gzip" : @options.grpc_compression + + if compression + headers["grpc-encoding"] = compression + deflater = @options.encodings.registry(compression).deflater + end + + headers.merge!(@options.call_credentials.call) if @options.call_credentials + + body = if input.respond_to?(:each) + Enumerator.new do |y| + input.each do |message| + y << Message.encode(message, deflater: deflater) + end + end + else + Message.encode(input, deflater: deflater) + end + + build_request(:post, uri, headers: headers, body: body) + end + + def respond_to_missing?(meth, *, &blk) + @options.grpc_rpcs.key?(meth.to_s) || super + end + + def method_missing(meth, *args, **kwargs, &blk) + return rpc_execute(meth, *args, **kwargs, &blk) if @options.grpc_rpcs.key?(meth.to_s) + + super + end + end + end + register_plugin :grpc, GRPC + end +end diff --git a/lib/httpx/plugins/grpc/call.rb b/lib/httpx/plugins/grpc/call.rb new file mode 100644 index 00000000..50a98556 --- /dev/null +++ b/lib/httpx/plugins/grpc/call.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +module HTTPX + module Plugins + module GRPC + # Encapsulates call information + class Call + attr_writer :decoder + + def initialize(response, options) + @response = response + @options = options + @decoder = ->(z) { z } + end + + def inspect + "#GRPC::Call(#{grpc_response})" + end + + def to_s + grpc_response.to_s + end + + def metadata + response.headers + end + + def trailing_metadata + return unless @response.body.closed? + + @response.trailing_metadata + end + + private + + def grpc_response + return @grpc_response if defined?(@grpc_response) + + @grpc_response = if @response.respond_to?(:each) + Enumerator.new do |y| + Message.stream(@response).each do |message| + y << @decoder.call(message) + end + end + else + @decoder.call(Message.unary(@response)) + end + end + + def respond_to_missing?(meth, *args, &blk) + grpc_response.respond_to?(meth, *args) || super + end + + def method_missing(meth, *args, &blk) + return grpc_response.__send__(meth, *args, &blk) if grpc_response.respond_to?(meth) + + super + end + end + end + end +end diff --git a/lib/httpx/plugins/grpc/message.rb b/lib/httpx/plugins/grpc/message.rb new file mode 100644 index 00000000..ea0de054 --- /dev/null +++ b/lib/httpx/plugins/grpc/message.rb @@ -0,0 +1,85 @@ +# frozen_string_literal: true + +module HTTPX + module Plugins + module GRPC + # Encoding module for GRPC responses + # + # Can encode and decode grpc messages. + module Message + module_function + + # decodes a unary grpc response + def unary(response) + verify_status(response) + decode(response.to_s, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders) + end + + # lazy decodes a grpc stream response + def stream(response, &block) + return enum_for(__method__, response) unless block_given? + + response.each do |frame| + decode(frame, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders, &block) + end + + verify_status(response) + end + + # encodes a single grpc message + def encode(bytes, deflater:) + if deflater + compressed_flag = 1 + bytes = deflater.deflate(StringIO.new(bytes)) + else + compressed_flag = 0 + end + + "".b << [compressed_flag, bytes.bytesize].pack("CL>") << bytes.to_s + end + + # decodes a single grpc message + def decode(message, encodings:, encoders:) + until message.empty? + + compressed, size = message.unpack("CL>") + + data = message.byteslice(5..size + 5 - 1) + if compressed == 1 + encodings.reverse_each do |algo| + inflater = encoders.registry(algo).inflater(size) + data = inflater.inflate(data) + size = data.bytesize + end + end + + return data unless block_given? + + yield data + + message = message.byteslice(5 + size..-1) + end + end + + def cancel(request) + request.emit(:refuse, :client_cancellation) + end + + # interprets the grpc call trailing metadata, and raises an + # exception in case of error code + def verify_status(response) + # return standard errors if need be + response.raise_for_status + + status = Integer(response.headers["grpc-status"]) + message = response.headers["grpc-message"] + + return if status.zero? + + response.close + raise GRPCError.new(status, message, response.trailing_metadata) + end + end + end + end +end diff --git a/lib/httpx/plugins/rate_limiter.rb b/lib/httpx/plugins/rate_limiter.rb index bb0a2bc8..804eb99f 100644 --- a/lib/httpx/plugins/rate_limiter.rb +++ b/lib/httpx/plugins/rate_limiter.rb @@ -15,7 +15,7 @@ module HTTPX class << self RATE_LIMIT_CODES = [429, 503].freeze - def load_dependencies(klass) + def configure(klass) klass.plugin(:retries, retry_change_requests: true, retry_on: method(:retry_on_rate_limited_response), diff --git a/lib/httpx/plugins/stream.rb b/lib/httpx/plugins/stream.rb index f02bfbf9..1b20d9de 100644 --- a/lib/httpx/plugins/stream.rb +++ b/lib/httpx/plugins/stream.rb @@ -1,6 +1,93 @@ # frozen_string_literal: true module HTTPX + class StreamResponse + def initialize(request, session, connections) + @request = request + @session = session + @connections = connections + @options = @request.options + end + + def each(&block) + return enum_for(__method__) unless block_given? + + raise Error, "response already streamed" if @response + + @request.stream = self + + begin + @on_chunk = block + + if @request.response + # if we've already started collecting the payload, yield it first + # before proceeding + body = @request.response.body + + body.each do |chunk| + on_chunk(chunk) + end + end + + response.raise_for_status + response.close + ensure + @on_chunk = nil + end + end + + def each_line + return enum_for(__method__) unless block_given? + + line = +"" + + each do |chunk| + line << chunk + + while (idx = line.index("\n")) + yield line.byteslice(0..idx - 1) + + line = line.byteslice(idx + 1..-1) + end + end + end + + # This is a ghost method. It's to be used ONLY internally, when processing streams + def on_chunk(chunk) + raise NoMethodError unless @on_chunk + + @on_chunk.call(chunk) + end + + # :nocov: + def inspect + "#" + end + # :nocov: + + def to_s + response.to_s + end + + private + + def response + @session.__send__(:receive_requests, [@request], @connections, @options) until @request.response + + @request.response + end + + def respond_to_missing?(meth, *args) + response.respond_to?(meth, *args) || super + end + + def method_missing(meth, *args, &block) + return super unless response.respond_to?(meth) + + response.__send__(meth, *args, &block) + end + end + module Plugins # # This plugin adds support for stream response (text/event-stream). @@ -15,10 +102,13 @@ module HTTPX return super(*args, **options) unless stream requests = args.first.is_a?(Request) ? args : build_requests(*args, options) - raise Error, "only 1 response at a time is supported for streaming requests" unless requests.size == 1 - StreamResponse.new(requests.first, self) + request = requests.first + + connections = _send_requests(requests, request.options) + + StreamResponse.new(request, self, connections) end end @@ -53,78 +143,10 @@ module HTTPX end end - class StreamResponse - def initialize(request, session) - @request = request - @session = session - @options = @request.options - end - - def each(&block) - return enum_for(__method__) unless block_given? - - raise Error, "response already streamed" if @response - - @request.stream = self - - begin - @on_chunk = block - - response.raise_for_status - response.close - ensure - @on_chunk = nil - end - end - - def each_line - return enum_for(__method__) unless block_given? - - line = +"" - - each do |chunk| - line << chunk - - while (idx = line.index("\n")) - yield line.byteslice(0..idx - 1) - - line = line.byteslice(idx + 1..-1) - end - end - end - - # This is a ghost method. It's to be used ONLY internally, when processing streams - def on_chunk(chunk) - raise NoMethodError unless @on_chunk - - @on_chunk.call(chunk) - end - - # :nocov: - def inspect - "#" - end - # :nocov: - - def to_s - response.to_s - end - - private - - def response - @response ||= @session.__send__(:send_requests, @request, @options).first - end - - def respond_to_missing?(*args) - @options.response_class.respond_to?(*args) || super - end - - def method_missing(meth, *args, &block) - return super unless @options.response_class.public_method_defined?(meth) - - response.__send__(meth, *args, &block) - end + def self.const_missing(const_name) + super unless const_name == :StreamResponse + warn "DEPRECATION WARNING: the class #{self}::StreamResponse is deprecated. Use HTTPX::StreamResponse instead." + HTTPX::StreamResponse end end register_plugin :stream, Stream diff --git a/lib/httpx/request.rb b/lib/httpx/request.rb index dc502639..5aca7506 100644 --- a/lib/httpx/request.rb +++ b/lib/httpx/request.rb @@ -35,14 +35,22 @@ module HTTPX attr_reader :verb, :uri, :headers, :body, :state, :options, :response + # Exception raised during enumerable body writes + attr_reader :drain_error + def_delegator :@body, :empty? def_delegator :@body, :chunk! def initialize(verb, uri, options = {}) @verb = verb.to_s.downcase.to_sym - @uri = Utils.uri(uri) @options = Options.new(options) + @uri = Utils.uri(uri) + if @uri.relative? + raise(Error, "invalid URI: #{@uri}") unless @options.origin + + @uri = @options.origin.merge(@uri) + end raise(Error, "unknown method: #{verb}") unless METHODS.include?(@verb) @@ -54,6 +62,14 @@ module HTTPX @state = :idle end + def trailers? + defined?(@trailers) + end + + def trailers + @trailers ||= @options.headers_class.new + end + def interests return :r if @state == :done || @state == :expect @@ -124,6 +140,9 @@ module HTTPX chunk.dup rescue StopIteration nil + rescue StandardError => e + @drain_error = e + nil end # :nocov: @@ -200,7 +219,9 @@ module HTTPX end def unbounded_body? - chunked? || @body.bytesize == Float::INFINITY + return @unbounded_body if defined?(@unbounded_body) + + @unbounded_body = (chunked? || @body.bytesize == Float::INFINITY) end def chunked? @@ -253,6 +274,8 @@ module HTTPX nextstate = :expect end end + when :trailers + return unless @state == :body when :done return if @state == :expect end diff --git a/lib/httpx/response.rb b/lib/httpx/response.rb index ccf4759f..00823d93 100644 --- a/lib/httpx/response.rb +++ b/lib/httpx/response.rb @@ -94,6 +94,10 @@ module HTTPX @state = :idle end + def closed? + @state == :closed + end + def write(chunk) return if @state == :closed diff --git a/lib/httpx/session.rb b/lib/httpx/session.rb index af77844c..047e189b 100644 --- a/lib/httpx/session.rb +++ b/lib/httpx/session.rb @@ -175,12 +175,18 @@ module HTTPX end def send_requests(*requests, options) - connections = [] request_options = @options.merge(options) + connections = _send_requests(requests, request_options) + receive_requests(requests, connections, request_options) + end + + def _send_requests(requests, options) + connections = [] + requests.each do |request| error = catch(:resolve_error) do - connection = find_connection(request, connections, request_options) + connection = find_connection(request, connections, options) connection.send(request) end next unless error.is_a?(ResolveError) @@ -188,13 +194,17 @@ module HTTPX request.emit(:response, ErrorResponse.new(request, error, options)) end + connections + end + + def receive_requests(requests, connections, options) responses = [] begin # guarantee ordered responses loop do request = requests.first - pool.next_tick until (response = fetch_response(request, connections, request_options)) + pool.next_tick until (response = fetch_response(request, connections, options)) responses << response requests.shift @@ -208,7 +218,7 @@ module HTTPX # opportunity to traverse the requests, hence we're returning only a fraction of the errors # we were supposed to. This effectively fetches the existing responses and return them. while (request = requests.shift) - responses << fetch_response(request, connections, request_options) + responses << fetch_response(request, connections, options) end break end diff --git a/lib/httpx/transcoder/chunker.rb b/lib/httpx/transcoder/chunker.rb index ee7d5744..80d2f088 100644 --- a/lib/httpx/transcoder/chunker.rb +++ b/lib/httpx/transcoder/chunker.rb @@ -20,7 +20,7 @@ module HTTPX::Transcoder @raw.each do |chunk| yield "#{chunk.bytesize.to_s(16)}#{CRLF}#{chunk}#{CRLF}" end - yield "0#{CRLF}#{CRLF}" + yield "0#{CRLF}" end def respond_to_missing?(meth, *args) diff --git a/sig/callbacks.rbs b/sig/callbacks.rbs index 7046047a..ecbe40c8 100644 --- a/sig/callbacks.rbs +++ b/sig/callbacks.rbs @@ -9,6 +9,7 @@ module HTTPX def only: (Symbol) { (*untyped) -> void } -> void def emit: (Symbol, *untyped) -> void + def callbacks_for?: (Symbol) -> bool def callbacks: () -> Hash[Symbol, Array[_Callable]] | (Symbol) -> Array[_Callable] end diff --git a/sig/connection/http1.rbs b/sig/connection/http1.rbs index 48ca8b41..ece85766 100644 --- a/sig/connection/http1.rbs +++ b/sig/connection/http1.rbs @@ -60,6 +60,10 @@ module HTTPX def join_headers: (Request request) -> void + def join_trailers: (Request request) -> void + + def join_headers2: (Headers headers) -> void + def join_body: (Request request) -> void def capitalized: (String field) -> String diff --git a/sig/connection/http2.rbs b/sig/connection/http2.rbs index 40e3f42c..57ce573e 100644 --- a/sig/connection/http2.rbs +++ b/sig/connection/http2.rbs @@ -53,6 +53,8 @@ module HTTPX def join_headers: (HTTP2Next::Stream stream, Request request) -> void + def join_trailers: (HTTP2Next::Stream stream, Request request) -> void + def join_body: (HTTP2Next::Stream stream, Request request) -> void def on_stream_headers: (HTTP2Next::Stream stream, Request request, Array[[String, String]] headers) -> void @@ -61,7 +63,7 @@ module HTTPX def on_stream_data: (HTTP2Next::Stream stream, Request request, string data) -> void - def on_stream_close: (HTTP2Next::Stream stream, Request request, Symbol? error) -> void + def on_stream_close: (HTTP2Next::Stream stream, Request request, (Symbol | StandardError)? error) -> void def on_frame: (string bytes) -> void diff --git a/sig/options.rbs b/sig/options.rbs index 7146db43..f61917ca 100644 --- a/sig/options.rbs +++ b/sig/options.rbs @@ -11,6 +11,10 @@ module HTTPX def self.new: (options) -> instance | () -> instance + # headers + attr_reader uri: URI? + def uri=: (uri) -> void + # headers attr_reader headers: Headers? def headers=: (headers) -> void diff --git a/sig/plugins/aws_sdk_authentication.rbs b/sig/plugins/aws_sdk_authentication.rbs index d7b207f6..acfc7ef1 100644 --- a/sig/plugins/aws_sdk_authentication.rbs +++ b/sig/plugins/aws_sdk_authentication.rbs @@ -7,6 +7,8 @@ module HTTPX def self.load_dependencies: (singleton(Session)) -> void + def self.configure: (singleton(Session)) -> void + def self.extra_options: (Options) -> (Options) module InstanceMethods diff --git a/sig/plugins/basic_authentication.rbs b/sig/plugins/basic_authentication.rbs index 80dce5e2..b92d00d0 100644 --- a/sig/plugins/basic_authentication.rbs +++ b/sig/plugins/basic_authentication.rbs @@ -3,6 +3,8 @@ module HTTPX module BasicAuthentication def self.load_dependencies: (singleton(Session)) -> void + def self.configure: (singleton(Session)) -> void + module InstanceMethods def basic_authentication: (string user, string password) -> instance end diff --git a/sig/plugins/compression.rbs b/sig/plugins/compression.rbs index 294655da..148d22c1 100644 --- a/sig/plugins/compression.rbs +++ b/sig/plugins/compression.rbs @@ -6,8 +6,8 @@ module HTTPX type deflatable = _Reader | _ToS interface _Deflater - def deflate: (deflatable, _Writer, chunk_size: Integer) -> void - | (deflatable, _Writer, chunk_size: Integer) { (String) -> void } -> void + def deflate: (deflatable, ?_Writer, ?chunk_size: Integer) -> _ToS + | (deflatable, ?_Writer, ?chunk_size: Integer) { (String) -> void } -> _ToS end interface _Inflater diff --git a/sig/plugins/stream.rbs b/sig/plugins/stream.rbs index 12509cf6..6b7a57b3 100644 --- a/sig/plugins/stream.rbs +++ b/sig/plugins/stream.rbs @@ -1,4 +1,21 @@ module HTTPX + class StreamResponse + include _ToS + + def each: () { (String) -> void } -> void + | () -> Enumerable[String] + + def each_line: () { (String) -> void } -> void + | () -> Enumerable[String] + + def on_chunk: (string) -> void + + private + + def response: () -> response + def initialize: (Request, Session, Array[Connection]) -> untyped + end + module Plugins module Stream module InstanceMethods @@ -16,22 +33,6 @@ module HTTPX def stream: () -> StreamResponse? end - class StreamResponse - include _ToS - - def each: () { (String) -> void } -> void - | () -> Enumerable[String] - - def each_line: () { (String) -> void } -> void - | () -> Enumerable[String] - - def on_chunk: (string) -> void - - private - - def response: () -> response - def initialize: (Request, Session) -> untyped - end end type sessionStream = Session & Plugins::Stream::InstanceMethods diff --git a/sig/request.rbs b/sig/request.rbs index bd752af3..49f93596 100644 --- a/sig/request.rbs +++ b/sig/request.rbs @@ -11,7 +11,8 @@ module HTTPX attr_reader body: Body attr_reader state: Symbol attr_reader options: Options - attr_reader response: Response? + attr_reader response: response? + attr_reader drain_error: StandardError? def initialize: (verb | String, uri, ?options?) -> untyped @@ -21,7 +22,7 @@ module HTTPX def scheme: () -> ("http" | "https") - def response=: (Response response) -> void + def response=: (response) -> void def path: () -> String @@ -39,6 +40,10 @@ module HTTPX def expects?: () -> boolish + def trailers: () -> Headers + + def trailers?: () -> boolish + class Body def initialize: (Headers, Options) -> untyped def each: () { (String) -> void } -> void diff --git a/sig/response.rbs b/sig/response.rbs index f30714a2..451c3918 100644 --- a/sig/response.rbs +++ b/sig/response.rbs @@ -47,6 +47,7 @@ module HTTPX def empty?: () -> bool def copy_to: (_ToPath | _Writer destination) -> void def close: () -> void + def closed?: () -> bool private diff --git a/sig/session.rbs b/sig/session.rbs index fe01c37f..14b64822 100644 --- a/sig/session.rbs +++ b/sig/session.rbs @@ -45,5 +45,9 @@ module HTTPX def build_connection: (URI, Options) -> Connection def send_requests: (*Request, options) -> Array[response] + + def _send_requests: (Array[Request], options) -> Array[Connection] + + def receive_requests: (Array[Request], Array[Connection], options) -> Array[response] end end \ No newline at end of file diff --git a/test/error_response_test.rb b/test/error_response_test.rb index a4607c8d..dd5de26d 100644 --- a/test/error_response_test.rb +++ b/test/error_response_test.rb @@ -33,6 +33,6 @@ class ErrorResponseTest < Minitest::Test private def request_mock - Request.new(:get, "/") + Request.new(:get, "http://example.com/") end end diff --git a/test/http_test.rb b/test/http_test.rb index 09701705..be2934c2 100644 --- a/test/http_test.rb +++ b/test/http_test.rb @@ -29,6 +29,7 @@ class HTTPTest < Minitest::Test include Plugins::Stream include Plugins::AWSAuthentication include Plugins::Upgrade + include Plugins::GRPC unless RUBY_ENGINE == "jruby" || RUBY_VERSION < "2.3" def test_verbose_log log = StringIO.new diff --git a/test/https_test.rb b/test/https_test.rb index 85ec687d..06ab9762 100644 --- a/test/https_test.rb +++ b/test/https_test.rb @@ -30,6 +30,7 @@ class HTTPSTest < Minitest::Test include Plugins::Stream include Plugins::AWSAuthentication include Plugins::Upgrade + include Plugins::GRPC unless RUBY_ENGINE == "jruby" || RUBY_VERSION < "2.3" def test_connection_coalescing coalesced_origin = "https://#{ENV["HTTPBIN_COALESCING_HOST"]}" @@ -108,6 +109,30 @@ class HTTPSTest < Minitest::Test verify_error_response(response, /settings_timeout/) end end + + def test_http2_request_trailers + uri = build_uri("/post") + + HTTPX.wrap do |http| + total_time = start_time = nil + trailered = false + request = http.build_request(:post, uri, body: %w[this is chunked]) + request.on(:headers) do |_written_request| + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + request.on(:trailers) do |written_request| + total_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time + written_request.trailers["x-time-spent"] = total_time + trailered = true + end + response = http.request(request) + verify_status(response, 200) + body = json_body(response) + # verify_header(body["headers"], "x-time-spent", total_time.to_s) + assert body.key?("data") + assert trailered, "trailer callback wasn't called" + end + end end def test_ssl_wrong_hostname diff --git a/test/request_test.rb b/test/request_test.rb index 39a9430f..fde0bebc 100644 --- a/test/request_test.rb +++ b/test/request_test.rb @@ -6,14 +6,14 @@ class RequestTest < Minitest::Test include HTTPX def test_request_unsupported_body - ex = assert_raises(HTTPX::Error) { Request.new(:post, "/", body: Object.new) } + ex = assert_raises(HTTPX::Error) { Request.new(:post, "http://example.com/", body: Object.new) } assert ex.message =~ /cannot determine size of body/ end def test_request_verb - r1 = Request.new(:get, "/") + r1 = Request.new(:get, "http://example.com/") assert r1.verb == :get, "unexpected verb (#{r1.verb})" - r2 = Request.new("GET", "/") + r2 = Request.new("GET", "http://example.com/") assert r2.verb == :get, "unexpected verb (#{r1.verb})" end @@ -57,21 +57,21 @@ class RequestTest < Minitest::Test end def test_request_body_raw - req = Request.new(:post, "/", body: "bang") + req = Request.new(:post, "http://example.com/", body: "bang") assert !req.body.empty?, "body should exist" assert req.headers["content-type"] == "application/octet-stream", "content type is wrong" assert req.headers["content-length"] == "4", "content length is wrong" end def test_request_body_form - req = Request.new(:post, "/", form: { "foo" => "bar" }) + req = Request.new(:post, "http://example.com/", form: { "foo" => "bar" }) assert !req.body.empty?, "body should exist" assert req.headers["content-type"] == "application/x-www-form-urlencoded", "content type is wrong" assert req.headers["content-length"] == "7", "content length is wrong" end def test_request_body_json - req = Request.new(:post, "/", json: { "foo" => "bar" }) + req = Request.new(:post, "http://example.com/", json: { "foo" => "bar" }) assert !req.body.empty?, "body should exist" assert req.headers["content-type"] == "application/json; charset=utf-8", "content type is wrong" assert req.headers["content-length"] == "13", "content length is wrong" diff --git a/test/session_test.rb b/test/session_test.rb index 7a80c5d6..7ead3a79 100644 --- a/test/session_test.rb +++ b/test/session_test.rb @@ -27,7 +27,7 @@ class SessionTest < Minitest::Test assert session.options.respond_to?(:foo), "options methods weren't added" assert session.options.foo == "options-foo", "option method is unexpected" - request = session.options.request_class.new(:get, "/", session.options) + request = session.options.request_class.new(:get, "http://example.com/", session.options) assert request.respond_to?(:foo), "request methods haven't been added" assert request.foo == "request-foo", "request method is unexpected" assert request.headers.respond_to?(:foo), "headers methods haven't been added" diff --git a/test/support/grpc_helpers.rb b/test/support/grpc_helpers.rb new file mode 100644 index 00000000..ce6e028b --- /dev/null +++ b/test/support/grpc_helpers.rb @@ -0,0 +1,203 @@ +# frozen_string_literal: true + +begin + require "grpc" + require "logging" + + # A test message + class EchoMsg + attr_reader :msg + + def initialize(msg: "") + @msg = msg + end + + def self.marshal(o) + o.msg + end + + def self.unmarshal(msg) + EchoMsg.new(msg: msg) + end + end + + # a test service that checks the cert of its peer + class TestService + include GRPC::GenericService + rpc :an_rpc, EchoMsg, EchoMsg + rpc :a_cancellable_rpc, EchoMsg, EchoMsg + rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg + rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) + rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) + + def check_peer_cert(call) + # error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}" + # fail(error_msg) unless call.peer_cert == client_cert + end + + def an_rpc(req, call) + check_peer_cert(call) + req + end + + def a_cancellable_rpc(_req, call) + check_peer_cert(call) + raise GRPC::Cancelled, "dump" + end + + def a_client_streaming_rpc(call) + check_peer_cert(call) + call.each_remote_read.each { |r| GRPC.logger.info(r) } + EchoMsg.new(msg: "client stream pong") + end + + def a_server_streaming_rpc(_, call) + check_peer_cert(call) + call.send_initial_metadata + [EchoMsg.new(msg: "server stream pong"), EchoMsg.new(msg: "server stream pong")] + end + + def a_bidi_rpc(requests, call) + check_peer_cert(call) + requests.each { |r| GRPC.logger.info(r) } + call.send_initial_metadata + [EchoMsg.new(msg: "bidi pong"), EchoMsg.new(msg: "bidi pong")] + end + end + + if ENV.key?("HTTPX_DEBUG") + log_level = ENV["HTTPX_DEBUG"].to_i + log_level = log_level > 1 ? :debug : :info + + module GRPC + extend Logging.globally + end + Logging.logger.root.appenders = Logging.appenders.stdout + Logging.logger.root.level = log_level + Logging.logger["GRPC"].level = log_level + Logging.logger["GRPC::ActiveCall"].level = log_level + Logging.logger["GRPC::BidiCall"].level = log_level + end + + module GRPCHelpers + include ::GRPC::Core::StatusCodes + include ::GRPC::Core::TimeConsts + include ::GRPC::Core::CallOps + + private + + def teardown + super + if @grpc_server + + @grpc_server.shutdown_and_notify(from_relative_time(2)) + @grpc_server.close + @grpc_server_th.join if @grpc_server_th + end + + return unless @rpc_server && !@rpc_server.stopped? + + @rpc_server.stop + @rpc_server_th.join + end + + def grpc_plugin + grpc = HTTPX.plugin(:grpc) + + grpc = grpc.with_channel_credentials(*channel_credentials_paths, hostname: "foo.test.google.fr") if origin.start_with?("https") + + grpc + end + + def grpc_channel_uri(server_port) + scheme = URI(origin).scheme + "#{scheme}://localhost:#{server_port}" + end + + def run_rpc(service, server_args: {}) + @rpc_server = ::GRPC::RpcServer.new(server_args: server_args.merge("grpc.so_reuseport" => 0)) + + cred = origin.start_with?("https") ? server_credentials : :this_port_is_insecure + + server_port = @rpc_server.add_http2_port("localhost:0", cred) + @rpc_server.handle(service) + + @rpc_server_th = Thread.new { @rpc_server.run } + @rpc_server.wait_till_running + + server_port + end + + def run_request_response(resp, status, marshal: nil, server_args: {}, server_initial_md: {}, server_trailing_md: {}) + @grpc_server = ::GRPC::Core::Server.new(server_args.merge("grpc.so_reuseport" => 0)) + + cred = origin.start_with?("https") ? server_credentials : :this_port_is_insecure + + server_port = @grpc_server.add_http2_port("localhost:0", cred) + + @grpc_server_th = wakey_thread do |notifier| + c = expect_server_to_be_invoked(notifier, metadata_to_send: server_initial_md, marshal: marshal) + begin + yield c + ensure + c.remote_send(resp) + c.send_status(status, status == OK ? "OK" : "NOK", true, metadata: server_trailing_md) + c.send(:set_input_stream_done) + c.send(:set_output_stream_done) + end + end + + server_port + end + + def wakey_thread(&blk) + n = ::GRPC::Notifier.new + t = Thread.new do + begin + blk.call(n) + rescue GRPC::Core::CallError + end + end + t.abort_on_exception = true + n.wait + t + end + + def expect_server_to_be_invoked(notifier, metadata_to_send: nil, marshal: nil) + @grpc_server.start + notifier.notify(nil) + recvd_rpc = @grpc_server.request_call + recvd_call = recvd_rpc.call + recvd_call.metadata = recvd_rpc.metadata + recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send) + ::GRPC::ActiveCall.new(recvd_call, marshal, marshal, INFINITE_FUTURE, metadata_received: true) + end + + def server_credentials + creds = ["ca.pem", "server1.key", "server1.pem"] + .map { |path| File.join(grpc_testdata_path, path) } + .map(&File.method(:read)) + + GRPC::Core::ServerCredentials.new( + creds[0], + [{ private_key: creds[1], cert_chain: creds[2] }], + true + ) # force client auth + end + + def channel_credentials_paths + ["ca.pem", "client.key", "client.pem"] + .map { |path| File.join(grpc_testdata_path, path) } + # .map(&File.method(:read)) + # GRPC::Core::ChannelCredentials.new(*creds) + end + + def grpc_testdata_path + grpc_path = Gem::Specification.find_by_path("grpc").full_gem_path + File.join(grpc_path, "src", "ruby", "spec", "testdata") + end + end +rescue LoadError + module GRPCHelpers + end +end diff --git a/test/support/requests/get.rb b/test/support/requests/get.rb index e4d7083c..be77d4e1 100644 --- a/test/support/requests/get.rb +++ b/test/support/requests/get.rb @@ -3,6 +3,8 @@ require "time" module Requests + using HTTPX::URIExtensions + module Get def test_http_get uri = build_uri("/get") @@ -11,6 +13,13 @@ module Requests verify_body_length(response) end + def test_http_get_option_origin + uri = URI(build_uri("/get")) + response = HTTPX.with(origin: uri.origin).get(uri.path) + verify_status(response, 200) + verify_body_length(response) + end + def test_http_request uri = build_uri("/get") response = HTTPX.request(:get, uri) diff --git a/test/support/requests/plugins/grpc.rb b/test/support/requests/plugins/grpc.rb new file mode 100644 index 00000000..fb653446 --- /dev/null +++ b/test/support/requests/plugins/grpc.rb @@ -0,0 +1,204 @@ +# frozen_string_literal: true + +module Requests + module Plugins + module GRPC + include GRPCHelpers + + def test_plugin_grpc_unary_plain_bytestreams + no_marshal = proc { |x| x } + + server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call| + assert call.remote_read == "a_request" + assert call.metadata["k1"] == "v1" + assert call.metadata["k2"] == "v2" + end + + grpc = grpc_plugin + # build service + stub = grpc.build_stub("localhost:#{server_port}") + result = stub.execute("an_rpc_method", "a_request", metadata: { k1: "v1", k2: "v2" }) + + assert result.to_s == "a_reply" + end + + def test_plugin_grpc_call_credentials + return unless origin.start_with?("https") + + call_credentials = -> { { "k1" => "updated-k1" } } + no_marshal = proc { |x| x } + + server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call| + assert call.remote_read == "a_request" + assert call.metadata["k1"] == "updated-k1" + assert call.metadata["k2"] == "v2" + end + + grpc = grpc_plugin + # build service + stub = grpc.with_call_credentials(call_credentials).build_stub("localhost:#{server_port}") + result = stub.execute("an_rpc_method", "a_request", metadata: { k1: "v1", k2: "v2" }) + + assert result.to_s == "a_reply" + end + + def test_plugin_grpc_compressed_request + no_marshal = proc { |x| x } + + server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call| + # assert call.metadata["grpc-encoding"] == "gzip", "request wasn't compressed" + # TODO: find a way to test if request payload was indeed compressed + assert call.remote_read == "A" * 2000 + end + + grpc = grpc_plugin + # build service + stub = grpc.build_stub("localhost:#{server_port}", compression: "gzip") + result = stub.execute("an_rpc_method", "A" * 2000) + + assert result.to_s == "a_reply" + end + + def test_plugin_grpc_compressed_response + no_marshal = proc { |x| x } + + server_port = run_request_response("A" * 2000, OK, marshal: no_marshal, + server_initial_md: { "grpc-internal-encoding-request" => "gzip" }) do |call| + assert call.remote_read == "a_request" + end + + grpc = grpc_plugin + # build service + stub = grpc.build_stub("localhost:#{server_port}") + result = stub.execute("an_rpc_method", "a_request") + + assert result.to_s == "A" * 2000 + end + + # Cancellation on error + + def test_plugin_grpc_deadline_exceeded + no_marshal = proc { |x| x } + + server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call| + sleep(3) + assert call.remote_read == "a_request" + end + + grpc = grpc_plugin + # build service + stub = grpc.build_stub("localhost:#{server_port}") + + error = assert_raises(HTTPX::GRPCError) { stub.execute("an_rpc_method", "a request", deadline: 2).to_s } + assert error.status == ::GRPC::Core::StatusCodes::DEADLINE_EXCEEDED + end + + def test_plugin_grpc_cancellation_on_client_error + no_marshal = proc { |x| x } + + input = Enumerator.new do |_y| + # y << "a_request" + raise "oh crap" + end + + server_port = run_request_response("a_reply", OK, marshal: no_marshal) do |call| + # not supposed to arrive here + begin + call.remote_read + rescue StandardError + nil + end + end + + grpc = grpc_plugin + # build service + stub = grpc.build_stub("localhost:#{server_port}") + + error = assert_raises(HTTPX::Error) { stub.execute("an_rpc_method", input).to_s } + assert error.message =~ /oh crap/ + end + + def test_plugin_grpc_cancellation_on_server_error + server_port = run_rpc(TestService) + + grpc = grpc_plugin + + # build service + test_service_rpcs = grpc.rpc(:a_cancellable_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal) + test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService) + error = assert_raises(HTTPX::GRPCError) { test_service_stub.a_cancellable_rpc(EchoMsg.new(msg: "ping")).to_s } + + assert error.status == 1 + assert error.details == "dump" + end + + def test_plugin_grpc_unary_protobuf + server_port = run_rpc(TestService) + + grpc = grpc_plugin + + # build service + test_service_rpcs = grpc.rpc(:an_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal) + test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService) + echo_response = test_service_stub.an_rpc(EchoMsg.new(msg: "ping")) + + assert echo_response.msg == "ping" + assert echo_response.trailing_metadata["grpc-message"] == "OK" + end + + def test_plugin_grpc_client_stream_protobuf + server_port = run_rpc(TestService) + + grpc = grpc_plugin + + # build service + test_service_rpcs = grpc.rpc(:a_client_streaming_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal) + test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService) + + input = [EchoMsg.new(msg: "ping"), EchoMsg.new(msg: "ping")] + response = test_service_stub.a_client_streaming_rpc(input) + + assert response.msg == "client stream pong" + assert response.trailing_metadata["grpc-message"] == "OK" + end + + def test_plugin_grpc_server_stream_protobuf + server_port = run_rpc(TestService) + + grpc = grpc_plugin + + # build service + test_service_rpcs = grpc.rpc(:a_server_streaming_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal, + stream: true) + test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService) + streaming_response = test_service_stub.a_server_streaming_rpc(EchoMsg.new(msg: "ping")) + + assert streaming_response.respond_to?(:each) + assert streaming_response.trailing_metadata.nil? + + echo_responses = streaming_response.each.to_a + assert echo_responses.map(&:msg) == ["server stream pong", "server stream pong"] + assert streaming_response.trailing_metadata["grpc-message"] == "OK" + end + + def test_plugin_grpc_bidi_stream_protobuf + server_port = run_rpc(TestService) + + grpc = grpc_plugin + + # build service + test_service_rpcs = grpc.rpc(:a_bidi_rpc, EchoMsg, EchoMsg, marshal_method: :marshal, unmarshal_method: :unmarshal, stream: true) + test_service_stub = test_service_rpcs.build_stub("localhost:#{server_port}", service: TestService) + input = [EchoMsg.new(msg: "ping"), EchoMsg.new(msg: "ping")] + streaming_response = test_service_stub.a_bidi_rpc(input) + + assert streaming_response.respond_to?(:each) + assert streaming_response.trailing_metadata.nil? + + echo_responses = streaming_response.each.to_a + assert echo_responses.map(&:msg) == ["bidi pong", "bidi pong"] + assert streaming_response.trailing_metadata["grpc-message"] == "OK" + end + end + end +end diff --git a/test/support/requests/with_chunked_body.rb b/test/support/requests/with_chunked_body.rb index ff72cc90..131b3db8 100644 --- a/test/support/requests/with_chunked_body.rb +++ b/test/support/requests/with_chunked_body.rb @@ -14,6 +14,34 @@ module Requests # assert body["data"] == "thisisachunkedresponse", # "unexpected body (#{body["data"]})" end + + define_method :"test_#{meth}_chunked_body_trailer" do + uri = build_uri("/#{meth}") + + http = HTTPX.with_headers("transfer-encoding" => "chunked") + + total_time = start_time = nil + trailered = false + request = http.build_request(meth, uri, headers: { "trailer" => "X-Time-Spent" }, body: %w[this is a chunked response]) + request.on(:headers) do |_written_request| + start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + request.on(:trailers) do |written_request| + total_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start_time + written_request.trailers["x-time-spent"] = total_time + trailered = true + end + response = http.request(request) + verify_status(response, 200) + body = json_body(response) + verify_header(body["headers"], "Transfer-Encoding", "chunked") + # httpbin sadly doesn't receive trailers... + # verify_header(body["headers"], "X-Time-Spent", total_time.to_s) + assert body.key?("data") + assert trailered, "trailer callback wasn't called" + # assert body["data"] == "thisisachunkedresponse", + # "unexpected body (#{body["data"]})" + end end end end