Removing HTTPX::Registry and its usage internally

These internnal registries were a bit magical to use, difficult to
debug, not thread-safe, and overall a nuisance when it came to type
checking. So long.
This commit is contained in:
HoneyryderChuck 2023-04-27 18:05:25 +01:00
parent 613e95d5f4
commit bbf257477b
41 changed files with 160 additions and 224 deletions

View File

@ -11,7 +11,6 @@ require "httpx/domain_name"
require "httpx/altsvc"
require "httpx/callbacks"
require "httpx/loggable"
require "httpx/registry"
require "httpx/transcoder"
require "httpx/timers"
require "httpx/pool"

View File

@ -29,7 +29,6 @@ module HTTPX
#
class Connection
extend Forwardable
include Registry
include Loggable
include Callbacks
@ -63,7 +62,7 @@ module HTTPX
# if there's an already open IO, get its
# peer address, and force-initiate the parser
transition(:already_open)
@io = IO.registry(@type).new(@origin, nil, @options)
@io = build_socket
parser
else
transition(:idle)
@ -82,7 +81,7 @@ module HTTPX
if @io
@io.add_addresses(addrs)
else
@io = IO.registry(@type).new(@origin, addrs, @options)
@io = build_socket(addrs)
end
end
@ -451,7 +450,7 @@ module HTTPX
end
def build_parser(protocol = @io.protocol)
parser = registry(protocol).new(@write_buffer, @options)
parser = self.class.parser_type(protocol).new(@write_buffer, @options)
set_parser_callbacks(parser)
parser
end
@ -594,6 +593,17 @@ module HTTPX
remove_instance_variable(:@timeout) if defined?(@timeout)
end
def build_socket(addrs = nil)
transport_type = case @type
when "tcp" then TCP
when "ssl" then SSL
when "unix" then UNIX
else
raise Error, "unsupported transport (#{@type})"
end
transport_type.new(@origin, addrs, @options)
end
def on_error(error)
if error.instance_of?(TimeoutError)
@ -662,5 +672,16 @@ module HTTPX
error = error_type.new(request, request.response, read_timeout)
on_error(error)
end
class << self
def parser_type(protocol)
case protocol
when "h2" then HTTP2
when "http/1.1" then HTTP1
else
raise Error, "unsupported protocol (##{protocol})"
end
end
end
end
end

View File

@ -368,5 +368,4 @@ module HTTPX
UPCASED[field] || field.split("-").map(&:capitalize).join("-")
end
end
Connection.register "http/1.1", Connection::HTTP1
end

View File

@ -412,5 +412,4 @@ module HTTPX
end
end
end
Connection.register "h2", Connection::HTTP2
end

View File

@ -5,13 +5,3 @@ require "httpx/io/udp"
require "httpx/io/tcp"
require "httpx/io/unix"
require "httpx/io/ssl"
module HTTPX
module IO
extend Registry
register "udp", UDP
register "unix", HTTPX::UNIX
register "tcp", TCP
register "ssl", SSL
end
end

View File

@ -201,7 +201,7 @@ module HTTPX
def option_transport(value)
transport = value.to_s
raise TypeError, "\#{transport} is an unsupported transport type" unless IO.registry.key?(transport)
raise TypeError, "#{transport} is an unsupported transport type" unless %w[unix].include?(transport)
transport
end

View File

@ -20,10 +20,7 @@ module HTTPX
end
def extra_options(options)
encodings = Module.new do
extend Registry
end
options.merge(encodings: encodings)
options.merge(encodings: {})
end
end
@ -36,7 +33,7 @@ module HTTPX
end
def option_encodings(value)
raise TypeError, ":encodings must be a registry" unless value.respond_to?(:registry)
raise TypeError, ":encodings must be an Hash" unless value.is_a?(Hash)
value
end
@ -49,7 +46,7 @@ module HTTPX
if @headers.key?("range")
@headers.delete("accept-encoding")
else
@headers["accept-encoding"] ||= @options.encodings.registry.keys
@headers["accept-encoding"] ||= @options.encodings.keys
end
end
end
@ -65,7 +62,9 @@ module HTTPX
@headers.get("content-encoding").each do |encoding|
next if encoding == "identity"
@body = Encoder.new(@body, options.encodings.registry(encoding).deflater)
next unless options.encodings.key?(encoding)
@body = Encoder.new(@body, options.encodings[encoding].deflater)
end
@headers["content-length"] = @body.bytesize unless unbounded_body?
end
@ -95,7 +94,9 @@ module HTTPX
@_inflaters = @headers.get("content-encoding").filter_map do |encoding|
next if encoding == "identity"
inflater = @options.encodings.registry(encoding).inflater(compressed_length)
next unless @options.encodings.key?(encoding)
inflater = @options.encodings[encoding].inflater(compressed_length)
# do not uncompress if there is no decoder available. In fact, we can't reliably
# continue decompressing beyond that, so ignore.
break unless inflater

