mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-11-29 00:01:06 -05:00
circuit-breaker plugin. fix half-open decision to emit real request
the previous logic was relying on a random order which didn't work in practice; instead, one now reuses the max-attempts to define how many requests happen in the half-open state, and the drip rate defines how may of them will be real
This commit is contained in:
parent
dd84195db6
commit
e4869e1a4b
@ -44,7 +44,11 @@
|
||||
* `:datadog` adapter only supports `ddtrace` gem 1.x or higher.
|
||||
* `:faraday` adapter only supports `faraday` gem 1.x or higher.
|
||||
|
||||
### chore
|
||||
## Improvements
|
||||
|
||||
* `circuit_breaker`: the drip rate of real request during the "half-open" stage of a circuit will reliably distribute real requests (as per the drip rate) over the `max_attempts`, before the circuit is closed.
|
||||
|
||||
## Chore
|
||||
|
||||
* `:grpc` plugin: connection won't buffer requests before HTTP/2 handshake is commpleted, i.e. works the same as plain `httpx` HTTP/2 connection establishment.
|
||||
* if you are relying on this, you can keep the old behavior this way: `HTTPX.plugin(:grpc, http2_settings: { wait_for_handshake: false })`.
|
||||
|
||||
@ -16,8 +16,12 @@ module HTTPX
|
||||
end
|
||||
|
||||
def self.extra_options(options)
|
||||
options.merge(circuit_breaker_max_attempts: 3, circuit_breaker_reset_attempts_in: 60, circuit_breaker_break_in: 60,
|
||||
circuit_breaker_half_open_drip_rate: 1)
|
||||
options.merge(
|
||||
circuit_breaker_max_attempts: 3,
|
||||
circuit_breaker_reset_attempts_in: 60,
|
||||
circuit_breaker_break_in: 60,
|
||||
circuit_breaker_half_open_drip_rate: 1
|
||||
)
|
||||
end
|
||||
|
||||
module InstanceMethods
|
||||
@ -84,6 +88,8 @@ module HTTPX
|
||||
end
|
||||
elsif (break_on = request.options.circuit_breaker_break_on) && break_on.call(response)
|
||||
@circuit_store.try_open(request.uri, response)
|
||||
else
|
||||
@circuit_store.try_close(request.uri)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -15,8 +15,11 @@ module HTTPX
|
||||
@max_attempts = max_attempts
|
||||
@reset_attempts_in = reset_attempts_in
|
||||
@break_in = break_in
|
||||
@circuit_breaker_half_open_drip_rate = 1 - circuit_breaker_half_open_drip_rate
|
||||
@circuit_breaker_half_open_drip_rate = circuit_breaker_half_open_drip_rate
|
||||
@attempts = 0
|
||||
|
||||
total_real_attempts = @max_attempts * @circuit_breaker_half_open_drip_rate
|
||||
@drip_factor = (@max_attempts / total_real_attempts).round
|
||||
@state = :closed
|
||||
end
|
||||
|
||||
@ -27,8 +30,13 @@ module HTTPX
|
||||
when :closed
|
||||
nil
|
||||
when :half_open
|
||||
# return nothing or smth based on ratio
|
||||
return if Random.rand >= @circuit_breaker_half_open_drip_rate
|
||||
@attempts += 1
|
||||
|
||||
# do real requests while drip rate valid
|
||||
if (@real_attempts % @drip_factor).zero?
|
||||
@real_attempts += 1
|
||||
return
|
||||
end
|
||||
|
||||
@response
|
||||
when :open
|
||||
@ -38,23 +46,31 @@ module HTTPX
|
||||
end
|
||||
|
||||
def try_open(response)
|
||||
return unless @state == :closed
|
||||
case @state
|
||||
when :closed
|
||||
now = Utils.now
|
||||
|
||||
now = Utils.now
|
||||
if @attempts.positive?
|
||||
# reset if error happened long ago
|
||||
@attempts = 0 if now - @attempted_at > @reset_attempts_in
|
||||
else
|
||||
@attempted_at = now
|
||||
end
|
||||
|
||||
if @attempts.positive?
|
||||
@attempts = 0 if now - @attempted_at > @reset_attempts_in
|
||||
else
|
||||
@attempted_at = now
|
||||
@attempts += 1
|
||||
|
||||
return unless @attempts >= @max_attempts
|
||||
|
||||
@state = :open
|
||||
@opened_at = now
|
||||
@response = response
|
||||
when :half_open
|
||||
# open immediately
|
||||
|
||||
@state = :open
|
||||
@attempted_at = @opened_at = Utils.now
|
||||
@response = response
|
||||
end
|
||||
|
||||
@attempts += 1
|
||||
|
||||
return unless @attempts >= @max_attempts
|
||||
|
||||
@state = :open
|
||||
@opened_at = now
|
||||
@response = response
|
||||
end
|
||||
|
||||
def try_close
|
||||
@ -62,13 +78,21 @@ module HTTPX
|
||||
when :closed
|
||||
nil
|
||||
when :half_open
|
||||
|
||||
# do not close circuit unless attempts exhausted
|
||||
return unless @attempts >= @max_attempts
|
||||
|
||||
# reset!
|
||||
@attempts = 0
|
||||
@opened_at = @attempted_at = @response = nil
|
||||
@state = :closed
|
||||
|
||||
when :open
|
||||
@state = :half_open if Utils.elapsed_time(@opened_at) > @break_in
|
||||
if Utils.elapsed_time(@opened_at) > @break_in
|
||||
@state = :half_open
|
||||
@attempts = 0
|
||||
@real_attempts = 0
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -21,6 +21,14 @@ module HTTPX::Plugins::CircuitBreaker
|
||||
circuit.try_open(response)
|
||||
end
|
||||
|
||||
def try_close(uri)
|
||||
return unless @circuits.key?(uri.origin) || @circuits.key?(uri.to_s)
|
||||
|
||||
circuit = get_circuit_for_uri(uri)
|
||||
|
||||
circuit.try_close
|
||||
end
|
||||
|
||||
# if circuit is open, it'll respond with the stored response.
|
||||
# if not, nil.
|
||||
def try_respond(request)
|
||||
|
||||
@ -9,6 +9,8 @@ module HTTPX
|
||||
|
||||
def try_respond: (Request request) -> response?
|
||||
|
||||
def try_close: (generic_uri uri) -> void
|
||||
|
||||
private
|
||||
|
||||
def get_circuit_for_uri: (generic_uri uri) -> Circuit
|
||||
@ -23,6 +25,8 @@ module HTTPX
|
||||
@break_in: Float
|
||||
@circuit_breaker_half_open_drip_rate: Float
|
||||
@attempts: Integer
|
||||
@real_attempts: Integer
|
||||
@drip_factor: Integer
|
||||
|
||||
@response: response?
|
||||
@opened_at: Float?
|
||||
|
||||
@ -94,22 +94,64 @@ module Requests
|
||||
assert circuit_opened
|
||||
end
|
||||
|
||||
# def test_plugin_circuit_breaker_half_open_drip_rate
|
||||
# unknown_uri = "http://www.qwwqjqwdjqiwdj.com"
|
||||
def test_plugin_circuit_breaker_half_open_drip_rate
|
||||
delay_url = URI(build_uri("/delay/2"))
|
||||
|
||||
# session = HTTPX.plugin(:circuit_breaker, circuit_breaker_max_attempts: 1, circuit_breaker_half_open_drip_rate: 0.5)
|
||||
session = HTTPX.plugin(:circuit_breaker, circuit_breaker_max_attempts: 2, circuit_breaker_half_open_drip_rate: 0.5,
|
||||
circuit_breaker_break_in: 1)
|
||||
|
||||
# response1 = session.get(unknown_uri)
|
||||
# verify_status(response1, 404)
|
||||
# verify_error_response(response1)
|
||||
store = session.instance_variable_get(:@circuit_store)
|
||||
circuit = store.instance_variable_get(:@circuits)[delay_url.origin]
|
||||
|
||||
# # circuit open
|
||||
response1 = session.get(delay_url, timeout: { request_timeout: 0.5 })
|
||||
response2 = session.get(delay_url, timeout: { request_timeout: 0.5 })
|
||||
verify_error_response(response1, HTTPX::RequestTimeoutError)
|
||||
verify_error_response(response2, HTTPX::RequestTimeoutError)
|
||||
|
||||
# responses = session.get(*([unknown_uri] * 10))
|
||||
# circuit open
|
||||
assert circuit.instance_variable_get(:@attempts) == 2
|
||||
assert circuit.instance_variable_get(:@state) == :open
|
||||
|
||||
# assert responses.size == 10
|
||||
# assert responses.select { |res| res == response1 }.size == 5
|
||||
# end
|
||||
sleep 1.5
|
||||
|
||||
# circuit half-open
|
||||
response3 = session.get(delay_url)
|
||||
verify_status(response3, 200)
|
||||
|
||||
assert circuit.instance_variable_get(:@attempts) == 1
|
||||
assert circuit.instance_variable_get(:@state) == :half_open
|
||||
|
||||
response4 = session.get(delay_url)
|
||||
verify_error_response(response4, HTTPX::RequestTimeoutError)
|
||||
|
||||
assert circuit.instance_variable_get(:@attempts) == 2
|
||||
assert circuit.instance_variable_get(:@state) == :half_open
|
||||
|
||||
# circuit closed again
|
||||
response5 = session.get(delay_url)
|
||||
verify_status(response5, 200)
|
||||
|
||||
assert circuit.instance_variable_get(:@state) == :closed
|
||||
|
||||
response1 = session.get(delay_url, timeout: { request_timeout: 0.5 })
|
||||
response2 = session.get(delay_url, timeout: { request_timeout: 0.5 })
|
||||
verify_error_response(response1, HTTPX::RequestTimeoutError)
|
||||
verify_error_response(response2, HTTPX::RequestTimeoutError)
|
||||
|
||||
# circuit open
|
||||
assert circuit.instance_variable_get(:@attempts) == 2
|
||||
assert circuit.instance_variable_get(:@state) == :open
|
||||
|
||||
sleep 1.5
|
||||
|
||||
# circuit half-open
|
||||
response3 = session.get(delay_url, timeout: { request_timeout: 0.5 })
|
||||
verify_error_response(response3, HTTPX::RequestTimeoutError)
|
||||
|
||||
# attempts reset, haf-open -> open transition
|
||||
assert circuit.instance_variable_get(:@attempts) == 1
|
||||
assert circuit.instance_variable_get(:@state) == :open
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user