mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-10-04 00:00:37 -04:00
fix circuit breaker:
* circuit break the uri (instead of the whole origin) if the timeout is only on requests; * improved cached responses loop; * organized components into separate files
This commit is contained in:
parent
8f8febc10e
commit
99a9e1c138
@ -10,110 +10,9 @@ module HTTPX
|
||||
module CircuitBreaker
|
||||
using URIExtensions
|
||||
|
||||
class CircuitStore
|
||||
def initialize(options)
|
||||
@circuits = Hash.new do |h, k|
|
||||
h[k] = Circuit.new(
|
||||
options.circuit_breaker_max_attempts,
|
||||
options.circuit_breaker_reset_attempts_in,
|
||||
options.circuit_breaker_break_in,
|
||||
options.circuit_breaker_half_open_drip_rate
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def try_open(uri, response)
|
||||
circuit = get_circuit_for_uri(uri)
|
||||
|
||||
circuit.try_open(response)
|
||||
end
|
||||
|
||||
def try_close(uri)
|
||||
circuit = get_circuit_for_uri(uri)
|
||||
|
||||
circuit.try_close
|
||||
end
|
||||
|
||||
def try_respond(request)
|
||||
circuit = get_circuit_for_uri(request.uri)
|
||||
|
||||
circuit.respond
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get_circuit_for_uri(uri)
|
||||
uri = URI(uri)
|
||||
|
||||
if @circuits.key?(uri.origin)
|
||||
@circuits[uri.origin]
|
||||
else
|
||||
@circuits[uri.to_s]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Circuit
|
||||
def initialize(max_attempts, reset_attempts_in, break_in, circuit_breaker_half_open_drip_rate)
|
||||
@max_attempts = max_attempts
|
||||
@reset_attempts_in = reset_attempts_in
|
||||
@break_in = break_in
|
||||
@circuit_breaker_half_open_drip_rate = 1 - circuit_breaker_half_open_drip_rate
|
||||
@attempts = 0
|
||||
@state = :closed
|
||||
end
|
||||
|
||||
def respond
|
||||
try_close
|
||||
|
||||
case @state
|
||||
when :closed
|
||||
nil
|
||||
when :half_open
|
||||
# return nothing or smth based on ratio
|
||||
return if Random::DEFAULT.rand >= @circuit_breaker_half_open_drip_rate
|
||||
|
||||
@response
|
||||
when :open
|
||||
|
||||
@response
|
||||
end
|
||||
end
|
||||
|
||||
def try_open(response)
|
||||
return unless @state == :closed
|
||||
|
||||
now = Utils.now
|
||||
|
||||
if @attempts.positive?
|
||||
@attempts = 0 if now - @attempted_at > @reset_attempts_in
|
||||
else
|
||||
@attempted_at = now
|
||||
end
|
||||
|
||||
@attempts += 1
|
||||
|
||||
return unless @attempts >= @max_attempts
|
||||
|
||||
@state = :open
|
||||
@opened_at = now
|
||||
@response = response
|
||||
end
|
||||
|
||||
def try_close
|
||||
case @state
|
||||
when :closed
|
||||
nil
|
||||
when :half_open
|
||||
# reset!
|
||||
@attempts = 0
|
||||
@opened_at = @attempted_at = @response = nil
|
||||
@state = :closed
|
||||
|
||||
when :open
|
||||
@state = :half_open if Utils.elapsed_time(@opened_at) > @break_in
|
||||
end
|
||||
end
|
||||
def self.load_dependencies(*)
|
||||
require_relative "circuit_breaker/circuit"
|
||||
require_relative "circuit_breaker/circuit_store"
|
||||
end
|
||||
|
||||
def self.extra_options(options)
|
||||
@ -133,14 +32,21 @@ module HTTPX
|
||||
end
|
||||
|
||||
def send_requests(*requests)
|
||||
# @type var short_circuit_responses: Array[response]
|
||||
short_circuit_responses = []
|
||||
|
||||
# run all requests through the circuit breaker, see if the circuit is
|
||||
# open for any of them.
|
||||
real_requests = requests.each_with_object([]) do |req, real_reqs|
|
||||
short_circuit_response = @circuit_store.try_respond(req)
|
||||
real_reqs << req if short_circuit_response.nil?
|
||||
if short_circuit_response.nil?
|
||||
real_reqs << req
|
||||
next
|
||||
end
|
||||
short_circuit_responses[requests.index(req)] = short_circuit_response
|
||||
end
|
||||
|
||||
# run requests for the remainder
|
||||
unless real_requests.empty?
|
||||
responses = super(*real_requests)
|
||||
|
||||
@ -154,7 +60,12 @@ module HTTPX
|
||||
|
||||
def on_response(request, response)
|
||||
if response.is_a?(ErrorResponse)
|
||||
@circuit_store.try_open(request.origin, response)
|
||||
case response.error
|
||||
when RequestTimeoutError
|
||||
@circuit_store.try_open(request.uri, response)
|
||||
else
|
||||
@circuit_store.try_open(request.origin, response)
|
||||
end
|
||||
elsif (break_on = request.options.circuit_breaker_break_on) && break_on.call(response)
|
||||
@circuit_store.try_open(request.uri, response)
|
||||
end
|
||||
|
76
lib/httpx/plugins/circuit_breaker/circuit.rb
Normal file
76
lib/httpx/plugins/circuit_breaker/circuit.rb
Normal file
@ -0,0 +1,76 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX
|
||||
module Plugins::CircuitBreaker
|
||||
#
|
||||
# A circuit is assigned to a given absoolute url or origin.
|
||||
#
|
||||
# It sets +max_attempts+, the number of attempts the circuit allows, before it is opened.
|
||||
# It sets +reset_attempts_in+, the time a circuit stays open at most, before it resets.
|
||||
# It sets +break_in+, the time that must elapse before an open circuit can transit to the half-open state.
|
||||
# It sets +circuit_breaker_half_open_drip_rate+, the rate of requests a circuit allows to be performed when in an half-open state.
|
||||
#
|
||||
class Circuit
|
||||
def initialize(max_attempts, reset_attempts_in, break_in, circuit_breaker_half_open_drip_rate)
|
||||
@max_attempts = max_attempts
|
||||
@reset_attempts_in = reset_attempts_in
|
||||
@break_in = break_in
|
||||
@circuit_breaker_half_open_drip_rate = 1 - circuit_breaker_half_open_drip_rate
|
||||
@attempts = 0
|
||||
@state = :closed
|
||||
end
|
||||
|
||||
def respond
|
||||
try_close
|
||||
|
||||
case @state
|
||||
when :closed
|
||||
nil
|
||||
when :half_open
|
||||
# return nothing or smth based on ratio
|
||||
return if Random::DEFAULT.rand >= @circuit_breaker_half_open_drip_rate
|
||||
|
||||
@response
|
||||
when :open
|
||||
|
||||
@response
|
||||
end
|
||||
end
|
||||
|
||||
def try_open(response)
|
||||
return unless @state == :closed
|
||||
|
||||
now = Utils.now
|
||||
|
||||
if @attempts.positive?
|
||||
@attempts = 0 if now - @attempted_at > @reset_attempts_in
|
||||
else
|
||||
@attempted_at = now
|
||||
end
|
||||
|
||||
@attempts += 1
|
||||
|
||||
return unless @attempts >= @max_attempts
|
||||
|
||||
@state = :open
|
||||
@opened_at = now
|
||||
@response = response
|
||||
end
|
||||
|
||||
def try_close
|
||||
case @state
|
||||
when :closed
|
||||
nil
|
||||
when :half_open
|
||||
# reset!
|
||||
@attempts = 0
|
||||
@opened_at = @attempted_at = @response = nil
|
||||
@state = :closed
|
||||
|
||||
when :open
|
||||
@state = :half_open if Utils.elapsed_time(@opened_at) > @break_in
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
44
lib/httpx/plugins/circuit_breaker/circuit_store.rb
Normal file
44
lib/httpx/plugins/circuit_breaker/circuit_store.rb
Normal file
@ -0,0 +1,44 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX::Plugins::CircuitBreaker
|
||||
using HTTPX::URIExtensions
|
||||
|
||||
class CircuitStore
|
||||
def initialize(options)
|
||||
@circuits = Hash.new do |h, k|
|
||||
h[k] = Circuit.new(
|
||||
options.circuit_breaker_max_attempts,
|
||||
options.circuit_breaker_reset_attempts_in,
|
||||
options.circuit_breaker_break_in,
|
||||
options.circuit_breaker_half_open_drip_rate
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
def try_open(uri, response)
|
||||
circuit = get_circuit_for_uri(uri)
|
||||
|
||||
circuit.try_open(response)
|
||||
end
|
||||
|
||||
# if circuit is open, it'll respond with the stored response.
|
||||
# if not, nil.
|
||||
def try_respond(request)
|
||||
circuit = get_circuit_for_uri(request.uri)
|
||||
|
||||
circuit.respond
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def get_circuit_for_uri(uri)
|
||||
uri = URI(uri)
|
||||
|
||||
if @circuits.key?(uri.origin)
|
||||
@circuits[uri.origin]
|
||||
else
|
||||
@circuits[uri.to_s]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
x
Reference in New Issue
Block a user