View File

@ -5,13 +5,13 @@ module HTTPX
module Compression
module Brotli
class << self
def load_dependencies(_klass)
def load_dependencies(klass)
require "brotli"
klass.plugin(:compression)
end
def configure(klass)
klass.plugin(:compression)
klass.default_options.encodings.register "br", self
def extra_options(options)
options.merge(encodings: options.encodings.merge("br" => self))
end
end

View File

@ -4,14 +4,19 @@ module HTTPX
module Plugins
module Compression
module Deflate
def self.load_dependencies(_klass)
require "stringio"
require "zlib"
end
class << self
def load_dependencies(_klass)
require "stringio"
require "zlib"
end
def self.configure(klass)
klass.plugin(:"compression/gzip")
klass.default_options.encodings.register "deflate", self
def configure(klass)
klass.plugin(:"compression/gzip")
end
def extra_options(options)
options.merge(encodings: options.encodings.merge("deflate" => self))
end
end
module Deflater

View File

@ -6,12 +6,14 @@ module HTTPX
module Plugins
module Compression
module GZIP
def self.load_dependencies(*)
require "zlib"
end
class << self
def load_dependencies(*)
require "zlib"
end
def self.configure(klass)
klass.default_options.encodings.register "gzip", self
def extra_options(options)
options.merge(encodings: options.encodings.merge("gzip" => self))
end
end
class Deflater

View File

@ -233,7 +233,7 @@ module HTTPX
uri.path = rpc_method
headers = HEADERS.merge(
"grpc-accept-encoding" => ["identity", *@options.encodings.registry.keys]
"grpc-accept-encoding" => ["identity", *@options.encodings.keys]
)
unless deadline == Float::INFINITY
# convert to milliseconds
@ -249,7 +249,7 @@ module HTTPX
if compression
headers["grpc-encoding"] = compression
deflater = @options.encodings.registry(compression).deflater
deflater = @options.encodings[compression].deflater if @options.encodings.key?(compression)
end
headers.merge!(@options.call_credentials.call) if @options.call_credentials

View File

@ -47,7 +47,9 @@ module HTTPX
data = message.byteslice(5..size + 5 - 1)
if compressed == 1
encodings.reverse_each do |algo|
inflater = encoders.registry(algo).inflater(size)
next unless encoders.key?(algo)
inflater = encoders[algo].inflater(size)
data = inflater.inflate(data)
size = data.bytesize
end

View File

@ -12,13 +12,9 @@ module HTTPX
VALID_H2C_VERBS = %w[GET OPTIONS HEAD].freeze
class << self
def load_dependencies(*)
def load_dependencies(klass)
require "base64"
end
def configure(klass)
klass.plugin(:upgrade)
klass.default_options.upgrade_handlers.register "h2c", self
end
def call(connection, request, response)
@ -26,7 +22,7 @@ module HTTPX
end
def extra_options(options)
options.merge(max_concurrent_requests: 1)
options.merge(max_concurrent_requests: 1, upgrade_handlers: options.upgrade_handlers.merge("h2c" => self))
end
end
@ -38,7 +34,7 @@ module HTTPX
connection = pool.find_connection(upgrade_request.uri, upgrade_request.options)
return super if connection && connection.upgrade_protocol == :h2c
return super if connection && connection.upgrade_protocol == "h2c"
# build upgrade request
upgrade_request.headers.add("connection", "upgrade")
@ -83,7 +79,7 @@ module HTTPX
set_parser_callbacks(@parser)
@inflight += 1
@parser.upgrade(request, response)
@upgrade_protocol = :h2c
@upgrade_protocol = "h2c"
if request.options.max_concurrent_requests != @options.max_concurrent_requests
@options = @options.merge(max_concurrent_requests: nil)

