mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-10 00:01:27 -04:00
making the selector closer to the nio4r pure ruby selector
Some requests were hanging because some connection with read interests were only on the writers selector. This issue manifested itself in high load scenarios. The fix is not the most performant, but it does the job: only set write interest when connecting, otherwise read/write. This increases the number of wakeups, but at least we have correctness.
This commit is contained in:
parent
c1d8b30d08
commit
be39f6b901
@ -10,6 +10,7 @@ services:
|
||||
- HTTPX_SOCKS4A_PROXY=socks4a://socksproxy:8080
|
||||
- HTTPX_SOCKS5_PROXY=socks5://socksproxy:8080
|
||||
- HTTPX_SSH_PROXY=ssh://sshproxy:22
|
||||
- PARALLEL=1
|
||||
- N=6 # minitest workers
|
||||
- MT_CPU=6 # minitest workers
|
||||
- CI=1
|
||||
|
@ -146,13 +146,7 @@ module HTTPX
|
||||
def interests
|
||||
return :w if @state == :idle
|
||||
|
||||
readable = !@read_buffer.full?
|
||||
writable = !@write_buffer.empty?
|
||||
if readable
|
||||
writable ? :rw : :r
|
||||
else
|
||||
writable ? :w : :r
|
||||
end
|
||||
:rw
|
||||
end
|
||||
|
||||
def to_io
|
||||
|
@ -70,6 +70,7 @@ module HTTPX
|
||||
|
||||
def response=(response)
|
||||
return unless response
|
||||
|
||||
@timer.cancel if @timer
|
||||
@response = response
|
||||
end
|
||||
|
@ -48,44 +48,25 @@ class HTTPX::Selector
|
||||
end
|
||||
|
||||
def initialize
|
||||
@readers = {}
|
||||
@writers = {}
|
||||
@selectables = {}
|
||||
@__r__, @__w__ = IO.pipe
|
||||
@closed = false
|
||||
end
|
||||
|
||||
# deregisters +io+ from selectables.
|
||||
def deregister(io)
|
||||
rmonitor = @readers.delete(io)
|
||||
wmonitor = @writers.delete(io)
|
||||
monitor = rmonitor || wmonitor
|
||||
monitor = @selectables.delete(io)
|
||||
monitor.close(false) if monitor
|
||||
end
|
||||
|
||||
# register +io+ for +interests+ events.
|
||||
def register(io, interests)
|
||||
readable = READABLE.include?(interests)
|
||||
writable = WRITABLE.include?(interests)
|
||||
if readable
|
||||
monitor = @readers[io]
|
||||
if monitor
|
||||
monitor.interests = interests
|
||||
else
|
||||
monitor = Monitor.new(io, interests, self)
|
||||
end
|
||||
@readers[io] = monitor
|
||||
@writers.delete(io) unless writable
|
||||
end
|
||||
if writable
|
||||
monitor = @writers[io]
|
||||
if monitor
|
||||
monitor.interests = interests
|
||||
else
|
||||
# reuse object
|
||||
monitor = readable ? @readers[io] : Monitor.new(io, interests, self)
|
||||
end
|
||||
@writers[io] = monitor
|
||||
@readers.delete(io) unless readable
|
||||
monitor = @selectables[io]
|
||||
if monitor
|
||||
monitor.interests = interests
|
||||
else
|
||||
monitor = Monitor.new(io, interests, self)
|
||||
@selectables[io] = monitor
|
||||
end
|
||||
monitor
|
||||
end
|
||||
@ -95,16 +76,20 @@ class HTTPX::Selector
|
||||
#
|
||||
def select(interval)
|
||||
begin
|
||||
r = @readers.keys
|
||||
w = @writers.keys
|
||||
r.unshift(@__r__)
|
||||
r = [@__r__]
|
||||
w = []
|
||||
|
||||
@selectables.each do |io, monitor|
|
||||
r << io if monitor.interests == :r || monitor.interests == :rw
|
||||
w << io if monitor.interests == :w || monitor.interests == :rw
|
||||
monitor.readiness = nil
|
||||
end
|
||||
|
||||
readers, writers = IO.select(r, w, nil, interval)
|
||||
|
||||
raise HTTPX::TimeoutError.new(interval, "timed out while waiting on select") if readers.nil? && writers.nil?
|
||||
rescue IOError, SystemCallError
|
||||
@readers.reject! { |io, _| io.closed? }
|
||||
@writers.reject! { |io, _| io.closed? }
|
||||
@selectables.reject! { |io, _| io.closed? }
|
||||
retry
|
||||
end
|
||||
|
||||
@ -113,7 +98,7 @@ class HTTPX::Selector
|
||||
# clean up wakeups
|
||||
@__r__.read(@__r__.stat.size)
|
||||
else
|
||||
monitor = io.closed? ? @readers.delete(io) : @readers[io]
|
||||
monitor = io.closed? ? @selectables.delete(io) : @selectables[io]
|
||||
next unless monitor
|
||||
|
||||
monitor.readiness = writers.delete(io) ? :rw : :r
|
||||
@ -122,7 +107,7 @@ class HTTPX::Selector
|
||||
end if readers
|
||||
|
||||
writers.each do |io|
|
||||
monitor = io.closed? ? @writers.delete(io) : @writers[io]
|
||||
monitor = io.closed? ? @selectables.delete(io) : @selectables[io]
|
||||
next unless monitor
|
||||
|
||||
# don't double run this, the last iteration might have run this task already
|
||||
|
@ -17,7 +17,7 @@ module MinitestExtensions
|
||||
TestTimeout = Class.new(Timeout::Error)
|
||||
|
||||
def run(*)
|
||||
::Timeout.timeout(RUBY_ENGINE == "jruby" ? 60 : 30, TestTimeout) { super }
|
||||
::Timeout.timeout(60, TestTimeout) { super }
|
||||
end
|
||||
end
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user