mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-10-09 00:02:50 -04:00
deleting io monitor (not needed)
This commit is contained in:
parent
ec6e39d7b9
commit
24f1b1426f
@ -14,7 +14,7 @@ module HTTPX
|
|||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@resolvers = {}
|
@resolvers = {}
|
||||||
@_resolver_monitors = {}
|
@_resolver_ios = {}
|
||||||
@timers = Timers::Group.new
|
@timers = Timers::Group.new
|
||||||
@selector = Selector.new
|
@selector = Selector.new
|
||||||
@connections = []
|
@connections = []
|
||||||
@ -33,9 +33,8 @@ module HTTPX
|
|||||||
throw(:jump_tick)
|
throw(:jump_tick)
|
||||||
end
|
end
|
||||||
|
|
||||||
@selector.select(timeout) do |monitor|
|
@selector.select(timeout, &:call)
|
||||||
monitor.io.call
|
|
||||||
end
|
|
||||||
@timers.fire
|
@timers.fire
|
||||||
end
|
end
|
||||||
rescue StandardError => e
|
rescue StandardError => e
|
||||||
@ -85,7 +84,7 @@ module HTTPX
|
|||||||
resolver << connection
|
resolver << connection
|
||||||
return if resolver.empty?
|
return if resolver.empty?
|
||||||
|
|
||||||
@_resolver_monitors[resolver] ||= @selector.register(resolver)
|
@_resolver_ios[resolver] ||= @selector.register(resolver)
|
||||||
end
|
end
|
||||||
|
|
||||||
def on_resolver_connection(connection)
|
def on_resolver_connection(connection)
|
||||||
@ -117,8 +116,7 @@ module HTTPX
|
|||||||
@resolvers.delete(resolver_type)
|
@resolvers.delete(resolver_type)
|
||||||
|
|
||||||
@selector.deregister(resolver)
|
@selector.deregister(resolver)
|
||||||
monitor = @_resolver_monitors.delete(resolver)
|
@_resolver_ios.delete(resolver)
|
||||||
monitor.close if monitor
|
|
||||||
resolver.close unless resolver.closed?
|
resolver.close unless resolver.closed?
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -30,56 +30,20 @@ class HTTPX::Selector
|
|||||||
|
|
||||||
using IOExtensions unless IO.method_defined?(:wait) && IO.instance_method(:wait).arity == 2
|
using IOExtensions unless IO.method_defined?(:wait) && IO.instance_method(:wait).arity == 2
|
||||||
|
|
||||||
#
|
|
||||||
# I/O monitor
|
|
||||||
#
|
|
||||||
class Monitor
|
|
||||||
attr_accessor :io
|
|
||||||
|
|
||||||
def initialize(io, reactor)
|
|
||||||
@io = io
|
|
||||||
@reactor = reactor
|
|
||||||
@closed = false
|
|
||||||
end
|
|
||||||
|
|
||||||
# closes +@io+, deregisters from reactor (unless +deregister+ is false)
|
|
||||||
def close(deregister = true)
|
|
||||||
return if @closed
|
|
||||||
|
|
||||||
@closed = true
|
|
||||||
@reactor.deregister(@io) if deregister
|
|
||||||
end
|
|
||||||
|
|
||||||
def closed?
|
|
||||||
@closed
|
|
||||||
end
|
|
||||||
|
|
||||||
# :nocov:
|
|
||||||
def to_s
|
|
||||||
"#<#{self.class}: #{@io}(closed:#{@closed}) #{@io.interests} #{object_id.to_s(16)}>"
|
|
||||||
end
|
|
||||||
# :nocov:
|
|
||||||
end
|
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@selectables = {}
|
@selectables = []
|
||||||
end
|
end
|
||||||
|
|
||||||
# deregisters +io+ from selectables.
|
# deregisters +io+ from selectables.
|
||||||
def deregister(io)
|
def deregister(io)
|
||||||
monitor = @selectables.delete(io)
|
@selectables.delete(io)
|
||||||
monitor.close(false) if monitor
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# register +io+.
|
# register +io+.
|
||||||
def register(io)
|
def register(io)
|
||||||
monitor = @selectables[io]
|
return if @selectables.include?(io)
|
||||||
return if monitor
|
|
||||||
|
|
||||||
monitor = Monitor.new(io, self)
|
@selectables << io
|
||||||
@selectables[io] = monitor
|
|
||||||
|
|
||||||
monitor
|
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
@ -88,57 +52,78 @@ class HTTPX::Selector
|
|||||||
WRITE_INTERESTS = %i[w rw].freeze
|
WRITE_INTERESTS = %i[w rw].freeze
|
||||||
|
|
||||||
def select_many(interval)
|
def select_many(interval)
|
||||||
|
selectables, r, w = nil
|
||||||
|
|
||||||
|
# first, we group IOs based on interest type. On call to #interests however,
|
||||||
|
# things might already happen, and new IOs might be registered, so we might
|
||||||
|
# have to start all over again. We do this until we group all selectables
|
||||||
|
loop do
|
||||||
begin
|
begin
|
||||||
r = nil
|
r = nil
|
||||||
w = nil
|
w = nil
|
||||||
|
|
||||||
@selectables.each_key do |io|
|
selectables = @selectables
|
||||||
|
@selectables = []
|
||||||
|
|
||||||
|
selectables.each do |io|
|
||||||
interests = io.interests
|
interests = io.interests
|
||||||
|
|
||||||
(r ||= []) << io if READ_INTERESTS.include?(interests)
|
(r ||= []) << io if READ_INTERESTS.include?(interests)
|
||||||
(w ||= []) << io if WRITE_INTERESTS.include?(interests)
|
(w ||= []) << io if WRITE_INTERESTS.include?(interests)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if @selectables.empty?
|
||||||
|
@selectables = selectables
|
||||||
|
break
|
||||||
|
else
|
||||||
|
@selectables = [*selectables, @selectables]
|
||||||
|
end
|
||||||
|
rescue StandardError
|
||||||
|
@selectables = selectables if selectables
|
||||||
|
raise
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# TODO: what to do if there are no selectables?
|
||||||
|
|
||||||
|
begin
|
||||||
readers, writers = IO.select(r, w, nil, interval)
|
readers, writers = IO.select(r, w, nil, interval)
|
||||||
|
|
||||||
raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil?
|
raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil?
|
||||||
rescue IOError, SystemCallError
|
rescue IOError, SystemCallError
|
||||||
@selectables.reject! { |io, _| io.closed? }
|
@selectables.reject!(&:closed?)
|
||||||
retry
|
retry
|
||||||
end
|
end
|
||||||
|
|
||||||
readers.each do |io|
|
readers.each do |io|
|
||||||
monitor = @selectables[io]
|
yield io
|
||||||
next unless monitor
|
|
||||||
|
|
||||||
# so that we don't yield 2 times
|
# so that we don't yield 2 times
|
||||||
writers.delete(io)
|
writers.delete(io)
|
||||||
|
|
||||||
yield monitor
|
|
||||||
end if readers
|
end if readers
|
||||||
|
|
||||||
writers.each do |io|
|
writers.each do |io|
|
||||||
monitor = @selectables[io]
|
yield io
|
||||||
next unless monitor
|
|
||||||
|
|
||||||
yield monitor
|
|
||||||
end if writers
|
end if writers
|
||||||
end
|
end
|
||||||
|
|
||||||
def select_one(interval)
|
def select_one(interval)
|
||||||
io, monitor = @selectables.first
|
io = @selectables.first
|
||||||
|
|
||||||
result = case io.interests
|
interests = io.interests
|
||||||
|
|
||||||
|
result = case interests
|
||||||
when :r then io.to_io.wait_readable(interval)
|
when :r then io.to_io.wait_readable(interval)
|
||||||
when :w then io.to_io.wait_writable(interval)
|
when :w then io.to_io.wait_writable(interval)
|
||||||
when :rw then io.to_io.wait(interval, :read_write)
|
when :rw then io.to_io.wait(interval, :read_write)
|
||||||
|
when nil then return
|
||||||
end
|
end
|
||||||
|
|
||||||
raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") unless result
|
raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") unless result
|
||||||
|
|
||||||
yield monitor
|
yield io
|
||||||
rescue IOError, SystemCallError
|
rescue IOError, SystemCallError
|
||||||
@selectables.reject! { |ios, _| ios.closed? }
|
@selectables.reject!(&:closed?)
|
||||||
end
|
end
|
||||||
|
|
||||||
def select(interval, &block)
|
def select(interval, &block)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user