View File

@ -52,7 +52,7 @@ module HTTPX
super
meter_elapsed_time("Session: initialized!!!")
resolver_type = @options.resolver_class
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
resolver_type = Resolver.resolver_for(resolver_type)
return unless resolver_type <= Resolver::Native
resolver_type.prepend TrackTimeMethods

View File

@ -40,9 +40,21 @@ module HTTPX
require "httpx/plugins/multipart/part"
require "httpx/plugins/multipart/mime_type_detector"
end
end
def configure(*)
Transcoder.register("form", FormTranscoder)
module RequestBodyMethods
private
def initialize_body(options)
return FormTranscoder.encode(options.form) if options.form
super
end
end
module ResponseMethods
def form
decode(FormTranscoder)
end
end

View File

@ -318,7 +318,7 @@ module HTTPX
register_plugin :proxy, Proxy
end
class ProxySSL < IO.registry["ssl"]
class ProxySSL < SSL
def initialize(tcp, request_uri, options)
@io = tcp.to_io
super(request_uri, tcp.addresses, options)

View File

@ -61,7 +61,7 @@ module HTTPX
return unless @io.connected?
@parser || begin
@parser = registry(@io.protocol).new(@write_buffer, @options.merge(max_concurrent_requests: 1))
@parser = self.class.parser_type(@io.protocol).new(@write_buffer, @options.merge(max_concurrent_requests: 1))
parser = @parser
parser.extend(ProxyParser)
parser.on(:response, &method(:__http_on_connect))

View File

@ -15,16 +15,13 @@ module HTTPX
end
def extra_options(options)
upgrade_handlers = Module.new do
extend Registry
end
options.merge(upgrade_handlers: upgrade_handlers)
options.merge(upgrade_handlers: {})
end
end
module OptionsMethods
def option_upgrade_handlers(value)
raise TypeError, ":upgrade_handlers must be a registry" unless value.respond_to?(:registry)
raise TypeError, ":upgrade_handlers must be a Hash" unless value.is_a?(Hash)
value
end
@ -41,9 +38,9 @@ module HTTPX
upgrade_protocol = response.headers["upgrade"].split(/ *, */).first
return response unless upgrade_protocol && options.upgrade_handlers.registry.key?(upgrade_protocol)
return response unless upgrade_protocol && options.upgrade_handlers.key?(upgrade_protocol)
protocol_handler = options.upgrade_handlers.registry(upgrade_protocol)
protocol_handler = options.upgrade_handlers[upgrade_protocol]
return response unless protocol_handler

View File

@ -10,8 +10,8 @@ module HTTPX
#
module H2
class << self
def configure(klass)
klass.default_options.upgrade_handlers.register "h2", self
def extra_options(options)
options.merge(upgrade_handlers: options.upgrade_handlers.merge("h2" => self))
end
def call(connection, _request, _response)
@ -32,7 +32,7 @@ module HTTPX
@parser = Connection::HTTP2.new(@write_buffer, @options)
set_parser_callbacks(@parser)
@upgrade_protocol = :h2
@upgrade_protocol = "h2"
# what's happening here:
# a deviation from the state machine is done to perform the actions when a

View File

@ -244,7 +244,7 @@ module HTTPX
def find_resolver_for(connection)
connection_options = connection.options
resolver_type = connection_options.resolver_class
resolver_type = Resolver.registry(resolver_type) if resolver_type.is_a?(Symbol)
resolver_type = Resolver.resolver_for(resolver_type)
@resolvers[resolver_type] ||= begin
resolver_manager = if resolver_type.multi?

View File

