mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-12-07 00:00:31 -05:00
Merge branch 'issue-236' into 'master'
`:response_cache` andd `:circuit_breaker` improvements Closes #236 See merge request os85/httpx!255
This commit is contained in:
commit
7c1d7083ab
@ -31,6 +31,16 @@ module HTTPX
|
||||
@circuit_store = orig.instance_variable_get(:@circuit_store).dup
|
||||
end
|
||||
|
||||
%i[circuit_open].each do |meth|
|
||||
class_eval(<<-MOD, __FILE__, __LINE__ + 1)
|
||||
def on_#{meth}(&blk) # def on_circuit_open(&blk)
|
||||
on(:#{meth}, &blk) # on(:circuit_open, &blk)
|
||||
end # end
|
||||
MOD
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def send_requests(*requests)
|
||||
# @type var short_circuit_responses: Array[response]
|
||||
short_circuit_responses = []
|
||||
@ -59,6 +69,12 @@ module HTTPX
|
||||
end
|
||||
|
||||
def on_response(request, response)
|
||||
emit(:circuit_open, request) if try_circuit_open(request, response)
|
||||
|
||||
super
|
||||
end
|
||||
|
||||
def try_circuit_open(request, response)
|
||||
if response.is_a?(ErrorResponse)
|
||||
case response.error
|
||||
when RequestTimeoutError
|
||||
@ -69,8 +85,6 @@ module HTTPX
|
||||
elsif (break_on = request.options.circuit_breaker_break_on) && break_on.call(response)
|
||||
@circuit_store.try_open(request.uri, response)
|
||||
end
|
||||
|
||||
super
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@ -102,9 +102,9 @@ module HTTPX
|
||||
|
||||
module ResponseMethods
|
||||
def copy_from_cached(other)
|
||||
@body = other.body
|
||||
@body = other.body.dup
|
||||
|
||||
@body.__send__(:rewind)
|
||||
@body.rewind
|
||||
end
|
||||
|
||||
# A response is fresh if its age has not yet exceeded its freshness lifetime.
|
||||
@ -169,7 +169,7 @@ module HTTPX
|
||||
def date
|
||||
@date ||= Time.httpdate(@headers["date"])
|
||||
rescue NoMethodError, ArgumentError
|
||||
Time.now.httpdate
|
||||
Time.now
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -1,28 +1,25 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "forwardable"
|
||||
require "mutex_m"
|
||||
|
||||
module HTTPX::Plugins
|
||||
module ResponseCache
|
||||
class Store
|
||||
extend Forwardable
|
||||
|
||||
def_delegator :@store, :clear
|
||||
|
||||
def initialize
|
||||
@store = {}
|
||||
@store.extend(Mutex_m)
|
||||
end
|
||||
|
||||
def clear
|
||||
@store.synchronize { @store.clear }
|
||||
end
|
||||
|
||||
def lookup(request)
|
||||
responses = @store[request.response_cache_key]
|
||||
responses = _get(request)
|
||||
|
||||
return unless responses
|
||||
|
||||
response = responses.find(&method(:match_by_vary?).curry(2)[request])
|
||||
|
||||
return unless response && response.fresh?
|
||||
|
||||
response
|
||||
responses.find(&method(:match_by_vary?).curry(2)[request])
|
||||
end
|
||||
|
||||
def cached?(request)
|
||||
@ -32,11 +29,7 @@ module HTTPX::Plugins
|
||||
def cache(request, response)
|
||||
return unless ResponseCache.cacheable_request?(request) && ResponseCache.cacheable_response?(response)
|
||||
|
||||
responses = (@store[request.response_cache_key] ||= [])
|
||||
|
||||
responses.reject!(&method(:match_by_vary?).curry(2)[request])
|
||||
|
||||
responses << response
|
||||
_set(request, response)
|
||||
end
|
||||
|
||||
def prepare(request)
|
||||
@ -71,6 +64,32 @@ module HTTPX::Plugins
|
||||
!original_request.headers.key?(cache_field) || request.headers[cache_field] == original_request.headers[cache_field]
|
||||
end
|
||||
end
|
||||
|
||||
def _get(request)
|
||||
@store.synchronize do
|
||||
responses = @store[request.response_cache_key]
|
||||
|
||||
return unless responses
|
||||
|
||||
responses.select! do |res|
|
||||
!res.body.closed? && res.fresh?
|
||||
end
|
||||
|
||||
responses
|
||||
end
|
||||
end
|
||||
|
||||
def _set(request, response)
|
||||
@store.synchronize do
|
||||
responses = (@store[request.response_cache_key] ||= [])
|
||||
|
||||
responses.reject! do |res|
|
||||
res.body.closed? || !res.fresh? || match_by_vary?(request, res)
|
||||
end
|
||||
|
||||
responses << response
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@ -140,6 +140,12 @@ module HTTPX
|
||||
@state = :idle
|
||||
end
|
||||
|
||||
def initialize_dup(other)
|
||||
super
|
||||
|
||||
@buffer = other.instance_variable_get(:@buffer).dup
|
||||
end
|
||||
|
||||
def closed?
|
||||
@state == :closed
|
||||
end
|
||||
|
||||
@ -5,7 +5,7 @@ module HTTPX
|
||||
class CircuitStore
|
||||
@circuits: Hash[String, Circuit]
|
||||
|
||||
def try_open: (generic_uri uri, response response) -> void
|
||||
def try_open: (generic_uri uri, response response) -> response?
|
||||
|
||||
def try_respond: (Request request) -> response?
|
||||
|
||||
@ -30,7 +30,7 @@ module HTTPX
|
||||
|
||||
def respond: () -> response?
|
||||
|
||||
def try_open: (response) -> void
|
||||
def try_open: (response) -> response?
|
||||
|
||||
def try_close: () -> void
|
||||
|
||||
@ -52,6 +52,10 @@ module HTTPX
|
||||
|
||||
module InstanceMethods
|
||||
@circuit_store: CircuitStore
|
||||
|
||||
private
|
||||
|
||||
def try_circuit_open: (Request request, response response) -> response?
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
@ -8,7 +8,7 @@ module HTTPX
|
||||
def self?.cached_response?: (response response) -> bool
|
||||
|
||||
class Store
|
||||
@store: Hash[String, Array[Response]]
|
||||
@store: Hash[String, Array[Response]] & Mutex_m
|
||||
|
||||
def lookup: (Request request) -> Response?
|
||||
|
||||
@ -21,6 +21,10 @@ module HTTPX
|
||||
private
|
||||
|
||||
def match_by_vary?: (Request request, Response response) -> bool
|
||||
|
||||
def _get: (Request request) -> Array[Response]?
|
||||
|
||||
def _set: (Request request, Response response) -> void
|
||||
end
|
||||
|
||||
module InstanceMethods
|
||||
|
||||
@ -55,8 +55,18 @@ class ResponseCacheStoreTest < Minitest::Test
|
||||
assert store.lookup(request).nil?
|
||||
|
||||
request2 = request_class.new("GET", "http://example2.com/")
|
||||
_response2 = cached_response(request2, extra_headers: { "cache-control" => "no-cache", "expires" => (Time.now + 2).httpdate })
|
||||
cached_response(request2, extra_headers: { "cache-control" => "no-cache", "expires" => (Time.now + 2).httpdate })
|
||||
assert store.lookup(request2).nil?
|
||||
|
||||
request_invalid_expires = request_class.new("GET", "http://example3.com/")
|
||||
invalid_expires_response = cached_response(request_invalid_expires, extra_headers: { "expires" => "smthsmth" })
|
||||
assert store.lookup(request_invalid_expires) == invalid_expires_response
|
||||
end
|
||||
|
||||
def test_store_invalid_date
|
||||
request_invalid_age = request_class.new("GET", "http://example4.com/")
|
||||
response_invalid_age = cached_response(request_invalid_age, extra_headers: { "cache-control" => "max-age=2", "date" => "smthsmth" })
|
||||
assert store.lookup(request_invalid_age) == response_invalid_age
|
||||
end
|
||||
|
||||
def test_prepare_vary
|
||||
|
||||
@ -66,7 +66,7 @@ if [[ ${RUBY_VERSION:0:1} = "3" ]] && [[ ! $RUBYOPT =~ "jit" ]]; then
|
||||
export RUBYOPT="$RUBYOPT -rbundler/setup -rrbs/test/setup"
|
||||
export RBS_TEST_RAISE=true
|
||||
export RBS_TEST_LOGLEVEL=error
|
||||
export RBS_TEST_OPT="-Isig -rset -rforwardable -ruri -rjson -ripaddr -rpathname -rtime -rtimeout -rresolv -rsocket -ropenssl -rbase64 -rzlib -rcgi -rhttp-2-next"
|
||||
export RBS_TEST_OPT="-Isig -rset -rmutex_m -rforwardable -ruri -rjson -ripaddr -rpathname -rtime -rtimeout -rresolv -rsocket -ropenssl -rbase64 -rzlib -rcgi -rhttp-2-next"
|
||||
export RBS_TEST_TARGET="HTTP*"
|
||||
fi
|
||||
|
||||
|
||||
@ -70,6 +70,30 @@ module Requests
|
||||
assert response1 == response2
|
||||
end
|
||||
|
||||
def test_plugin_circuit_breaker_on_circuit_open
|
||||
return unless origin.start_with?("http://")
|
||||
|
||||
unknown_uri = "http://www.qwwqjqwdjqiwdj.com"
|
||||
|
||||
circuit_opened = false
|
||||
session = HTTPX.plugin(:circuit_breaker,
|
||||
circuit_breaker_max_attempts: 1,
|
||||
circuit_breaker_break_in: 2,
|
||||
circuit_breaker_half_open_drip_rate: 1.0)
|
||||
.on_circuit_open { circuit_opened = true }
|
||||
|
||||
# circuit closed
|
||||
response1 = session.get(unknown_uri)
|
||||
verify_error_response(response1)
|
||||
|
||||
# circuit open
|
||||
response2 = session.get(unknown_uri)
|
||||
verify_error_response(response2)
|
||||
assert response2 == response1
|
||||
|
||||
assert circuit_opened
|
||||
end
|
||||
|
||||
# def test_plugin_circuit_breaker_half_open_drip_rate
|
||||
# unknown_uri = "http://www.qwwqjqwdjqiwdj.com"
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user