allowing .deflate to be called synchronously

This commit is contained in:
HoneyryderChuck 2021-04-28 19:00:54 +01:00
parent 4d0750c950
commit 837c7ddf17
6 changed files with 57 additions and 23 deletions

View File

@ -18,12 +18,13 @@ module HTTPX
module Deflater module Deflater
module_function module_function
def deflate(raw, buffer, chunk_size:) def deflate(raw, buffer = "".b, chunk_size: 16_384)
while (chunk = raw.read(chunk_size)) while (chunk = raw.read(chunk_size))
compressed = ::Brotli.deflate(chunk) compressed = ::Brotli.deflate(chunk)
buffer << compressed buffer << compressed
yield compressed if block_given? yield compressed if block_given?
end end
buffer
end end
end end

View File

@ -17,7 +17,7 @@ module HTTPX
module Deflater module Deflater
module_function module_function
def deflate(raw, buffer, chunk_size:) def deflate(raw, buffer = "".b, chunk_size: 16_384)
deflater = Zlib::Deflate.new deflater = Zlib::Deflate.new
while (chunk = raw.read(chunk_size)) while (chunk = raw.read(chunk_size))
compressed = deflater.deflate(chunk) compressed = deflater.deflate(chunk)
@ -27,6 +27,7 @@ module HTTPX
last = deflater.finish last = deflater.finish
buffer << last buffer << last
yield last if block_given? yield last if block_given?
buffer
ensure ensure
deflater.close if deflater deflater.close if deflater
end end

View File

@ -19,7 +19,7 @@ module HTTPX
@compressed_chunk = "".b @compressed_chunk = "".b
end end
def deflate(raw, buffer, chunk_size:) def deflate(raw, buffer = "".b, chunk_size: 16_384)
gzip = Zlib::GzipWriter.new(self) gzip = Zlib::GzipWriter.new(self)
begin begin
@ -38,6 +38,7 @@ module HTTPX
buffer << compressed buffer << compressed
yield compressed if block_given? yield compressed if block_given?
buffer
end end
private private

View File

@ -35,9 +35,7 @@ module HTTPX
DEADLINE = 60 DEADLINE = 60
HEADERS = { HEADERS = {
# "accept-encoding" => "identity", "content-type" => "application/grpc",
"grpc-accept-encoding" => "identity",
"content-type" => "application/grpc+proto",
"grpc-timeout" => "#{DEADLINE}S", "grpc-timeout" => "#{DEADLINE}S",
"te" => "trailers", "te" => "trailers",
"accept" => "application/grpc", "accept" => "application/grpc",
@ -54,7 +52,7 @@ module HTTPX
# decodes a unary grpc response # decodes a unary grpc response
def unary(response) def unary(response)
verify_status(response) verify_status(response)
decode(response.to_s) decode(response.to_s, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders)
end end
# lazy decodes a grpc stream response # lazy decodes a grpc stream response
@ -62,19 +60,34 @@ module HTTPX
return enum_for(__method__, response) unless block_given? return enum_for(__method__, response) unless block_given?
response.each do |frame| response.each do |frame|
yield decode(frame) yield decode(frame, encodings: response.headers.get("grpc-encoding"), encoders: response.encoders)
end end
end end
# encodes a single grpc message # encodes a single grpc message
def encode(bytes) def encode(bytes, deflater:)
"".b << [0, bytes.bytesize].pack("CL>") << bytes if deflater
compressed_flag = 1
bytes = deflater.deflate(StringIO.new(bytes))
else
compressed_flag = 0
end
"".b << [compressed_flag, bytes.bytesize].pack("CL>") << bytes.to_s
end end
# decodes a single grpc message # decodes a single grpc message
def decode(message) def decode(message, encodings:, encoders:)
_compressed, size = message.unpack("CL>") compressed, size = message.unpack("CL>")
message.byteslice(5..size + 5 - 1) data = message.byteslice(5..size + 5 - 1)
if compressed == 1
encodings.reverse_each do |algo|
inflater = encoders.registry(algo).inflater(size)
data = inflater.inflate(data)
size = data.bytesize
end
end
data
end end
# interprets the grpc call trailing metadata, and raises an # interprets the grpc call trailing metadata, and raises an
@ -125,6 +138,10 @@ module HTTPX
@trailing_metadata = Hash[trailers] @trailing_metadata = Hash[trailers]
super super
end end
def encoders
@options.encodings
end
end end
module InstanceMethods module InstanceMethods
@ -184,7 +201,10 @@ module HTTPX
rpc_method = "/#{@options.grpc_service}#{rpc_method}" if @options.grpc_service rpc_method = "/#{@options.grpc_service}#{rpc_method}" if @options.grpc_service
uri.path = rpc_method uri.path = rpc_method
headers = HEADERS headers = HEADERS.merge(
"grpc-accept-encoding" => ["identity", *@options.encodings.registry.keys],
)
headers = headers.merge(metadata) if metadata headers = headers.merge(metadata) if metadata
body = if input.respond_to?(:each) body = if input.respond_to?(:each)

View File

@ -6,8 +6,8 @@ module HTTPX
type deflatable = _Reader | _ToS type deflatable = _Reader | _ToS
interface _Deflater interface _Deflater
def deflate: (deflatable, _Writer, chunk_size: Integer) -> void def deflate: (deflatable, ?_Writer, ?chunk_size: Integer) -> _ToS
| (deflatable, _Writer, chunk_size: Integer) { (String) -> void } -> void | (deflatable, ?_Writer, ?chunk_size: Integer) { (String) -> void } -> _ToS
end end
interface _Inflater interface _Inflater

View File

@ -14,21 +14,32 @@ module Requests
assert call.metadata["k2"] == "v2" assert call.metadata["k2"] == "v2"
end end
# stub = ::GRPC::ClientStub.new("localhost:#{server_port}",
# :this_channel_is_insecure)
grpc = HTTPX.plugin(:grpc) grpc = HTTPX.plugin(:grpc)
# build service # build service
stub = grpc.build_stub("http://localhost:#{server_port}") stub = grpc.build_stub("http://localhost:#{server_port}")
result = stub.execute("an_rpc_method", "a_request", metadata: { k1: "v1", k2: "v2" }) result = stub.execute("an_rpc_method", "a_request", metadata: { k1: "v1", k2: "v2" })
# stub = ::GRPC::ClientStub.new("localhost:#{server_port}", :this_channel_is_insecure)
# op = stub.request_response("an_rpc_method", "a_request", no_marshal, no_marshal,
# return_op: true, metadata: { k1: "v1", k2: "v2" })
# op.start_call if run_start_call_first
# result = op.execute
assert result == "a_reply" assert result == "a_reply"
end end
def test_plugin_grpc_compressed_response
no_marshal = proc { |x| x }
server_port = run_request_response("A" * 2000, OK, marshal: no_marshal,
server_initial_md: { "grpc-internal-encoding-request" => "gzip" }) do |call|
assert call.remote_read == "a_request"
# assert call.metadata["k1"] == "v1"
# assert call.metadata["k2"] == "v2"
end
grpc = HTTPX.plugin(:grpc)
# build service
stub = grpc.build_stub("http://localhost:#{server_port}")
result = stub.execute("an_rpc_method", "a_request")
assert result == "A" * 2000
end
def test_plugin_grpc_unary_protobuf def test_plugin_grpc_unary_protobuf
server_port = run_rpc(TestService) server_port = run_rpc(TestService)