@ -1,85 +0,0 @@
# frozen_string_literal: true
module HTTPX
# Adds a general-purpose registry API to a class. It is designed to be a
# configuration-level API, i.e. the registry is global to the class and
# should be set on **boot time**.
#
# It is used internally to associate tags with handlers.
#
# ## Register/Fetch
#
# One is strongly advised to register handlers when creating the class.
#
# There is an instance-level method to retrieve from the registry based
# on the tag:
#
# class Server
# include HTTPX::Registry
#
# register "tcp", TCPHandler
# register "ssl", SSLHandlers
# ...
#
#
# def handle(uri)
# scheme = uri.scheme
# handler = registry(scheme) #=> TCPHandler
# handler.handle
# end
# end
#
module Registry
# Base Registry Error
class Error < Error; end
def self.extended(klass)
super
klass.extend(ClassMethods)
end
def self.included(klass)
super
klass.extend(ClassMethods)
klass.__send__(:include, InstanceMethods)
end
# Class Methods
module ClassMethods
def inherited(klass)
super
klass.instance_variable_set(:@registry, @registry.dup)
end
# @param [Object] tag the handler identifier in the registry
# @return [Symbol, String, Object] the corresponding handler (if Symbol or String,
# will assume it referes to an autoloaded module, and will load-and-return it).
#
def registry(tag = nil)
@registry ||= {}
return @registry if tag.nil?
handler = @registry[tag]
raise(Error, "#{tag} is not registered in #{self}") unless handler
handler
end
# @param [Object] tag the identifier for the handler in the registry
# @return [Symbol, String, Object] the handler (if Symbol or String, it is
# assumed to be an autoloaded module, to be loaded later)
#
def register(tag, handler)
registry[tag] = handler
end
end
# Instance Methods
module InstanceMethods
# delegates to HTTPX::Registry#registry
def registry(tag)
self.class.registry(tag)
end
end
end
end

View File

@ -120,7 +120,7 @@ module HTTPX
query = []
if (q = @options.params)
query << Transcoder.registry("form").encode(q)
query << Transcoder::Form.encode(q)
end
query << @uri.query if @uri.query
@query = query.join("&")
@ -160,15 +160,7 @@ module HTTPX
def initialize(headers, options)
@headers = headers
@body = if options.body
Transcoder.registry("body").encode(options.body)
elsif options.form
Transcoder.registry("form").encode(options.form)
elsif options.json
Transcoder.registry("json").encode(options.json)
elsif options.xml
Transcoder.registry("xml").encode(options.xml)
end
@body = initialize_body(options)
return if @body.nil?
@headers["content-type"] ||= @body.content_type
@ -211,7 +203,7 @@ module HTTPX
def stream(body)
encoded = body
encoded = Transcoder.registry("chunker").encode(body.enum_for(:each)) if chunked?
encoded = Transcoder::Chunker.encode(body.enum_for(:each)) if chunked?
encoded
end
@ -235,6 +227,20 @@ module HTTPX
"#{unbounded_body? ? "stream" : "@bytesize=#{bytesize}"}>"
end
# :nocov:
private
def initialize_body(options)
if options.body
Transcoder::Body.encode(options.body)
elsif options.form
Transcoder::Form.encode(options.form)
elsif options.json
Transcoder::JSON.encode(options.json)
elsif options.xml
Transcoder::Xml.encode(options.xml)
end
end
end
def transition(nextstate)

View File

@ -5,8 +5,6 @@ require "ipaddr"
module HTTPX
module Resolver
extend Registry
RESOLVE_TIMEOUT = 5
require "httpx/resolver/resolver"
@ -15,10 +13,6 @@ module HTTPX
require "httpx/resolver/https"
require "httpx/resolver/multi"
register :system, System
register :native, Native
register :https, HTTPS
@lookup_mutex = Mutex.new
@lookups = Hash.new { |h, k| h[k] = [] }
@ -28,6 +22,18 @@ module HTTPX
module_function
def resolver_for(resolver_type)
case resolver_type
when :native then Native
when :system then System
when :https then HTTPS
else
return resolver_type if resolver_type.is_a?(Class) && resolver_type < Resolver
raise Error, "unsupported resolver type (#{resolver_type})"
end
end
def nolookup_resolve(hostname)
ip_resolve(hostname) || cached_lookup(hostname) || system_resolve(hostname)
end

View File

