moved logic related with supporting the fiber scheduler into its own plugin

this plugin is loaded into the persistent plugin by default, as it's already expected to work across fibers
This commit is contained in:
HoneyryderChuck 2025-08-22 15:46:19 +01:00
parent 7f34a62b82
commit 53a3ba7037
21 changed files with 279 additions and 90 deletions

View File

@ -202,12 +202,6 @@ module HTTPX
end
end
def current_context?
@pending.any?(&:current_context?) || (
@sibling && @sibling.pending.any?(&:current_context?)
)
end
def io_connected?
return @coalesced_connection.io_connected? if @coalesced_connection
@ -230,8 +224,6 @@ module HTTPX
def interests
# connecting
if connecting?
return unless @pending.any?(&:current_context?)
connect
return @io.interests if connecting?

View File

@ -35,8 +35,6 @@ module HTTPX
return unless request
return unless request.current_context? || @requests.any?(&:current_context?) || @pending.any?(&:current_context?)
return :w if request.interests == :w || !@buffer.empty?
:r

View File

@ -35,7 +35,6 @@ module HTTPX
@settings = @options.http2_settings
@pending = []
@streams = {}
@contexts = Hash.new { |hs, k| hs[k] = Set.new }
@drains = {}
@pings = []
@buffer = buffer
@ -65,12 +64,6 @@ module HTTPX
return @buffer.empty? ? :r : :rw
end
unless @contexts.key?(Fiber.current)
return :w unless @pings.empty?
return
end
unless @connection.send_buffer.empty?
return :rw unless @buffer.empty?
@ -112,8 +105,6 @@ module HTTPX
end
def send(request, head = false)
add_to_context(request)
unless can_buffer_more_requests?
head ? @pending.unshift(request) : @pending << request
return false
@ -342,9 +333,7 @@ module HTTPX
return if error == :stream_closed && !@streams.key?(request)
log(level: 2) { "#{stream.id}: closing stream" }
@drains.delete(request)
@streams.delete(request)
clear_from_context(request)
teardown(request)
if error
case error
@ -394,15 +383,12 @@ module HTTPX
case error
when :http_1_1_required
while (request = @pending.shift)
clear_from_context(request)
emit(:error, request, error)
end
when :no_error
ex = GoawayError.new
@pending.unshift(*@streams.keys)
@drains.clear
@streams.clear
@contexts.clear
teardown
else
ex = Error.new(0, error)
end
@ -471,16 +457,14 @@ module HTTPX
emit(:pong)
end
def add_to_context(request)
@contexts[request.context] << request
end
def clear_from_context(request)
requests = @contexts[request.context]
requests.delete(request)
@contexts.delete(request.context) if requests.empty?
def teardown(request = nil)
if request
@drains.delete(request)
@streams.delete(request)
else
@drains.clear
@streams.clear
end
end
end
end

View File

@ -0,0 +1,192 @@
# frozen_string_literal: true
module HTTPX
module Plugins
# This plugin makes a session reuse the same selector in a given thread.
#
# This selector is common to all fibers in the same thread, which makes it friendly with
# fiber scheduler implementations such as `async`.
#
module FiberSelector
def self.subplugins
{
h2c: FiberSelectorH2C,
}
end
module InstanceMethods
private
def send_request(request, *)
request.set_context!
super
end
def get_current_selector
super(&nil) || begin
return unless block_given?
default = yield
set_current_selector(default)
default
end
end
end
module RequestMethods
# the execution context (fiber) this request was sent on.
attr_reader :context
def initialize(*)
super
@context = nil
end
# sets the execution context for this request. the default is the current fiber.
def set_context!
@context ||= Fiber.current # rubocop:disable Naming/MemoizedInstanceVariableName
end
# checks whether the current execution context is the one where the request was created.
def current_context?
@context == Fiber.current
end
def complete!(response = @response)
@context = nil
super
end
end
module ConnectionMethods
def current_context?
@pending.any?(&:current_context?) || (
@sibling && @sibling.pending.any?(&:current_context?)
)
end
def interests
return if connecting? && @pending.none?(&:current_context?)
super
end
def send(request)
# DoH requests bypass the session, so context needs to be set here.
request.set_context!
super
end
end
module HTTP1Methods
def interests
request = @request || @requests.first
return unless request
return unless request.current_context? || @requests.any?(&:current_context?) || @pending.any?(&:current_context?)
super
end
end
module HTTP2Methods
def initialize(*)
super
@contexts = Hash.new { |hs, k| hs[k] = Set.new }
end
def interests
if @connection.state == :connected && @handshake_completed && !@contexts.key?(Fiber.current)
return :w unless @pings.empty?
return
end
super
end
def send(request, *)
add_to_context(request)
super
end
private
def on_close(_, error, _)
if error == :http_1_1_required
# remove all pending requests context
@pending.each do |req|
clear_from_context(req)
end
end
super
end
def on_stream_close(_, request, error)
clear_from_context(request) if error != :stream_closed && @streams.key?(request)
super
end
def teardown(request = nil)
super
if request
clear_from_context(request)
else
@contexts.clear
end
end
def add_to_context(request)
@contexts[request.context] << request
end
def clear_from_context(request)
requests = @contexts[request.context]
requests.delete(request)
@contexts.delete(request.context) if requests.empty?
end
end
module NativeResolverMethods
private
def calculate_interests
return if @queries.empty?
return unless @queries.values.any?(&:current_context?) || @connections.any?(&:current_context?)
super
end
end
module SystemResolverMethods
def interests
return unless @queries.any? { |_, conn| conn.current_context? }
super
end
end
module FiberSelectorH2C
module HTTP2Methods
def upgrade(request, *)
@contexts[request.context] << request
super
end
end
end
end
end
end

