diff --git a/lib/httpx/plugins/circuit_breaker.rb b/lib/httpx/plugins/circuit_breaker.rb index 61250a48..f426826a 100644 --- a/lib/httpx/plugins/circuit_breaker.rb +++ b/lib/httpx/plugins/circuit_breaker.rb @@ -10,110 +10,9 @@ module HTTPX module CircuitBreaker using URIExtensions - class CircuitStore - def initialize(options) - @circuits = Hash.new do |h, k| - h[k] = Circuit.new( - options.circuit_breaker_max_attempts, - options.circuit_breaker_reset_attempts_in, - options.circuit_breaker_break_in, - options.circuit_breaker_half_open_drip_rate - ) - end - end - - def try_open(uri, response) - circuit = get_circuit_for_uri(uri) - - circuit.try_open(response) - end - - def try_close(uri) - circuit = get_circuit_for_uri(uri) - - circuit.try_close - end - - def try_respond(request) - circuit = get_circuit_for_uri(request.uri) - - circuit.respond - end - - private - - def get_circuit_for_uri(uri) - uri = URI(uri) - - if @circuits.key?(uri.origin) - @circuits[uri.origin] - else - @circuits[uri.to_s] - end - end - end - - class Circuit - def initialize(max_attempts, reset_attempts_in, break_in, circuit_breaker_half_open_drip_rate) - @max_attempts = max_attempts - @reset_attempts_in = reset_attempts_in - @break_in = break_in - @circuit_breaker_half_open_drip_rate = 1 - circuit_breaker_half_open_drip_rate - @attempts = 0 - @state = :closed - end - - def respond - try_close - - case @state - when :closed - nil - when :half_open - # return nothing or smth based on ratio - return if Random::DEFAULT.rand >= @circuit_breaker_half_open_drip_rate - - @response - when :open - - @response - end - end - - def try_open(response) - return unless @state == :closed - - now = Utils.now - - if @attempts.positive? - @attempts = 0 if now - @attempted_at > @reset_attempts_in - else - @attempted_at = now - end - - @attempts += 1 - - return unless @attempts >= @max_attempts - - @state = :open - @opened_at = now - @response = response - end - - def try_close - case @state - when :closed - nil - when :half_open - # reset! - @attempts = 0 - @opened_at = @attempted_at = @response = nil - @state = :closed - - when :open - @state = :half_open if Utils.elapsed_time(@opened_at) > @break_in - end - end + def self.load_dependencies(*) + require_relative "circuit_breaker/circuit" + require_relative "circuit_breaker/circuit_store" end def self.extra_options(options) @@ -133,14 +32,21 @@ module HTTPX end def send_requests(*requests) + # @type var short_circuit_responses: Array[response] short_circuit_responses = [] + # run all requests through the circuit breaker, see if the circuit is + # open for any of them. real_requests = requests.each_with_object([]) do |req, real_reqs| short_circuit_response = @circuit_store.try_respond(req) - real_reqs << req if short_circuit_response.nil? + if short_circuit_response.nil? + real_reqs << req + next + end short_circuit_responses[requests.index(req)] = short_circuit_response end + # run requests for the remainder unless real_requests.empty? responses = super(*real_requests) @@ -154,7 +60,12 @@ module HTTPX def on_response(request, response) if response.is_a?(ErrorResponse) - @circuit_store.try_open(request.origin, response) + case response.error + when RequestTimeoutError + @circuit_store.try_open(request.uri, response) + else + @circuit_store.try_open(request.origin, response) + end elsif (break_on = request.options.circuit_breaker_break_on) && break_on.call(response) @circuit_store.try_open(request.uri, response) end diff --git a/lib/httpx/plugins/circuit_breaker/circuit.rb b/lib/httpx/plugins/circuit_breaker/circuit.rb new file mode 100644 index 00000000..6e733d3b --- /dev/null +++ b/lib/httpx/plugins/circuit_breaker/circuit.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +module HTTPX + module Plugins::CircuitBreaker + # + # A circuit is assigned to a given absoolute url or origin. + # + # It sets +max_attempts+, the number of attempts the circuit allows, before it is opened. + # It sets +reset_attempts_in+, the time a circuit stays open at most, before it resets. + # It sets +break_in+, the time that must elapse before an open circuit can transit to the half-open state. + # It sets +circuit_breaker_half_open_drip_rate+, the rate of requests a circuit allows to be performed when in an half-open state. + # + class Circuit + def initialize(max_attempts, reset_attempts_in, break_in, circuit_breaker_half_open_drip_rate) + @max_attempts = max_attempts + @reset_attempts_in = reset_attempts_in + @break_in = break_in + @circuit_breaker_half_open_drip_rate = 1 - circuit_breaker_half_open_drip_rate + @attempts = 0 + @state = :closed + end + + def respond + try_close + + case @state + when :closed + nil + when :half_open + # return nothing or smth based on ratio + return if Random::DEFAULT.rand >= @circuit_breaker_half_open_drip_rate + + @response + when :open + + @response + end + end + + def try_open(response) + return unless @state == :closed + + now = Utils.now + + if @attempts.positive? + @attempts = 0 if now - @attempted_at > @reset_attempts_in + else + @attempted_at = now + end + + @attempts += 1 + + return unless @attempts >= @max_attempts + + @state = :open + @opened_at = now + @response = response + end + + def try_close + case @state + when :closed + nil + when :half_open + # reset! + @attempts = 0 + @opened_at = @attempted_at = @response = nil + @state = :closed + + when :open + @state = :half_open if Utils.elapsed_time(@opened_at) > @break_in + end + end + end + end +end diff --git a/lib/httpx/plugins/circuit_breaker/circuit_store.rb b/lib/httpx/plugins/circuit_breaker/circuit_store.rb new file mode 100644 index 00000000..007cea2e --- /dev/null +++ b/lib/httpx/plugins/circuit_breaker/circuit_store.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module HTTPX::Plugins::CircuitBreaker + using HTTPX::URIExtensions + + class CircuitStore + def initialize(options) + @circuits = Hash.new do |h, k| + h[k] = Circuit.new( + options.circuit_breaker_max_attempts, + options.circuit_breaker_reset_attempts_in, + options.circuit_breaker_break_in, + options.circuit_breaker_half_open_drip_rate + ) + end + end + + def try_open(uri, response) + circuit = get_circuit_for_uri(uri) + + circuit.try_open(response) + end + + # if circuit is open, it'll respond with the stored response. + # if not, nil. + def try_respond(request) + circuit = get_circuit_for_uri(request.uri) + + circuit.respond + end + + private + + def get_circuit_for_uri(uri) + uri = URI(uri) + + if @circuits.key?(uri.origin) + @circuits[uri.origin] + else + @circuits[uri.to_s] + end + end + end +end