mirror of
				https://github.com/HoneyryderChuck/httpx.git
				synced 2025-10-31 00:02:15 -04:00 
			
		
		
		
	
		
			
				
	
	
		
			335 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
			
		
		
	
	
			335 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
| # frozen_string_literal: true
 | |
| 
 | |
| require "test_helper"
 | |
| require "support/http_helpers"
 | |
| 
 | |
| class Bug_1_5_1_Test < Minitest::Test
 | |
|   include HTTPHelpers
 | |
| 
 | |
# Regression test: with the :persistent plugin, one GET per URL is issued from
# its own scheduled fiber, and every one of them must come back with a 200.
#
# NOTE(review): this performs real requests against the public hosts listed below.
def test_persistent_fiber_scheduler_skipping_dns_query_and_hanging
  urls = %w[
    https://www.google.com
    https://nghttp2.org/httpbin/get
    https://railsatscale.com/feed.xml
    https://www.mikeperham.com/index.xml
    https://www.opennet.ru/opennews/opennews_mini_noadv.rss
    http://blog.cleancoder.com/atom.xml
  ]
  http = HTTPX.plugin(:persistent)

  # The scheduler lives in a dedicated thread. Thread#value joins that thread
  # (re-raising anything raised inside it) and hands back the collected array.
  responses = Thread.start do
    scheduler = Scheduler.new
    Fiber.set_scheduler scheduler

    res = []

    urls.each do |url|
      Fiber.schedule do
        res << http.get(url)
      end
    end

    res
  end.value

  # assert_equal reports both sizes on failure, unlike a bare boolean assert.
  assert_equal urls.size, responses.size

  responses.each do |response|
    verify_status(response, 200)
  end
ensure
  # Close the persistent session like the sibling HTTP/1 test does, so its
  # connections do not outlive this test. Safe navigation covers the case
  # where building the session itself failed.
  http&.close
end
 | |
| 
 | |
# Uses a persistent session whose ALPN list only offers http/1.1, then runs
# five scheduled fibers that each perform three sequential GETs against the
# test server; every response must be a 200.
def test_persistent_connection_http1_should_use_buffered_requests_to_switch_context_too
  http = HTTPX.plugin(:persistent, ssl: { alpn_protocols: %w[http/1.1] })
  url = build_uri("/get")

  worker = Thread.start do
    Thread.current.abort_on_exception = true
    Fiber.set_scheduler(Scheduler.new)

    5.times do
      Fiber.schedule do
        3.times { verify_status(http.get(url), 200) }
      end
    end
  end
  worker.join
ensure
  http.close
end
 | |
| 
 | |
|   private
 | |
| 
 | |
# URI scheme prefix used by this test case.
# NOTE(review): presumably consumed by the HTTPHelpers URI builders (e.g. build_uri) - confirm.
def scheme
  "https://"
end
 | |
| 
 | |
|   class Scheduler
 | |
# Sets up an idle scheduler.
#
# @param fiber [Fiber] fiber that waiting fibers transfer control back to;
#   defaults to the fiber that constructs the scheduler.
def initialize(fiber = Fiber.current)
  @fiber = fiber

  # io => fibers waiting for that io to become readable / writable
  @readable = Hash.new { |waiters, io| waiters[io] = [] }
  @writable = Hash.new { |waiters, io| waiters[io] = [] }
  # fiber => monotonic-clock deadline
  @waiting = {}
  # guards @ready, which #unblock appends to
  @lock = Thread::Mutex.new
  # fibers blocked without a timeout, keyed by object identity
  @blocking = {}.compare_by_identity
  # fibers queued by #unblock, resumed by #run
  @ready = []

  @closed = false

  # pipe written to by #unblock and selected on by #run
  @urgent = IO.pipe
end
 | |
| 
 | |
# Hook for `Fiber.schedule`.
#
# Creates a non-blocking fiber around the given block, transfers control to
# it right away, and returns the fiber once control comes back.
def fiber(&block)
  spawned = Fiber.new(blocking: false, &block)
  spawned.transfer
  spawned
end
 | |
| 
 | |
# Hook for `IO.select`.
#
# Forwards all arguments to IO.select inside a freshly spawned thread and
# returns whatever that call returns (Thread#value joins the thread).
def io_select(...)
  selector = Thread.new { IO.select(...) }
  selector.value
end
 | |
| 
 | |
# Closes the scheduler in "internal" mode, i.e. calls #close with
# internal = true so that #close goes straight to running the event loop
# instead of unsetting the current scheduler.
def scheduler_close
  close(true)
end
 | |
| 
 | |