View File

@ -107,7 +107,6 @@ module HTTPX
module H2CParser
def upgrade(request, response)
@contexts[request.context] << request
# skip checks, it is assumed that this is the first
# request in the connection
stream = @connection.upgrade

View File

@ -115,7 +115,7 @@ module HTTPX
def checkout_resolver(options)
resolver_type = options.resolver_class
resolver_type = Resolver.resolver_for(resolver_type)
resolver_type = Resolver.resolver_for(resolver_type, options)
@resolver_mtx.synchronize do
resolvers = @resolvers[resolver_type]

View File

@ -48,9 +48,6 @@ module HTTPX
attr_reader :active_timeouts
# the execution context (fiber) this request was sent on.
attr_reader :context
# will be +true+ when request body has been completely flushed.
def_delegator :@body, :empty?
@ -106,24 +103,13 @@ module HTTPX
raise UnsupportedSchemeError, "#{@uri}: #{@uri.scheme}: unsupported URI scheme" unless ALLOWED_URI_SCHEMES.include?(@uri.scheme)
@state = :idle
@response = @peer_address = @context = nil
@response = @peer_address = nil
@ping = false
@persistent = @options.persistent
@active_timeouts = []
end
# sets the execution context for this request. the default is the current fiber.
def set_context!
@context ||= Fiber.current # rubocop:disable Naming/MemoizedInstanceVariableName
end
# checks whether the current execution context is the one where the request was created.
def current_context?
@context == Fiber.current
end
def complete!(response = @response)
@context = nil
emit(:complete, response)
end

View File

@ -22,16 +22,17 @@ module HTTPX
module_function
def resolver_for(resolver_type)
def resolver_for(resolver_type, options)
case resolver_type
when :native then Native
when :system then System
when :https then HTTPS
else
return resolver_type if resolver_type.is_a?(Class) && resolver_type < Resolver
when Symbol
meth = :"resolver_#{resolver_type}_class"
raise Error, "unsupported resolver type (#{resolver_type})"
return options.__send__(meth) if options.respond_to?(meth)
when Class
return resolver_type if resolver_type < Resolver
end
raise Error, "unsupported resolver type (#{resolver_type})"
end
def nolookup_resolve(hostname)

View File

@ -105,7 +105,6 @@ module HTTPX
request.on(:response, &method(:on_response).curry(2)[request])
request.on(:promise, &method(:on_promise))
@requests[request] = hostname
request.set_context!
resolver_connection.send(request)
@connections << connection
rescue ResolveError, Resolv::DNS::EncodeError => e

View File

@ -107,8 +107,6 @@ module HTTPX
def calculate_interests
return if @queries.empty?
return unless @queries.values.any?(&:current_context?) || @connections.any?(&:current_context?)
return :r if @write_buffer.empty?
:w

View File

@ -82,8 +82,6 @@ module HTTPX
def interests
return if @queries.empty?
return unless @queries.any? { |_, conn| conn.current_context? }
:r
end

View File

@ -238,8 +238,6 @@ module HTTPX
# sends the +request+ to the corresponding HTTPX::Connection
def send_request(request, selector, options = request.options)
request.set_context!
error = begin
catch(:resolve_error) do
connection = find_connection(request.uri, selector, options)

View File

