mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-10 00:01:27 -04:00
removing persistent connections from the selector whe inactive
keeping them around was resulting in some busy loops on timer events (i.e. retry after), making them unreliable, innacurate and CPU draining. they're now kept out whenever they're inactive.
This commit is contained in:
parent
edf7357a5f
commit
efddd72caa
@ -200,6 +200,8 @@ module HTTPX
|
||||
end
|
||||
|
||||
def close
|
||||
transition(:active) if @state == :inactive
|
||||
|
||||
@parser.close if @parser
|
||||
end
|
||||
|
||||
@ -220,6 +222,7 @@ module HTTPX
|
||||
# for such cases, we want to ping for availability before deciding to shovel requests.
|
||||
@pending << request
|
||||
parser.ping
|
||||
transition(:active) if @state == :inactive
|
||||
return
|
||||
end
|
||||
|
||||
@ -252,6 +255,14 @@ module HTTPX
|
||||
@options.timeout[:operation_timeout]
|
||||
end
|
||||
|
||||
def deactivate
|
||||
transition(:inactive)
|
||||
end
|
||||
|
||||
def open?
|
||||
@state == :open || @state == :inactive
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def connect
|
||||
@ -399,6 +410,10 @@ module HTTPX
|
||||
def send_request_to_parser(request)
|
||||
@inflight += 1
|
||||
parser.send(request)
|
||||
|
||||
return unless @state == :inactive
|
||||
|
||||
transition(:active)
|
||||
end
|
||||
|
||||
def build_parser(protocol = @io.protocol)
|
||||
@ -488,6 +503,8 @@ module HTTPX
|
||||
|
||||
@timeout = @current_timeout = parser.timeout
|
||||
emit(:open)
|
||||
when :inactive
|
||||
return unless @state == :open
|
||||
when :closing
|
||||
return unless @state == :open
|
||||
|
||||
@ -499,6 +516,11 @@ module HTTPX
|
||||
when :already_open
|
||||
nextstate = :open
|
||||
send_pending
|
||||
when :active
|
||||
return unless @state == :inactive
|
||||
|
||||
nextstate = :open
|
||||
emit(:activate)
|
||||
end
|
||||
@state = nextstate
|
||||
rescue Errno::ECONNREFUSED,
|
||||
|
@ -67,6 +67,16 @@ module HTTPX
|
||||
connection.on(:open) do
|
||||
@connected_connections += 1
|
||||
end
|
||||
connection.on(:activate) do
|
||||
select_connection(connection)
|
||||
end
|
||||
end
|
||||
|
||||
def deactivate(connections)
|
||||
connections.each do |connection|
|
||||
connection.deactivate
|
||||
deselect_connection(connection) if connection.state == :inactive
|
||||
end
|
||||
end
|
||||
|
||||
# opens a connection to the IP reachable through +uri+.
|
||||
@ -84,7 +94,7 @@ module HTTPX
|
||||
def resolve_connection(connection)
|
||||
@connections << connection unless @connections.include?(connection)
|
||||
|
||||
if connection.addresses || connection.state == :open
|
||||
if connection.addresses || connection.open?
|
||||
#
|
||||
# there are two cases in which we want to activate initialization of
|
||||
# connection immediately:
|
||||
@ -101,7 +111,7 @@ module HTTPX
|
||||
resolver << connection
|
||||
return if resolver.empty?
|
||||
|
||||
@_resolver_ios[resolver] ||= @selector.register(resolver)
|
||||
@_resolver_ios[resolver] ||= select_connection(resolver)
|
||||
end
|
||||
|
||||
def on_resolver_connection(connection)
|
||||
@ -110,7 +120,7 @@ module HTTPX
|
||||
end
|
||||
return register_connection(connection) unless found_connection
|
||||
|
||||
if found_connection.state == :open
|
||||
if found_connection.open?
|
||||
coalesce_connections(found_connection, connection)
|
||||
throw(:coalesced, found_connection)
|
||||
else
|
||||
@ -132,7 +142,7 @@ module HTTPX
|
||||
|
||||
@resolvers.delete(resolver_type)
|
||||
|
||||
@selector.deregister(resolver)
|
||||
deselect_connection(resolver)
|
||||
@_resolver_ios.delete(resolver)
|
||||
resolver.close unless resolver.closed?
|
||||
end
|
||||
@ -143,7 +153,7 @@ module HTTPX
|
||||
# consider it connected already.
|
||||
@connected_connections += 1
|
||||
end
|
||||
@selector.register(connection)
|
||||
select_connection(connection)
|
||||
connection.on(:close) do
|
||||
unregister_connection(connection)
|
||||
end
|
||||
@ -151,10 +161,18 @@ module HTTPX
|
||||
|
||||
def unregister_connection(connection)
|
||||
@connections.delete(connection)
|
||||
@selector.deregister(connection)
|
||||
deselect_connection(connection)
|
||||
@connected_connections -= 1
|
||||
end
|
||||
|
||||
def select_connection(connection)
|
||||
@selector.register(connection)
|
||||
end
|
||||
|
||||
def deselect_connection(connection)
|
||||
@selector.deregister(connection)
|
||||
end
|
||||
|
||||
def coalesce_connections(conn1, conn2)
|
||||
if conn1.coalescable?(conn2)
|
||||
conn1.merge(conn2)
|
||||
|
@ -58,7 +58,7 @@ class HTTPX::Selector
|
||||
|
||||
# do not run event loop if there's nothing to wait on.
|
||||
# this might happen if connect failed and connection was unregistered.
|
||||
return if (!r || r.empty?) && (!w || w.empty?)
|
||||
return if (!r || r.empty?) && (!w || w.empty?) && !selectables.empty?
|
||||
|
||||
break
|
||||
else
|
||||
|
@ -21,6 +21,7 @@ module HTTPX
|
||||
yield self
|
||||
ensure
|
||||
@persistent = prev_persistent
|
||||
close unless @persistent
|
||||
end
|
||||
end
|
||||
|
||||
@ -226,7 +227,11 @@ module HTTPX
|
||||
end
|
||||
responses
|
||||
ensure
|
||||
close(connections) unless @persistent
|
||||
if @persistent
|
||||
pool.deactivate(connections)
|
||||
else
|
||||
close(connections)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -70,6 +70,7 @@ module HTTPX
|
||||
|
||||
def timeout: () -> Numeric?
|
||||
|
||||
def deactivate: () -> void
|
||||
private
|
||||
|
||||
def initialize: (String, URI::Generic, options) -> untyped
|
||||
|
@ -10,6 +10,8 @@ module HTTPX
|
||||
|
||||
def find_connection: (generic_uri, Options) -> Connection?
|
||||
|
||||
def deactivate: (*Array[Connection]) -> void
|
||||
|
||||
private
|
||||
|
||||
def initialize: () -> untyped
|
||||
@ -26,6 +28,10 @@ module HTTPX
|
||||
|
||||
def unregister_connection: (Connection) -> void
|
||||
|
||||
def select_connection: (resolver | Connection connection) -> void
|
||||
|
||||
def deselect_connection: (resolver | Connection connection) -> void
|
||||
|
||||
def coalesce_connections: (Connection, Connection) -> void
|
||||
|
||||
def next_timeout: () -> Numeric?
|
||||
|
Loading…
x
Reference in New Issue
Block a user