Compare commits

...

6 Commits

Author SHA1 Message Date
HoneyryderChuck
2edb00c3d6 Merge branch 'issue-190' into 'master'
Circuit breaker plugin

Closes #190

See merge request honeyryderchuck/httpx!214
2022-08-08 21:32:34 +00:00
HoneyryderChuck
99a9e1c138 fix circuit breaker:
* circuit break the uri (instead of the whole origin) if the timeout is
  only on requests;
* improved cached responses loop;
* organized components into separate files
2022-08-08 22:19:48 +01:00
HoneyryderChuck
8f8febc10e sig fixes 2022-08-08 22:19:20 +01:00
HoneyryderChuck
3b9cbec8d9 unix socket deprecation message and doc fixes 2022-08-07 22:24:57 +01:00
HoneyryderChuck
c31ded54e1 circuit breaker plugin 2022-08-07 22:24:57 +01:00
HoneyryderChuck
7819079b71 fixes on existing plugin option checks 2022-08-07 22:24:57 +01:00
29 changed files with 419 additions and 28 deletions

View File

@ -34,7 +34,7 @@ HTTPX.get("http://example.com", addresses: %w[172.5.3.1 172.5.3.2]))
You should also use it to connect to HTTP servers bound to a UNIX socket, in which case you'll have to provide a path:
```ruby
HTTPX.get("http://example.com", addresses: %w[/path/to/usocket]))
HTTPX.get("http://example.com", transport: "unix", addresses: %w[/path/to/usocket]))
```
The `:transport_options` are therefore deprecated, and will be moved in a major version.

View File

@ -577,7 +577,7 @@ module HTTPX
error = ex
else
# inactive connections do not contribute to the select loop, therefore
# they should fail due to such errors.
# they should not fail due to such errors.
return if @state == :inactive
if @timeout

View File

@ -33,7 +33,7 @@ module HTTPX
else
if @options.transport_options
# :nocov:
warn ":#{__method__} is deprecated, use :addresses instead"
warn ":transport_options is deprecated, use :addresses instead"
@path = @options.transport_options[:path]
# :nocov:
else

View File

@ -267,7 +267,7 @@ module HTTPX
instance_variables.each do |ivar|
value = other.instance_variable_get(ivar)
value = case value
when Symbol, Fixnum, TrueClass, FalseClass # rubocop:disable Lint/UnifiedInteger
when Symbol, Numeric, TrueClass, FalseClass
value
else
value.dup

View File

@ -0,0 +1,115 @@
# frozen_string_literal: true
module HTTPX
module Plugins
#
# This plugin implements a circuit breaker around connection errors.
#
# https://gitlab.com/honeyryderchuck/httpx/wikis/Circuit-Breaker
#
module CircuitBreaker
using URIExtensions
def self.load_dependencies(*)
require_relative "circuit_breaker/circuit"
require_relative "circuit_breaker/circuit_store"
end
def self.extra_options(options)
options.merge(circuit_breaker_max_attempts: 3, circuit_breaker_reset_attempts_in: 60, circuit_breaker_break_in: 60,
circuit_breaker_half_open_drip_rate: 1)
end
module InstanceMethods
def initialize(*)
super
@circuit_store = CircuitStore.new(@options)
end
def initialize_dup(orig)
super
@circuit_store = orig.instance_variable_get(:@circuit_store).dup
end
def send_requests(*requests)
# @type var short_circuit_responses: Array[response]
short_circuit_responses = []
# run all requests through the circuit breaker, see if the circuit is
# open for any of them.
real_requests = requests.each_with_object([]) do |req, real_reqs|
short_circuit_response = @circuit_store.try_respond(req)
if short_circuit_response.nil?
real_reqs << req
next
end
short_circuit_responses[requests.index(req)] = short_circuit_response
end
# run requests for the remainder
unless real_requests.empty?
responses = super(*real_requests)
real_requests.each_with_index do |request, idx|
short_circuit_responses[requests.index(request)] = responses[idx]
end
end
short_circuit_responses
end
def on_response(request, response)
if response.is_a?(ErrorResponse)
case response.error
when RequestTimeoutError
@circuit_store.try_open(request.uri, response)
else
@circuit_store.try_open(request.origin, response)
end
elsif (break_on = request.options.circuit_breaker_break_on) && break_on.call(response)
@circuit_store.try_open(request.uri, response)
end
super
end
end
module OptionsMethods
def option_circuit_breaker_max_attempts(value)
attempts = Integer(value)
raise TypeError, ":circuit_breaker_max_attempts must be positive" unless attempts.positive?
attempts
end
def option_circuit_breaker_reset_attempts_in(value)
timeout = Float(value)
raise TypeError, ":circuit_breaker_reset_attempts_in must be positive" unless timeout.positive?
timeout
end
def option_circuit_breaker_break_in(value)
timeout = Float(value)
raise TypeError, ":circuit_breaker_break_in must be positive" unless timeout.positive?
timeout
end
def option_circuit_breaker_half_open_drip_rate(value)
ratio = Float(value)
raise TypeError, ":circuit_breaker_half_open_drip_rate must be a number between 0 and 1" unless (0..1).cover?(ratio)
ratio
end
def option_circuit_breaker_break_on(value)
raise TypeError, ":circuit_breaker_break_on must be called with the response" unless value.respond_to?(:call)
value
end
end
end
register_plugin :circuit_breaker, CircuitBreaker
end
end