@ -87,32 +87,27 @@ module HTTPX
end
def json(*args)
decode("json", *args)
decode(Transcoder::JSON, *args)
end
def form
decode("form")
decode(Transcoder::Form)
end
def xml
decode("xml")
decode(Transcoder::Xml)
end
private
def decode(format, *args)
def decode(transcoder, *args)
# TODO: check if content-type is a valid format, i.e. "application/json" for json parsing
transcoder = Transcoder.registry(format)
raise Error, "no decoder available for \"#{format}\"" unless transcoder.respond_to?(:decode)
decoder = transcoder.decode(self)
raise Error, "no decoder available for \"#{format}\"" unless decoder
raise Error, "no decoder available for \"#{transcoder}\"" unless decoder
decoder.call(self, *args)
rescue Registry::Error
raise Error, "no decoder available for \"#{format}\""
end
def no_data?

View File

@ -2,8 +2,6 @@
module HTTPX
module Transcoder
extend Registry
using RegexpExtensions unless Regexp.method_defined?(:match?)
module_function

View File

@ -55,5 +55,4 @@ module HTTPX::Transcoder
Encoder.new(body)
end
end
register "body", Body
end

View File

@ -112,5 +112,4 @@ module HTTPX::Transcoder
Encoder.new(chunks)
end
end
register "chunker", Chunker
end

View File

@ -55,5 +55,4 @@ module HTTPX::Transcoder
Decoder
end
end
register "form", Form
end

View File

@ -56,5 +56,4 @@ module HTTPX::Transcoder
end
# rubocop:enable Style/SingleLineMethods
end
register "json", JSON
end

View File

@ -51,5 +51,4 @@ module HTTPX::Transcoder
end
end
end
register "xml", Xml
end

View File

@ -17,7 +17,6 @@ module HTTPX
extend Forwardable
include Loggable
include Callbacks
include HTTPX::Registry[String, Class]
attr_reader type: io_type
@ -101,6 +100,8 @@ module HTTPX
def transition: (Symbol) -> void
def build_socket: (?Array[ipaddr]? addrs) -> (TCP | SSL | UNIX)
def on_error: (HTTPX::TimeoutError | Error | StandardError) -> void
def handle_error: (StandardError) -> void
@ -112,5 +113,7 @@ module HTTPX
def write_timeout_callback: (Request request, Numeric write_timeout) -> void
def read_timeout_callback: (Request request, Numeric read_timeout, ?singleton(RequestTimeoutError) error_type) -> void
def self.parser_type: (String protocol) -> (singleton(HTTP1) | singleton(HTTP2))
end
end

View File

@ -1,7 +1,6 @@
module HTTPX
module Plugins
module Compression
type encodings_registry = Registry[Symbol, Class]
type deflatable = _Reader | _ToS
@ -16,12 +15,17 @@ module HTTPX
def initialize: (Integer | Float bytesize) -> untyped
end
interface _Compressor
def deflater: () -> _Deflater
def inflater: (Integer | Float bytesize) -> _Inflater
end
def self.configure: (singleton(Session)) -> void
interface _CompressionOptions
def compression_threshold_size: () -> Integer?
def encodings: () -> encodings_registry?
def encodings: () -> Hash[String, _Compressor]
end
def self.extra_options: (Options) -> (Options & _CompressionOptions)

View File

@ -39,8 +39,8 @@ module HTTPX
def self?.encode: (String bytes, ?deflater: Compression::_Deflater?) -> String
def self?.decode: (String message, encodings: Array[String], encoders: Compression::encodings_registry) -> String
| (String message, encodings: Array[String], encoders: Compression::encodings_registry) { (String) -> void } -> void
def self?.decode: (String message, encodings: Array[String], encoders: Hash[String, Compression::_Compressor]) -> String
| (String message, encodings: Array[String], encoders: Hash[String, Compression::_Compressor]) { (String) -> void } -> void
def self?.cancel: (Request) -> void
@ -65,7 +65,7 @@ module HTTPX
module ResponseMethods
def merge_headers: (headers_input trailers) -> void
def encoders: () -> Compression::encodings_registry
def encoders: () -> Hash[String, Compression::_Compressor]
end
module InstanceMethods

View File