# Closes the scheduler.
#
# When called externally (internal is false) while this object is the current
# fiber scheduler, it only unsets the scheduler via Fiber.set_scheduler(nil).
# Otherwise it runs the event loop until nothing is pending.
#
# Whichever path is taken, the scheduler ends up flagged as closed and frozen.
#
# @param internal [Boolean] true to skip the "unset the scheduler" branch.
# @raise [RuntimeError] if the scheduler was already closed.
def close(internal = false)
  requested_externally = !internal
  return Fiber.set_scheduler(nil) if requested_externally && (Fiber.scheduler == self)

  raise "Scheduler already closed!" if @closed

  run
ensure
  # Only assign when not yet set: the object may already be frozen by a
  # nested close, and writing an ivar on a frozen object would raise.
  @closed = true unless @closed

  # Frozen so that unintended modifications after closing are detected.
  freeze
end
 | |
| 
 | |
# Negated EAGAIN errno: the value the buffer read/write results are compared
# against to detect "would block".
EAGAIN = -Errno::EAGAIN::Errno

# Hook for IO#read_nonblock
#
# Puts +io+ into non-blocking mode and reads into +buffer+, starting at
# +offset+, until at least +length+ bytes were read or a read yields 0 bytes.
#
# @param io [IO] source to read from.
# @param buffer [IO::Buffer] destination buffer.
# @param length [Integer] minimum number of bytes wanted.
# @param offset [Integer] position in +buffer+ to start writing at.
# @return [Integer] total bytes read, or the negative result of a failed read
#   (EAGAIN is returned as-is only when +length+ is not positive).
def io_read(io, buffer, length, offset)
  total = 0
  io.nonblock = true

  loop do
    result = Fiber.blocking { buffer.read(io, 0, offset) }

    case result
    when EAGAIN
      return result unless length.positive?

      io_wait(io, IO::READABLE, nil)
    when 0
      # Zero bytes read: stop. This branch must come before the positive
      # range, otherwise a 0 result with total < length would loop forever.
      break
    when (1..)
      total += result
      offset += result
      break if total >= length
    when (...0)
      return result
    end
  end

  total
end
 | |
| 
 | |
# Hook for IO#write_nonblock
#
# Puts +io+ into non-blocking mode and writes from +buffer+, starting at
# +offset+, until at least +length+ bytes were written or a write yields 0 bytes.
#
# @param io [IO] destination to write to.
# @param buffer [IO::Buffer] source buffer.
# @param length [Integer] minimum number of bytes to write.
# @param offset [Integer] position in +buffer+ to start reading from.
# @return [Integer] total bytes written, or the negative result of a failed
#   write (EAGAIN is returned as-is only when +length+ is not positive).
def io_write(io, buffer, length, offset)
  total = 0
  io.nonblock = true

  loop do
    result = Fiber.blocking { buffer.write(io, 0, offset) }

    case result
    when EAGAIN
      return result unless length.positive?

      io_wait(io, IO::WRITABLE, nil)
    when 0
      # Nothing written: stop. This branch must come before the positive
      # range, otherwise a 0 result with total < length would loop forever.
      break
    when (1..)
      total += result
      offset += result
      break if total >= length
    when (...0)
      return result
    end
  end

  total
end
 | |
| 
 | |
# Hook for waiting on an IO; also used by the io_read/io_write hooks.
#
# Registers the current fiber as waiting for +io+ to become readable and/or
# writable (per the +events+ bitmask), optionally with a deadline +duration+
# seconds from now, then hands control to the scheduler's own fiber.
# All registrations made here are removed again on the way out.
#
# @return whatever value control is transferred back with.
def io_wait(io, events, duration)
  current = Fiber.current
  awaiting_read = awaiting_write = false

  if events.anybits?(IO::READABLE)
    @readable[io] << current
    awaiting_read = true
  end

  if events.anybits?(IO::WRITABLE)
    @writable[io] << current
    awaiting_write = true
  end

  if duration
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + duration
    @waiting[current] = deadline
  end

  @fiber.transfer
ensure
  @waiting.delete(current) if duration

  if awaiting_read
    # The lookup may re-create an empty entry (default block); it is dropped right below.
    readers = @readable[io]
    readers.delete(current)
    @readable.delete(io) if readers.empty?
  end

  if awaiting_write
    writers = @writable[io]
    writers.delete(current)
    @writable.delete(io) if writers.empty?
  end
end
 | |
| 
 | |