View File

@ -0,0 +1,76 @@
# frozen_string_literal: true
module HTTPX
module Plugins::CircuitBreaker
#
# A circuit is assigned to a given absoolute url or origin.
#
# It sets +max_attempts+, the number of attempts the circuit allows, before it is opened.
# It sets +reset_attempts_in+, the time a circuit stays open at most, before it resets.
# It sets +break_in+, the time that must elapse before an open circuit can transit to the half-open state.
# It sets +circuit_breaker_half_open_drip_rate+, the rate of requests a circuit allows to be performed when in an half-open state.
#
class Circuit
def initialize(max_attempts, reset_attempts_in, break_in, circuit_breaker_half_open_drip_rate)
@max_attempts = max_attempts
@reset_attempts_in = reset_attempts_in
@break_in = break_in
@circuit_breaker_half_open_drip_rate = 1 - circuit_breaker_half_open_drip_rate
@attempts = 0
@state = :closed
end
def respond
try_close
case @state
when :closed
nil
when :half_open
# return nothing or smth based on ratio
return if Random::DEFAULT.rand >= @circuit_breaker_half_open_drip_rate
@response
when :open
@response
end
end
def try_open(response)
return unless @state == :closed
now = Utils.now
if @attempts.positive?
@attempts = 0 if now - @attempted_at > @reset_attempts_in
else
@attempted_at = now
end
@attempts += 1
return unless @attempts >= @max_attempts
@state = :open
@opened_at = now
@response = response
end
def try_close
case @state
when :closed
nil
when :half_open
# reset!
@attempts = 0
@opened_at = @attempted_at = @response = nil
@state = :closed
when :open
@state = :half_open if Utils.elapsed_time(@opened_at) > @break_in
end
end
end
end
end

View File

@ -0,0 +1,44 @@
# frozen_string_literal: true
module HTTPX::Plugins::CircuitBreaker
using HTTPX::URIExtensions
class CircuitStore
def initialize(options)
@circuits = Hash.new do |h, k|
h[k] = Circuit.new(
options.circuit_breaker_max_attempts,
options.circuit_breaker_reset_attempts_in,
options.circuit_breaker_break_in,
options.circuit_breaker_half_open_drip_rate
)
end
end
def try_open(uri, response)
circuit = get_circuit_for_uri(uri)
circuit.try_open(response)
end
# if circuit is open, it'll respond with the stored response.
# if not, nil.
def try_respond(request)
circuit = get_circuit_for_uri(request.uri)
circuit.respond
end
private
def get_circuit_for_uri(uri)
uri = URI(uri)
if @circuits.key?(uri.origin)
@circuits[uri.origin]
else
@circuits[uri.to_s]
end
end
end
end

View File

@ -42,7 +42,7 @@ module HTTPX
private
def on_response(reuest, response)
def on_response(_request, response)
if response && response.respond_to?(:headers) && (set_cookie = response.headers["set-cookie"])
log { "cookies: set-cookie is over #{Cookie::MAX_LENGTH}" } if set_cookie.bytesize > Cookie::MAX_LENGTH

View File

@ -22,7 +22,7 @@ module HTTPX
module OptionsMethods
def option_expect_timeout(value)
seconds = Integer(value)
seconds = Float(value)
raise TypeError, ":expect_timeout must be positive" unless seconds.positive?
seconds

View File

@ -67,7 +67,7 @@ module HTTPX
end
def option_retry_on(value)
raise ":retry_on must be called with the response" unless value.respond_to?(:call)
raise TypeError, ":retry_on must be called with the response" unless value.respond_to?(:call)
value
end

View File

@ -15,7 +15,7 @@ module HTTPX
# delegated
def <<: (string data) -> String
def empty?: () -> bool
def bytesize: () -> Integer
def bytesize: () -> (Integer | Float)
def clear: () -> void
def replace: (string) -> void

View File

@ -33,6 +33,7 @@ module HTTPX
| (:aws_sigv4, ?options) -> Plugins::awsSigV4Session
| (:grpc, ?options) -> Plugins::grpcSession
| (:response_cache, ?options) -> Plugins::sessionResponseCache
| (:circuit_breaker, ?options) -> Plugins::sessionCircuitBreaker
| (Symbol | Module, ?options) { (Class) -> void } -> Session
| (Symbol | Module, ?options) -> Session