@ -1,18 +1,20 @@
module HTTPX
module Plugins
module Upgrade
type handlers_registry = Registry[Symbol, Class]
interface _Upgrader
def call: (Connection connection, Request request, Response response) -> void
end
def self.configure: (singleton(Session)) -> void
interface _UpgradeOptions
def upgrade_handlers: () -> handlers_registry?
def upgrade_handlers: () -> Hash[String, _Upgrader]
end
def self.extra_options: (Options) -> (Options & _UpgradeOptions)
module ConnectionMethods
attr_reader upgrade_protocol: Symbol?
attr_reader upgrade_protocol: String?
attr_reader hijacked: boolish
def hijack_io: () -> void

View File

@ -1,13 +0,0 @@
module HTTPX::Registry[unchecked out T, unchecked out V]
class Error < HTTPX::Error
end
# type registrable = Symbol | String | Class
def self.registry: [T, V] (T) -> Class
| [T, V] () -> Hash[T, V]
def self.register: [T, V] (T tag, V handler) -> void
def registry: (?T tag) -> V
end

View File

@ -56,6 +56,10 @@ module HTTPX
def request_timeout: () -> Numeric
private
def initialize_body: (Options options) -> Transcoder::_Encoder?
class Body
@headers: Headers
@body: body_encoder?

View File

@ -2,8 +2,6 @@ module HTTPX
type ipaddr = IPAddr | String
module Resolver
extend Registry[Symbol, Class]
RESOLVE_TIMEOUT: Integer | Float
@lookup_mutex: Thread::Mutex
@ -21,6 +19,11 @@ module HTTPX
def system_resolve: (String hostname) -> Array[IPAddr]?
def self?.resolver_for: (:native resolver_type) -> singleton(Native) |
(:system resolver_type) -> singleton(System) |
(:https resolver_type) -> singleton(HTTPS) |
(singleton(Resolver::Resolver) resolver_type) -> singleton(Resolver::Resolver)
def self?.cached_lookup: (String hostname) -> Array[IPAddr]?
def self?.cached_lookup_set: (String hostname, ip_family family, Array[dns_result] addresses) -> void

View File

@ -41,7 +41,7 @@ module HTTPX
def initialize: (Request request, String | Integer status, String version, headers?) -> untyped
def no_data?: () -> bool
def decode:(String format, ?untyped options) -> untyped
def decode:(Transcoder::_Decode transcoder, ?untyped options) -> untyped
class Body
include _Reader

View File

@ -4,11 +4,6 @@ module HTTPX
type body_encoder = Transcoder::_Encoder | _Each[String]
module Transcoder
def self?.registry: (String tag) -> _Encode
| () -> Hash[String, _Encode]
def self?.register: (String tag, _Encode handler) -> void
def self?.normalize_keys: [U] (_ToS key, _ToAry[untyped] | _ToHash[_ToS, untyped] | untyped value, ?(^(untyped value) -> bool | nil) cond) { (String, ?untyped) -> U } -> U
def self?.normalize_query: (Hash[String, untyped] params, String name, String v, Integer depth) -> void
@ -17,6 +12,10 @@ module HTTPX
def encode: (untyped payload) -> body_encoder
end
interface _Decode
def decode: (HTTPX::Response response) -> _Decoder
end
interface _Encoder
def bytesize: () -> (Integer | Float)
end

View File

@ -205,9 +205,6 @@ class ResponseTest < Minitest::Test
form4_response = Response.new(request, 200, "2.0", { "content-type" => "application/x-www-form-urlencoded" })
form4_response << "[]"
assert form4_response.form == {}
error = assert_raises(HTTPX::Error) { form2_response.__send__(:decode, "bla") }
assert error.message.include?("no decoder available for"), "failed with unexpected error"
end
private

View File

@ -112,9 +112,8 @@ end
module WSTestPlugin
class << self
def configure(klass)
def load_dependencies(klass)
klass.plugin(:upgrade)
klass.default_options.upgrade_handlers.register("websocket", self)
end
def call(connection, request, response)
@ -128,7 +127,7 @@ module WSTestPlugin
end
def extra_options(options)
options.merge(max_concurrent_requests: 1)
options.merge(max_concurrent_requests: 1, upgrade_handlers: options.upgrade_handlers.merge("websocket" => self))
end
end