diff --git a/lib/httpx/channel.rb b/lib/httpx/channel.rb index 9bab7fc9..60588430 100644 --- a/lib/httpx/channel.rb +++ b/lib/httpx/channel.rb @@ -77,11 +77,26 @@ module HTTPX @write_buffer = Buffer.new(BUFFER_SIZE) @pending = [] on(:error) { |ex| on_error(ex) } - transition(:idle) + if @options.io + # if there's an already open IO, get its + # peer address, and force-initiate the parser + transition(:already_open) + @io = IO.registry(@type).new(@uri, nil, @options) + parser + else + transition(:idle) + end end + # this is a semi-private method, to be used by the resolver + # to initiate the io object. def addresses=(addrs) - @io = IO.registry(@type).new(@uri, addrs, @options) + @io ||= IO.registry(@type).new(@uri, addrs, @options) # rubocop:disable Naming/MemoizedInstanceVariableName + end + + def addresses + return unless @io + @io.addresses end def mergeable?(addresses) @@ -342,6 +357,11 @@ module HTTPX @io.close @read_buffer.clear + when :already_open + nextstate = :open + send_pending + @timeout_threshold = @options.timeout.operation_timeout + @timeout = @timeout_threshold end @state = nextstate rescue Errno::EHOSTUNREACH diff --git a/lib/httpx/connection.rb b/lib/httpx/connection.rb index f6898e79..79ee2526 100644 --- a/lib/httpx/connection.rb +++ b/lib/httpx/connection.rb @@ -120,12 +120,20 @@ module HTTPX end def register_channel(channel) - @timeout.transition(:idle) - monitor = @selector.register(channel, :w) + monitor = if channel.state == :open + # if open, an IO was passed upstream, therefore + # consider it connected already. + @connected_channels += 1 + @selector.register(channel, :rw) + else + @selector.register(channel, :w) + end monitor.value = channel channel.on(:close) do unregister_channel(channel) end + return if channel.state == :open + @timeout.transition(:idle) end def unregister_channel(channel) diff --git a/lib/httpx/io/tcp.rb b/lib/httpx/io/tcp.rb index ce47f7ba..b2bde623 100644 --- a/lib/httpx/io/tcp.rb +++ b/lib/httpx/io/tcp.rb @@ -17,24 +17,25 @@ module HTTPX @state = :idle @hostname = uri.host @addresses = addresses - @ip_index = @addresses.size - 1 @options = Options.new(options) @fallback_protocol = @options.fallback_protocol @port = uri.port if @options.io @io = case @options.io when Hash - @ip = @addresses[@ip_index] - @options.io[@ip] || @options.io["#{@ip}:#{@port}"] + @options.io[uri.authority] else - @ip = @hostname @options.io end + _, _, _, @ip = @io.addr + @addresses ||= [@ip] + @ip_index = @addresses.size - 1 unless @io.nil? @keep_open = true @state = :connected end else + @ip_index = @addresses.size - 1 @ip = @addresses[@ip_index] end @io ||= build_socket diff --git a/lib/httpx/resolver/resolver_mixin.rb b/lib/httpx/resolver/resolver_mixin.rb index f8a7e744..edd1bb39 100644 --- a/lib/httpx/resolver/resolver_mixin.rb +++ b/lib/httpx/resolver/resolver_mixin.rb @@ -36,7 +36,10 @@ module HTTPX end def early_resolve(channel, hostname: channel.uri.host) - addresses = ip_resolve(hostname) || Resolver.cached_lookup(hostname) || system_resolve(hostname) + addresses = channel.addresses || + ip_resolve(hostname) || + Resolver.cached_lookup(hostname) || + system_resolve(hostname) return unless addresses emit_addresses(channel, addresses)