View File

@ -29,7 +29,6 @@ module HTTPX
attr_writer timers: Timers
@type: io_type
@origins: Array[URI::Generic]
@window_size: Integer
@read_buffer: Buffer
@write_buffer: Buffer

View File

@ -0,0 +1,61 @@
module HTTPX
module Plugins
module CircuitBreaker
class CircuitStore
@circuits: Hash[String, Circuit]
def try_open: (generic_uri uri, response response) -> void
def try_respond: (Request request) -> response?
private
def get_circuit_for_uri: (generic_uri uri) -> Circuit
def initialize: (Options & _CircuitOptions options) -> void
end
class Circuit
@state: :closed | :open | :half_open
@max_attempts: Integer
@reset_attempts_in: Float
@break_in: Float
@circuit_breaker_half_open_drip_rate: Float
@attempts: Integer
@response: response?
@opened_at: Float?
@attempted_at: Float?
def respond: () -> response?
def try_open: (response) -> void
def try_close: () -> void
private
def initialize: (Integer max_attempts, Float reset_attempts_in, Float break_in, Float circuit_breaker_half_open_drip_rate) -> void
end
interface _CircuitOptions
def circuit_breaker_max_attempts: () -> Integer
def circuit_breaker_reset_attempts_in: () -> Float
def circuit_breaker_break_in: () -> Float
def circuit_breaker_half_open_drip_rate: () -> Float
def circuit_breaker_break_on: () -> (^(Response) -> boolish | nil)
end
def self.load_dependencies: (singleton(Session)) -> void
def self.extra_options: (Options) -> (Options & _CircuitOptions)
module InstanceMethods
@circuit_store: CircuitStore
end
end
type sessionCircuitBreaker = Session & CircuitBreaker::InstanceMethods
end
end

View File

@ -13,7 +13,7 @@ module HTTPX
interface _Inflater
def inflate: (string) -> String
def initialize: (Numeric bytesize) -> untyped
def initialize: (Integer | Float bytesize) -> untyped
end
def self.configure: (singleton(Session)) -> void

View File

@ -6,7 +6,7 @@ module HTTPX
def self.configure: (singleton(Session)) -> void
def self?.deflater: () -> _Deflater
def self?.decoder: (Numeric bytesize) -> Inflater
def self?.decoder: (Integer | Float bytesize) -> Inflater
module Deflater
extend _Deflater

View File

@ -6,7 +6,7 @@ module HTTPX
def self.configure: (singleton(Session)) -> void
def self?.deflater: () -> _Deflater
def self?.inflater: (Numeric bytesize) -> GZIP::Inflater
def self?.inflater: (Integer | Float bytesize) -> GZIP::Inflater
module Deflater
extend _Deflater

View File

@ -6,15 +6,15 @@ module HTTPX
def self.configure: (singleton(Session)) -> void
def self?.deflater: () -> _Deflater
def self?.inflater: (Numeric bytesize) -> Inflater
def self?.inflater: (Integer | Float bytesize) -> Inflater
class Deflater
include _Deflater
@compressed_chunk: String
private
def initialize: () -> untyped
def write: (string) -> void
def compressed_chunk: () -> String

View File

@ -4,14 +4,15 @@ module HTTPX
module Plugins
module Proxy
module Socks5
VERSION: Integer
module ConnectionMethods
def __socks5_proxy_connect: () -> void
def __socks5_on_packet: (String packet) -> void
def __socks5_check_version: (int) -> void
def __on_socks5_error: (string) -> void
end
class SocksParser
include Callbacks

View File

@ -1,12 +1,13 @@
module HTTPX::Registry[T, V]
module HTTPX::Registry[unchecked out T, unchecked out V]
class Error < HTTPX::Error
end
# type registrable = Symbol | String | Class
def self.registry: (T tag) -> Class
| () -> Hash[T, V]
def self.register: (T tag, V handler) -> void
def self.registry: [T, V] (T) -> Class
| [T, V] () -> Hash[T, V]
def self.register: [T, V] (T tag, V handler) -> void
def registry: (?T tag) -> V
end

View File

@ -67,7 +67,7 @@ module HTTPX
def rewind: () -> void
def empty?: () -> bool
def bytesize: () -> Numeric
def bytesize: () -> (Integer | Float)
def stream: (Transcoder::_Encoder) -> bodyIO
def unbounded_body?: () -> bool
def chunked?: () -> bool

View File

@ -12,7 +12,7 @@ module HTTPX
@ns_index: Integer
@nameserver: Array[String]?
@ndots: Integer
@start_timeout: Numeric?
@start_timeout: Float?
@search: Array[String]
@_timeouts: Array[Numeric]
@timeouts: Hash[String, Array[Numeric]]

