set fiber context for sent request

this is used downstream to inform the selector (via IO interest calculation) whether the connection should wait on events for the current fiber (i.e. the connection should be used for requests of the current fiber, or DNS requests for the connection of such requests must be pending)
This commit is contained in:
HoneyryderChuck 2025-07-11 17:32:31 +01:00
parent 37cc138bef
commit aa03a3a52e
12 changed files with 76 additions and 6 deletions

View File

@ -205,6 +205,12 @@ module HTTPX
end
end
def current_context?
@pending.any?(&:current_context?) || (
@sibling && @sibling.pending.any?(&:current_context?)
)
end
def io_connected?
return @coalesced_connection.io_connected? if @coalesced_connection
@ -227,6 +233,8 @@ module HTTPX
def interests
# connecting
if connecting?
return unless @pending.any?(&:current_context?)
connect
return @io.interests if connecting?

View File

@ -35,7 +35,8 @@ module HTTPX
@settings = @options.http2_settings
@pending = []
@streams = {}
@drains = {}
@contexts = Hash.new { |hs, k| hs[k] = Set.new }
@drains = {}
@pings = []
@buffer = buffer
@handshake_completed = false
@ -111,6 +112,8 @@ module HTTPX
end
def send(request, head = false)
add_to_context(request)
unless can_buffer_more_requests?
head ? @pending.unshift(request) : @pending << request
return false
@ -337,6 +340,7 @@ module HTTPX
log(level: 2) { "#{stream.id}: closing stream" }
@drains.delete(request)
@streams.delete(request)
clear_from_context(request)
if error
case error
@ -386,6 +390,7 @@ module HTTPX
case error
when :http_1_1_required
while (request = @pending.shift)
clear_from_context(request)
emit(:error, request, error)
end
when :no_error
@ -393,6 +398,7 @@ module HTTPX
@pending.unshift(*@streams.keys)
@drains.clear
@streams.clear
@contexts.clear
else
ex = Error.new(0, error)
end
@ -460,5 +466,17 @@ module HTTPX
emit(:pong)
end
def add_to_context(request)
@contexts[request.context] << request
end
def clear_from_context(request)
requests = @contexts[request.context]
requests.delete(request)
@contexts.delete(request.context) if requests.empty?
end
end
end

View File

@ -27,6 +27,8 @@ module HTTPX
class H2CParser < Connection::HTTP2
def upgrade(request, response)
@contexts[request.context] << request
# skip checks, it is assumed that this is the first
# request in the connection
stream = @connection.upgrade

View File

@ -22,6 +22,16 @@ module HTTPX
module ConnectionMethods
using URIExtensions
def interests
return super unless connecting? && @parser
connect
return @io.interests if connecting?
super
end
def upgrade_to_h2
prev_parser = @parser

View File

@ -48,6 +48,9 @@ module HTTPX
attr_reader :active_timeouts
# the execution context (fiber) this request was sent on.
attr_reader :context
# will be +true+ when request body has been completely flushed.
def_delegator :@body, :empty?
@ -103,13 +106,22 @@ module HTTPX
raise UnsupportedSchemeError, "#{@uri}: #{@uri.scheme}: unsupported URI scheme" unless ALLOWED_URI_SCHEMES.include?(@uri.scheme)
@state = :idle
@response = nil
@peer_address = nil
@response = @peer_address = @context = nil
@ping = false
@persistent = @options.persistent
@active_timeouts = []
end
# sets the execution context for this request. the default is the current fiber.
def set_context!
@context ||= Fiber.current # rubocop:disable Naming/MemoizedInstanceVariableName
end
# checks whether the current execution context is the one where the request was created.
def current_context?
@context == Fiber.current
end
# whether request has been buffered with a ping
def ping?
@ping

View File

@ -105,6 +105,7 @@ module HTTPX
request.on(:response, &method(:on_response).curry(2)[request])
request.on(:promise, &method(:on_promise))
@requests[request] = hostname
request.set_context!
resolver_connection.send(request)
@connections << connection
rescue ResolveError, Resolv::DNS::EncodeError => e

View File

@ -105,11 +105,13 @@ module HTTPX
private
def calculate_interests
return :w unless @write_buffer.empty?
return if @queries.empty?
return :r unless @queries.empty?
return unless @queries.values.any?(&:current_context?) || @connections.any?(&:current_context?)
nil
return :r if @write_buffer.empty?
:w
end
def consume

View File

@ -82,6 +82,8 @@ module HTTPX
def interests
return if @queries.empty?
return unless @queries.any? { |_, conn| conn.current_context? }
:r
end

View File

@ -238,6 +238,8 @@ module HTTPX
# sends the +request+ to the corresponding HTTPX::Connection
def send_request(request, selector, options = request.options)
request.set_context!
error = begin
catch(:resolve_error) do
connection = find_connection(request.uri, selector, options)

View File

@ -76,6 +76,8 @@ module HTTPX
def connecting?: () -> bool
def current_context?: () -> bool
def io_connected?: () -> bool
def inflight?: () -> boolish

View File

@ -18,6 +18,7 @@ module HTTPX
@buffer: Buffer
@handshake_completed: bool
@wait_for_handshake: bool
@contexts: Hash[Fiber, Set[Request]]
def interests: () -> io_interests?
@ -97,6 +98,10 @@ module HTTPX
def on_pong: (string ping) -> void
def add_to_context: (Request request) -> void
def clear_from_context: (Request request) -> void
class Error < ::HTTPX::Error
def initialize: (Integer id, Symbol | StandardError error) -> void
end

View File

@ -17,6 +17,8 @@ module HTTPX
attr_reader drain_error: StandardError?
attr_reader active_timeouts: Array[Symbol]
attr_reader context: Fiber
attr_accessor peer_address: ipaddr?
attr_writer persistent: bool
@ -30,6 +32,10 @@ module HTTPX
def initialize: (Symbol | String verb, generic_uri uri, Options options, ?request_params params) -> untyped
def set_context!: () -> void
def current_context?: () -> bool
def ping?: () -> bool
def ping!: () -> void