@ -24,6 +24,7 @@ module HTTPX
| (:upgrade, ?options) -> Plugins::sessionUpgrade
| (:h2c, ?options) -> Plugins::sessionUpgrade
| (:h2, ?options) -> Plugins::sessionUpgrade
| (:fiber_selector, ?options) -> Plugins::sessionFiberSelector
| (:persistent, ?options) -> Plugins::sessionPersistent
| (:proxy, ?options) -> (Plugins::sessionProxy & Plugins::httpProxy)
| (:push_promise, ?options) -> Plugins::sessionPushPromise

View File

@ -76,8 +76,6 @@ module HTTPX
def connecting?: () -> bool
def current_context?: () -> bool
def io_connected?: () -> bool
def inflight?: () -> boolish

View File

@ -18,7 +18,6 @@ module HTTPX
@buffer: Buffer
@handshake_completed: bool
@wait_for_handshake: bool
@contexts: Hash[Fiber, Set[Request]]
def interests: () -> io_interests?
@ -100,9 +99,7 @@ module HTTPX
def on_pong: (string ping) -> void
def add_to_context: (Request request) -> void
def clear_from_context: (Request request) -> void
def teardown: (?Request? request) -> void
class Error < ::HTTPX::Error
def initialize: (Integer id, Symbol | StandardError error) -> void

View File

@ -0,0 +1,51 @@
module HTTPX
module Plugins
module FiberSelector
module InstanceMethods
end
module RequestMethods
attr_reader context: Fiber?
def set_context!: () -> void
def current_context?: () -> bool
end
module ConnectionMethods
def current_context?: () -> bool
def send: (request request) -> void
end
module HTTP1Methods
end
module HTTP2Methods
@contexts: Hash[Fiber, Set[Request]]
private
def add_to_context: (request request) -> void
def clear_from_context: (request request) -> void
end
module NativeResolverMethods
end
module SystemResolverMethods
end
module FiberSelectorH2C
module HTTP2Methods
end
end
type request = Request & RequestMethods
end
type sessionFiberSelector = Session & FiberSelector::InstanceMethods
end
end

View File

@ -9,6 +9,6 @@ module HTTPX
end
end
type sessionPersistent = Session & FiberSelector::InstanceMethods & Persistent::InstanceMethods
type sessionPersistent = sessionFiberSelector & Persistent::InstanceMethods
end
end

View File

@ -17,8 +17,6 @@ module HTTPX
attr_reader drain_error: StandardError?
attr_reader active_timeouts: Array[Symbol]
attr_reader context: Fiber
attr_accessor peer_address: ipaddr?
attr_writer persistent: bool
@ -32,10 +30,6 @@ module HTTPX
def initialize: (Symbol | String verb, generic_uri uri, Options options, ?request_params params) -> untyped
def set_context!: () -> void
def current_context?: () -> bool
def complete!: (?response response) -> void
def ping?: () -> bool

View File

@ -23,10 +23,7 @@ module HTTPX
def self?.system_resolve: (String hostname) -> Array[IPAddr]?
def self?.resolver_for: (:native resolver_type) -> singleton(Native) |
(:system resolver_type) -> singleton(System) |
(:https resolver_type) -> singleton(HTTPS) |
[U] (U resolver_type) -> U
def self?.resolver_for: (Symbol | singleton(Resolver) resolver_type, Options options) -> singleton(Resolver)
def self?.cached_lookup: (String hostname) -> Array[IPAddr]?

View File

@ -33,11 +33,17 @@ class ResolverTest < Minitest::Test
end
def test_resolver_for
assert Resolver.resolver_for(:native) == Resolver::Native
assert Resolver.resolver_for(:system) == Resolver::System
assert Resolver.resolver_for(:https) == Resolver::HTTPS
assert Resolver.resolver_for(Resolver::HTTPS) == Resolver::HTTPS
ex = assert_raises(Error) { Resolver.resolver_for(Object) }
options = Options.new
assert Resolver.resolver_for(:native, options) < Resolver::Native
assert Resolver.resolver_for(:system, options) < Resolver::System
assert Resolver.resolver_for(:https, options) < Resolver::HTTPS
ex = assert_raises(Error) { Resolver.resolver_for(:smth, options) }
assert(ex.message.include?("unsupported resolver type"))
assert Resolver.resolver_for(Resolver::HTTPS, options) == Resolver::HTTPS
return if defined?(RBS)
ex = assert_raises(Error) { Resolver.resolver_for(Object, options) }
assert(ex.message.include?("unsupported resolver type"))
end

View File

@ -17,7 +17,7 @@ module SessionWithPool
end
def resolver
resolver_type = HTTPX::Resolver.resolver_for(@options.resolver_class)
resolver_type = HTTPX::Resolver.resolver_for(@options.resolver_class, @options)
resolver = @pool.resolvers[resolver_type].first