# Hook for `Thread::Mutex#lock`, `Thread::Queue#pop` and `Thread::SizedQueue#push`;
# also called when a non-blocking fiber blocks.
#
# Parks the current fiber and transfers control to the scheduler's fiber.
# With a +timeout+ the fiber is tracked in @waiting under a monotonic
# deadline; without one it is tracked in @blocking. The tracking entry is
# removed once control returns here.
#
# @return whatever value control is transferred back with.
def block(_blocker, timeout = nil)
  current = Fiber.current

  unless timeout
    @blocking[current] = true
    begin
      return @fiber.transfer
    ensure
      @blocking.delete(current)
    end
  end

  @waiting[current] = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  begin
    @fiber.transfer
  ensure
    # Looked up through the ivar again on purpose: #run swaps @waiting for a fresh hash.
    @waiting.delete(current)
  end
end
 | |
| 
 | |
# Queues +fiber+ to be resumed by the event loop and pokes the wake-up pipe.
#
# The append to @ready happens under @lock; a single byte is then written
# (non-blocking) to the write end of @urgent, which #run selects on.
def unblock(_blocker, fiber)
  @lock.synchronize { @ready << fiber }

  wakeup = @urgent.last
  wakeup.write_nonblock(".")
end
 | |
| 
 | |
# Sleeps by parking the current fiber through #block, using :sleep as the
# blocker and +duration+ as the timeout (nil means no timeout).
#
# @return [true] always, once #block returns.
def kernel_sleep(duration = nil)
  block(:sleep, duration)
  true
end
 | |
| 
 | |
# Event loop. Keeps going while any fiber is registered as waiting on an IO,
# waiting on a deadline, or blocked. Each pass:
#
# 1. selects on the registered IOs plus the read end of the wake-up pipe,
#    bounded by #next_timeout;
# 2. resumes every fiber whose IO became ready, passing the ready event mask;
# 3. resumes fibers whose deadline has passed (others are re-queued);
# 4. resumes fibers queued through #unblock.
def run
  while @readable.any? || @writable.any? || @waiting.any? || @blocking.any?
    # Reset on every pass: when IO.select raises, the assignment below never
    # happens, and the previous pass's results must not be replayed (that
    # would re-read the already drained wake-up pipe or wake fibers spuriously).
    readable = writable = nil

    begin
      readable, writable = IO.select([*@readable.keys, @urgent.first], @writable.keys, [], next_timeout)
    rescue IOError
      # Deliberately ignored (presumably an IO that was closed while still
      # registered); this pass then only services deadlines and the ready queue.
    end

    # fiber => bitmask of the events it is being woken for
    selected = {}

    readable&.each do |io|
      if io == @urgent.first
        # Drain the wake-up bytes written by #unblock.
        @urgent.first.read_nonblock(1024)
        next
      end

      next unless @readable.key?(io)

      @readable.delete(io).each do |fiber|
        # A fiber waiting for both events on this io is woken once; drop its write registration.
        if @writable.key?(io) && @writable[io].include?(fiber)
          @writable[io].delete(fiber)
          @writable.delete(io) if @writable[io].empty?
        end
        selected[fiber] = IO::READABLE
      end
    end

    writable&.each do |io|
      next unless @writable.key?(io)

      @writable.delete(io).each do |fiber|
        if @readable.key?(io) && @readable[io].include?(fiber)
          @readable[io].delete(fiber)
          @readable.delete(io) if @readable[io].empty?
        end
        selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE
      end
    end

    selected.each do |fiber, events|
      fiber.transfer(events)
    end

    if @waiting.any?
      time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      # Swap in a fresh hash so that fibers resumed below can register new deadlines.
      waiting, @waiting = @waiting, {}

      waiting.each do |fiber, timeout|
        next unless fiber.alive?

        if timeout <= time
          fiber.transfer
        else
          @waiting[fiber] = timeout
        end
      end
    end

    next unless @ready.any?

    ready = nil

    # @ready is appended to by #unblock under the same lock.
    @lock.synchronize do
      ready, @ready = @ready, []
    end

    ready.each do |fiber|
      fiber.transfer if fiber.alive?
    end
  end
end
 | |
| 
 | |
# Seconds until the earliest deadline stored in @waiting.
#
# @return [nil] when no deadline is registered,
#   0 when the earliest deadline is already in the past,
#   otherwise the remaining time on the monotonic clock.
def next_timeout
  earliest = @waiting.values.min
  return if earliest.nil?

  remaining = earliest - Process.clock_gettime(Process::CLOCK_MONOTONIC)
  remaining.negative? ? 0 : remaining
end
 | |
| 
 | |
|     private
 | |
| 
 | |
# Emits +msg+ through Kernel#warn, tagged with the current fiber's object id.
def log(msg)
  warn "(scheduler) fid:#{Fiber.current.object_id}: #{msg}"
end
 | |
|   end
 | |
| end
 |