mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-10 00:01:27 -04:00
introduce custom timer to replace Timers::Group
The HTTPX::Timers class mimicks the same top-level API as its predecessors, but simplifies its implementation. Adding a timer will resort all timers, while lookups are roughly the same complexity. The key difference is that callbacks are now aggregated by interval, i.e. different requests setting the same timeout, will reuse the same timer. This is a more simple design than Timers::Group, which stores timers in a binary search tree; the latter will perform well in any environment, whereas the first one is more tailored for the use-case of httpx, where most of the times no timers will be set, and when they do, the same timer will be reused for all requests because they usually have the same set of options (and therefore timeouts).
This commit is contained in:
parent
5146cfae44
commit
27d81f3090
@ -32,5 +32,4 @@ Gem::Specification.new do |gem|
|
||||
gem.require_paths = ["lib"]
|
||||
|
||||
gem.add_runtime_dependency "http-2-next", ">= 0.4.1"
|
||||
gem.add_runtime_dependency "timers"
|
||||
end
|
||||
|
@ -13,6 +13,7 @@ require "httpx/callbacks"
|
||||
require "httpx/loggable"
|
||||
require "httpx/registry"
|
||||
require "httpx/transcoder"
|
||||
require "httpx/timers"
|
||||
require "httpx/pool"
|
||||
require "httpx/headers"
|
||||
require "httpx/request"
|
||||
|
@ -213,7 +213,7 @@ module HTTPX
|
||||
if @parser && !@write_buffer.full?
|
||||
request.headers["alt-used"] = @origin.authority if match_altsvcs?(request.uri)
|
||||
|
||||
if @response_received_at &&
|
||||
if @response_received_at && @keep_alive_timeout &&
|
||||
(Process.clock_gettime(Process::CLOCK_MONOTONIC) - @response_received_at) > @keep_alive_timeout
|
||||
# when pushing a request into an existing connection, we have to check whether there
|
||||
# is the possibility that the connection might have extended the keep alive timeout.
|
||||
@ -223,8 +223,7 @@ module HTTPX
|
||||
return
|
||||
end
|
||||
|
||||
@inflight += 1
|
||||
parser.send(request)
|
||||
send_request_to_parser(request)
|
||||
else
|
||||
@pending << request
|
||||
end
|
||||
@ -389,8 +388,7 @@ module HTTPX
|
||||
|
||||
def send_pending
|
||||
while !@write_buffer.full? && (request = @pending.shift)
|
||||
@inflight += 1
|
||||
parser.send(request)
|
||||
send_request_to_parser(request)
|
||||
end
|
||||
end
|
||||
|
||||
@ -398,6 +396,11 @@ module HTTPX
|
||||
@parser ||= build_parser
|
||||
end
|
||||
|
||||
def send_request_to_parser(request)
|
||||
@inflight += 1
|
||||
parser.send(request)
|
||||
end
|
||||
|
||||
def build_parser(protocol = @io.protocol)
|
||||
parser = registry(protocol).new(@write_buffer, @options)
|
||||
set_parser_callbacks(parser)
|
||||
|
@ -69,9 +69,14 @@ module HTTPX
|
||||
end
|
||||
|
||||
module ConnectionMethods
|
||||
def send(request)
|
||||
def send_request_to_parser(request)
|
||||
super
|
||||
|
||||
return unless request.headers["expect"] == "100-continue"
|
||||
|
||||
request.once(:expect) do
|
||||
@timers.after(@options.expect_timeout) do
|
||||
@timers.after(request.options.expect_timeout) do
|
||||
# expect timeout expired
|
||||
if request.state == :expect && !request.expects?
|
||||
Expect.no_expect_store << request.origin
|
||||
request.headers.delete("expect")
|
||||
@ -79,7 +84,6 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
end
|
||||
super
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -69,9 +69,9 @@ module HTTPX
|
||||
end
|
||||
|
||||
def transition(nextstate)
|
||||
state = @state
|
||||
prev_state = @state
|
||||
super
|
||||
meter_elapsed_time("Request##{object_id}[#{@verb} #{@uri}: #{state}] -> #{nextstate}") if nextstate == @state
|
||||
meter_elapsed_time("Request##{object_id}[#{@verb} #{@uri}: #{prev_state}] -> #{@state}") if prev_state != @state
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "forwardable"
|
||||
require "timers"
|
||||
require "httpx/selector"
|
||||
require "httpx/connection"
|
||||
require "httpx/resolver"
|
||||
@ -16,7 +15,7 @@ module HTTPX
|
||||
def initialize
|
||||
@resolvers = {}
|
||||
@_resolver_ios = {}
|
||||
@timers = Timers::Group.new
|
||||
@timers = Timers.new
|
||||
@selector = Selector.new
|
||||
@connections = []
|
||||
@connected_connections = 0
|
||||
@ -28,15 +27,18 @@ module HTTPX
|
||||
|
||||
def next_tick
|
||||
catch(:jump_tick) do
|
||||
timeout = [next_timeout, @timers.wait_interval].compact.min
|
||||
timeout = [@timers.wait_interval, next_timeout].compact.min
|
||||
if timeout && timeout.negative?
|
||||
@timers.fire
|
||||
throw(:jump_tick)
|
||||
end
|
||||
|
||||
@selector.select(timeout, &:call)
|
||||
|
||||
@timers.fire
|
||||
begin
|
||||
@selector.select(timeout, &:call)
|
||||
@timers.fire
|
||||
rescue TimeoutError => e
|
||||
@timers.fire(e)
|
||||
end
|
||||
end
|
||||
rescue StandardError => e
|
||||
@connections.each do |connection|
|
||||
|
84
lib/httpx/timers.rb
Normal file
84
lib/httpx/timers.rb
Normal file
@ -0,0 +1,84 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX
|
||||
class Timers
|
||||
def initialize
|
||||
@intervals = []
|
||||
end
|
||||
|
||||
def after(interval_in_secs, &blk)
|
||||
return unless interval_in_secs
|
||||
|
||||
# I'm assuming here that most requests will have the same
|
||||
# request timeout, as in most cases they share common set of
|
||||
# options. A user setting different request timeouts for 100s of
|
||||
# requests will already have a hard time dealing with that.
|
||||
unless (interval = @intervals.find { |t| t == interval_in_secs })
|
||||
interval = Interval.new(interval_in_secs)
|
||||
@intervals << interval
|
||||
@intervals.sort!
|
||||
end
|
||||
|
||||
interval << blk
|
||||
end
|
||||
|
||||
def wait_interval
|
||||
return if @intervals.empty?
|
||||
|
||||
@next_interval_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
|
||||
@intervals.first.interval
|
||||
end
|
||||
|
||||
def fire(error = nil)
|
||||
raise error if error && error.timeout != @intervals.first
|
||||
return if @intervals.empty? || !@next_interval_at
|
||||
|
||||
elapsed_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) - @next_interval_at
|
||||
|
||||
@intervals.delete_if { |interval| interval.elapse(elapsed_time) <= 0 }
|
||||
end
|
||||
|
||||
def cancel
|
||||
@intervals.clear
|
||||
end
|
||||
|
||||
class Interval
|
||||
include Comparable
|
||||
|
||||
attr_reader :interval
|
||||
|
||||
def initialize(interval)
|
||||
@interval = interval
|
||||
@callbacks = []
|
||||
end
|
||||
|
||||
def <=>(other)
|
||||
@interval <=> other.interval
|
||||
end
|
||||
|
||||
def ==(other)
|
||||
return @interval == other if other.is_a?(Numeric)
|
||||
|
||||
@interval == other.to_f # rubocop:disable Lint/FloatComparison
|
||||
end
|
||||
|
||||
def to_f
|
||||
@interval
|
||||
end
|
||||
|
||||
def <<(callback)
|
||||
@callbacks << callback
|
||||
end
|
||||
|
||||
def elapse(elapsed)
|
||||
@interval -= elapsed
|
||||
|
||||
@callbacks.each(&:call) if @interval <= 0
|
||||
|
||||
@interval
|
||||
end
|
||||
end
|
||||
private_constant :Interval
|
||||
end
|
||||
end
|
@ -25,7 +25,15 @@ module HTTPX
|
||||
attr_reader state: Symbol
|
||||
attr_reader pending: Array[Request]
|
||||
attr_reader options: Options
|
||||
attr_writer timers: untyped # Timers::Timer
|
||||
attr_writer timers: Timers
|
||||
|
||||
@origins: Array[URI::Generic]
|
||||
@window_size: Integer
|
||||
@read_buffer: Buffer
|
||||
@write_buffer: Buffer
|
||||
@inflight: Integer
|
||||
@keep_alive_timeout: Numeric?
|
||||
@total_timeout: Numeric?
|
||||
|
||||
def addresses: () -> Array[ipaddr]?
|
||||
|
||||
@ -76,6 +84,8 @@ module HTTPX
|
||||
|
||||
def parser: () -> _Parser
|
||||
|
||||
def send_request_to_parser: (Request request) -> void
|
||||
|
||||
def build_parser: () -> _Parser
|
||||
| (String) -> _Parser
|
||||
|
||||
@ -89,9 +99,6 @@ module HTTPX
|
||||
|
||||
def handle_error: (StandardError) -> void
|
||||
|
||||
def total_timeout: () -> untyped?
|
||||
# def total_timeout: () -> Timers::Timer?
|
||||
#
|
||||
def purge_after_closed: () -> void
|
||||
end
|
||||
end
|
31
sig/timers.rbs
Normal file
31
sig/timers.rbs
Normal file
@ -0,0 +1,31 @@
|
||||
module HTTPX
|
||||
class Timers
|
||||
@interval: Array[Interval]
|
||||
|
||||
def after: (Numeric interval_in_secs) { () -> void } -> void
|
||||
|
||||
def wait_interval: () -> Numeric?
|
||||
|
||||
def fire: (?StandardError error) -> void
|
||||
|
||||
def cancel: () -> void
|
||||
private
|
||||
|
||||
def initialize: () -> void
|
||||
|
||||
class Interval
|
||||
include Comparable
|
||||
|
||||
attr_reader interval: Numeric
|
||||
|
||||
def to_f: () -> Float
|
||||
|
||||
def <<: (^() -> void) -> void
|
||||
|
||||
def elapse: (Numeric elapsed) -> Numeric
|
||||
private
|
||||
|
||||
def initialize: (Numeric interval) -> void
|
||||
end
|
||||
end
|
||||
end
|
Loading…
x
Reference in New Issue
Block a user