View File

@ -63,7 +63,7 @@ module HTTPX
def each: () { (String) -> void } -> void
| () -> Enumerable[String]
def bytesize: () -> Numeric
def bytesize: () -> (Integer | Float)
def empty?: () -> bool
def copy_to: (String | File | _Writer destination) -> void
def close: () -> void

View File

@ -1,7 +1,7 @@
module HTTPX
class Timers
@intervals: Array[Interval]
@next_interval_at: Numeric
@next_interval_at: Float
def after: (Numeric interval_in_secs) { () -> void } -> void

View File

@ -18,7 +18,7 @@ module HTTPX
end
interface _Encoder
def bytesize: () -> Numeric
def bytesize: () -> (Integer | Float)
end
interface _Decoder

View File

@ -4,9 +4,9 @@ module HTTPX
def self?.parse_retry_after: (String) -> Numeric
def self?.now: () -> Numeric
def self?.now: () -> Float
def self?.elapsed_time: (Numeric monotonic_time) -> Numeric
def self?.elapsed_time: (Integer | Float monotonic_time) -> Float
def self?.to_uri: (generic_uri uri) -> URI::Generic
end

View File

@ -31,6 +31,7 @@ class HTTPTest < Minitest::Test
include Plugins::Upgrade
include Plugins::GRPC if RUBY_ENGINE == "ruby" && RUBY_VERSION >= "2.3.0"
include Plugins::ResponseCache
include Plugins::CircuitBreaker
def test_verbose_log
log = StringIO.new

View File

@ -32,6 +32,7 @@ class HTTPSTest < Minitest::Test
include Plugins::Upgrade
include Plugins::GRPC if RUBY_ENGINE == "ruby" && RUBY_VERSION >= "2.3.0"
include Plugins::ResponseCache
include Plugins::CircuitBreaker
def test_connection_coalescing
coalesced_origin = "https://#{ENV["HTTPBIN_COALESCING_HOST"]}"

View File

@ -0,0 +1,91 @@
# frozen_string_literal: true
module Requests
module Plugins
module CircuitBreaker
using HTTPX::URIExtensions
def test_plugin_circuit_breaker_lifecycles
return unless origin.start_with?("http://")
unknown_uri = "http://www.qwwqjqwdjqiwdj.com"
session = HTTPX.plugin(:circuit_breaker,
circuit_breaker_max_attempts: 2,
circuit_breaker_break_in: 2,
circuit_breaker_half_open_drip_rate: 1.0)
# circuit closed
response1 = session.get(unknown_uri)
verify_error_response(response1)
response2 = session.get(unknown_uri)
verify_error_response(response2)
assert response2 != response1
# circuit open
response3 = session.get(unknown_uri)
verify_error_response(response3)
assert response3 == response2
sleep 3
# circuit half-closed
response4 = session.get(unknown_uri)
assert response4 != response3
end
def test_plugin_circuit_breaker_reset_attempts
return unless origin.start_with?("http://")
unknown_uri = URI("http://www.qwwqjqwdjqiwdj.com")
session = HTTPX.plugin(:circuit_breaker,
circuit_breaker_max_attempts: 2,
circuit_breaker_reset_attempts_in: 2)
store = session.instance_variable_get(:@circuit_store)
circuit = store.instance_variable_get(:@circuits)[unknown_uri.origin]
# circuit closed
response1 = session.get(unknown_uri)
verify_error_response(response1)
assert circuit.instance_variable_get(:@attempts) == 1
sleep 2
response1 = session.get(unknown_uri)
verify_error_response(response1)
# because it reset
assert circuit.instance_variable_get(:@attempts) == 1
end
def test_plugin_circuit_breaker_break_on
break_on = ->(response) { response.is_a?(HTTPX::ErrorResponse) || response.status == 404 }
session = HTTPX.plugin(:circuit_breaker, circuit_breaker_max_attempts: 1, circuit_breaker_break_on: break_on)
response1 = session.get(build_uri("/status/404"))
verify_status(response1, 404)
response2 = session.get(build_uri("/status/404"))
verify_status(response2, 404)
assert response1 == response2
end
# def test_plugin_circuit_breaker_half_open_drip_rate
# unknown_uri = "http://www.qwwqjqwdjqiwdj.com"
# session = HTTPX.plugin(:circuit_breaker, circuit_breaker_max_attempts: 1, circuit_breaker_half_open_drip_rate: 0.5)
# response1 = session.get(unknown_uri)
# verify_status(response1, 404)
# verify_error_response(response1)
# # circuit open
# responses = session.get(*([unknown_uri] * 10))
# assert responses.size == 10
# assert responses.select { |res| res == response1 }.size == 5
# end
end
end
end