mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-10 00:01:27 -04:00
calculate interests down to connection and parser
by analyzing all of the data down to the parser, one can estimate better whether io wants to read/write/both, thereby avoiding spurious wakeups. This also greatly simplifies the monitor API, and solves the 100% CPU utilization issue.
This commit is contained in:
parent
0136200b85
commit
9d6f31d940
@ -176,9 +176,21 @@ module HTTPX
|
||||
end
|
||||
|
||||
def interests
|
||||
return :w if @state == :idle
|
||||
# connecting
|
||||
if connecting?
|
||||
return :w unless @io
|
||||
|
||||
:rw
|
||||
return :rw if @io.state == :connected
|
||||
|
||||
return :w
|
||||
end
|
||||
|
||||
# if the write buffer is full, we drain it
|
||||
return :w if @write_buffer.full?
|
||||
|
||||
return @parser.interests if @parser
|
||||
|
||||
nil
|
||||
end
|
||||
|
||||
def to_io
|
||||
|
@ -23,6 +23,19 @@ module HTTPX
|
||||
@requests = []
|
||||
end
|
||||
|
||||
def interests
|
||||
# this means we're processing incoming response already
|
||||
return :r if @request
|
||||
|
||||
return if @requests.empty?
|
||||
|
||||
request = @requests.first
|
||||
|
||||
return :w if request.interests == :w || !@buffer.empty?
|
||||
|
||||
:r
|
||||
end
|
||||
|
||||
def reset
|
||||
@max_requests = @options.max_requests || MAX_REQUESTS
|
||||
@parser.reset!
|
||||
|
@ -30,6 +30,22 @@ module HTTPX
|
||||
init_connection
|
||||
end
|
||||
|
||||
def interests
|
||||
return :r if @buffer.full?
|
||||
|
||||
return :w if @connection.state == :closed
|
||||
|
||||
return :r unless (@connection.state == :connected && @handshake_completed)
|
||||
|
||||
return :w unless @pending.empty?
|
||||
|
||||
return :w if @streams.each_key.any? { |r| r.interests == :w }
|
||||
|
||||
return :r if @buffer.empty?
|
||||
|
||||
:rw
|
||||
end
|
||||
|
||||
def reset
|
||||
init_connection
|
||||
end
|
||||
|
@ -81,4 +81,4 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -11,6 +11,8 @@ module HTTPX
|
||||
|
||||
attr_reader :addresses
|
||||
|
||||
attr_reader :state
|
||||
|
||||
alias_method :host, :ip
|
||||
|
||||
def initialize(origin, addresses, options)
|
||||
|
@ -7,6 +7,10 @@ module HTTPX
|
||||
module Proxy
|
||||
module HTTP
|
||||
module ConnectionMethods
|
||||
def connecting?
|
||||
super || @state == :connecting || @state == :connected
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def transition(nextstate)
|
||||
@ -34,7 +38,6 @@ module HTTPX
|
||||
when :idle
|
||||
@parser = ProxyParser.new(@write_buffer, @options)
|
||||
set_parser_callbacks(@parser)
|
||||
@parser.on(:close) { transition(:closing) }
|
||||
end
|
||||
end
|
||||
super
|
||||
|
@ -31,6 +31,10 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
def connecting?
|
||||
super || @state == :authenticating || @state == :negotiating
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def transition(nextstate)
|
||||
|
@ -35,7 +35,6 @@ module HTTPX
|
||||
|
||||
@selector.select(timeout) do |monitor|
|
||||
monitor.io.call
|
||||
monitor.interests = monitor.io.interests
|
||||
end
|
||||
@timers.fire
|
||||
end
|
||||
@ -86,7 +85,7 @@ module HTTPX
|
||||
resolver << connection
|
||||
return if resolver.empty?
|
||||
|
||||
@_resolver_monitors[resolver] ||= @selector.register(resolver, :w)
|
||||
@_resolver_monitors[resolver] ||= @selector.register(resolver)
|
||||
end
|
||||
|
||||
def on_resolver_connection(connection)
|
||||
@ -128,10 +127,8 @@ module HTTPX
|
||||
# if open, an IO was passed upstream, therefore
|
||||
# consider it connected already.
|
||||
@connected_connections += 1
|
||||
@selector.register(connection, :rw)
|
||||
else
|
||||
@selector.register(connection, :w)
|
||||
end
|
||||
@selector.register(connection)
|
||||
connection.on(:close) do
|
||||
unregister_connection(connection)
|
||||
end
|
||||
|
@ -58,6 +58,12 @@ module HTTPX
|
||||
@state = :idle
|
||||
end
|
||||
|
||||
def interests
|
||||
return :r if @state == :done || @state == :expect
|
||||
|
||||
:w
|
||||
end
|
||||
|
||||
# :nocov:
|
||||
if RUBY_VERSION < "2.2"
|
||||
# rubocop: disable Lint/UriEscapeUnescape:
|
||||
|
@ -11,21 +11,24 @@ class HTTPX::Selector
|
||||
# I/O monitor
|
||||
#
|
||||
class Monitor
|
||||
attr_accessor :io, :interests, :readiness
|
||||
attr_accessor :io, :readiness
|
||||
|
||||
def initialize(io, interests, reactor)
|
||||
def initialize(io, reactor)
|
||||
@io = io
|
||||
@interests = interests
|
||||
@reactor = reactor
|
||||
@closed = false
|
||||
end
|
||||
|
||||
def interests
|
||||
@io.interests
|
||||
end
|
||||
|
||||
def readable?
|
||||
READABLE.include?(@interests)
|
||||
READABLE.include?(@io.interests)
|
||||
end
|
||||
|
||||
def writable?
|
||||
WRITABLE.include?(@interests)
|
||||
WRITABLE.include?(@io.interests)
|
||||
end
|
||||
|
||||
# closes +@io+, deregisters from reactor (unless +deregister+ is false)
|
||||
@ -42,7 +45,7 @@ class HTTPX::Selector
|
||||
|
||||
# :nocov:
|
||||
def to_s
|
||||
"#<#{self.class}: #{@io}(closed:#{@closed}) #{@interests} #{object_id.to_s(16)}>"
|
||||
"#<#{self.class}: #{@io}(closed:#{@closed}) #{@io.interests} #{object_id.to_s(16)}>"
|
||||
end
|
||||
# :nocov:
|
||||
end
|
||||
@ -59,15 +62,14 @@ class HTTPX::Selector
|
||||
monitor.close(false) if monitor
|
||||
end
|
||||
|
||||
# register +io+ for +interests+ events.
|
||||
def register(io, interests)
|
||||
# register +io+.
|
||||
def register(io)
|
||||
monitor = @selectables[io]
|
||||
if monitor
|
||||
monitor.interests = interests
|
||||
else
|
||||
monitor = Monitor.new(io, interests, self)
|
||||
@selectables[io] = monitor
|
||||
end
|
||||
return if monitor
|
||||
|
||||
monitor = Monitor.new(io, self)
|
||||
@selectables[io] = monitor
|
||||
|
||||
monitor
|
||||
end
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user