diff --git a/lib/httpx/pool.rb b/lib/httpx/pool.rb index f4156ff5..fcecfa30 100644 --- a/lib/httpx/pool.rb +++ b/lib/httpx/pool.rb @@ -14,7 +14,7 @@ module HTTPX def initialize @resolvers = {} - @_resolver_monitors = {} + @_resolver_ios = {} @timers = Timers::Group.new @selector = Selector.new @connections = [] @@ -33,9 +33,8 @@ module HTTPX throw(:jump_tick) end - @selector.select(timeout) do |monitor| - monitor.io.call - end + @selector.select(timeout, &:call) + @timers.fire end rescue StandardError => e @@ -85,7 +84,7 @@ module HTTPX resolver << connection return if resolver.empty? - @_resolver_monitors[resolver] ||= @selector.register(resolver) + @_resolver_ios[resolver] ||= @selector.register(resolver) end def on_resolver_connection(connection) @@ -117,8 +116,7 @@ module HTTPX @resolvers.delete(resolver_type) @selector.deregister(resolver) - monitor = @_resolver_monitors.delete(resolver) - monitor.close if monitor + @_resolver_ios.delete(resolver) resolver.close unless resolver.closed? end diff --git a/lib/httpx/selector.rb b/lib/httpx/selector.rb index 91535689..c59092b7 100644 --- a/lib/httpx/selector.rb +++ b/lib/httpx/selector.rb @@ -30,56 +30,20 @@ class HTTPX::Selector using IOExtensions unless IO.method_defined?(:wait) && IO.instance_method(:wait).arity == 2 - # - # I/O monitor - # - class Monitor - attr_accessor :io - - def initialize(io, reactor) - @io = io - @reactor = reactor - @closed = false - end - - # closes +@io+, deregisters from reactor (unless +deregister+ is false) - def close(deregister = true) - return if @closed - - @closed = true - @reactor.deregister(@io) if deregister - end - - def closed? - @closed - end - - # :nocov: - def to_s - "#<#{self.class}: #{@io}(closed:#{@closed}) #{@io.interests} #{object_id.to_s(16)}>" - end - # :nocov: - end - def initialize - @selectables = {} + @selectables = [] end # deregisters +io+ from selectables. def deregister(io) - monitor = @selectables.delete(io) - monitor.close(false) if monitor + @selectables.delete(io) end # register +io+. def register(io) - monitor = @selectables[io] - return if monitor + return if @selectables.include?(io) - monitor = Monitor.new(io, self) - @selectables[io] = monitor - - monitor + @selectables << io end private @@ -88,57 +52,78 @@ class HTTPX::Selector WRITE_INTERESTS = %i[w rw].freeze def select_many(interval) - begin - r = nil - w = nil + selectables, r, w = nil - @selectables.each_key do |io| - interests = io.interests + # first, we group IOs based on interest type. On call to #interests however, + # things might already happen, and new IOs might be registered, so we might + # have to start all over again. We do this until we group all selectables + loop do + begin + r = nil + w = nil - (r ||= []) << io if READ_INTERESTS.include?(interests) - (w ||= []) << io if WRITE_INTERESTS.include?(interests) + selectables = @selectables + @selectables = [] + + selectables.each do |io| + interests = io.interests + + (r ||= []) << io if READ_INTERESTS.include?(interests) + (w ||= []) << io if WRITE_INTERESTS.include?(interests) + end + + if @selectables.empty? + @selectables = selectables + break + else + @selectables = [*selectables, @selectables] + end + rescue StandardError + @selectables = selectables if selectables + raise end + end + # TODO: what to do if there are no selectables? + + begin readers, writers = IO.select(r, w, nil, interval) raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil? rescue IOError, SystemCallError - @selectables.reject! { |io, _| io.closed? } + @selectables.reject!(&:closed?) retry end readers.each do |io| - monitor = @selectables[io] - next unless monitor + yield io # so that we don't yield 2 times writers.delete(io) - - yield monitor end if readers writers.each do |io| - monitor = @selectables[io] - next unless monitor - - yield monitor + yield io end if writers end def select_one(interval) - io, monitor = @selectables.first + io = @selectables.first - result = case io.interests + interests = io.interests + + result = case interests when :r then io.to_io.wait_readable(interval) when :w then io.to_io.wait_writable(interval) when :rw then io.to_io.wait(interval, :read_write) + when nil then return end raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") unless result - yield monitor + yield io rescue IOError, SystemCallError - @selectables.reject! { |ios, _| ios.closed? } + @selectables.reject!(&:closed?) end def select(interval, &block)