mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-09-01 00:00:35 -04:00
added timeouts backend like http, integrated it in connecting sockets, and performing selector operations; the global one still needs work though (currently connecting one by one, connection not integrated in selector)
This commit is contained in:
parent
8edca3e579
commit
a214891fa1
@ -4,6 +4,7 @@
|
||||
require "httpx/version"
|
||||
require "httpx/callbacks"
|
||||
require "httpx/options"
|
||||
require "httpx/timeout"
|
||||
require "httpx/connection"
|
||||
require "httpx/headers"
|
||||
require "httpx/request"
|
||||
|
@ -3,14 +3,14 @@
|
||||
module HTTPX::Channel
|
||||
module_function
|
||||
|
||||
def by(uri, **options)
|
||||
def by(uri, options)
|
||||
case uri.scheme
|
||||
when "http"
|
||||
TCP.new(uri, **options)
|
||||
TCP.new(uri, options)
|
||||
when "https"
|
||||
SSL.new(uri, **options)
|
||||
SSL.new(uri, options)
|
||||
else
|
||||
raise "#{uri.scheme}: unrecognized channel"
|
||||
raise Error, "#{uri.scheme}: unrecognized channel"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -5,18 +5,24 @@ require "openssl"
|
||||
|
||||
module HTTPX::Channel
|
||||
class SSL < TCP
|
||||
def initialize(uri, ssl: {}, **)
|
||||
def initialize(uri, options)
|
||||
@timeout = options.timeout
|
||||
@options = HTTPX::Options.new(options)
|
||||
ssl = @options.ssl
|
||||
ctx = OpenSSL::SSL::SSLContext.new
|
||||
ctx.set_params(ssl)
|
||||
ctx.alpn_protocols = %w[h2 http/1.1] if ctx.respond_to?(:alpn_protocols=)
|
||||
ctx.alpn_select_cb = lambda do |pr|
|
||||
pr.first unless pr.nil? || pr.empty?
|
||||
end if ctx.respond_to?(:alpn_select_cb=)
|
||||
super
|
||||
@io = OpenSSL::SSL::SSLSocket.new(@io, ctx)
|
||||
@io.hostname = uri.host
|
||||
@io.sync_close = true
|
||||
@io.connect # TODO: non-block variant missing
|
||||
|
||||
@timeout.connect do
|
||||
super
|
||||
@io = OpenSSL::SSL::SSLSocket.new(@io, ctx)
|
||||
@io.hostname = uri.host
|
||||
@io.sync_close = true
|
||||
@io.connect # TODO: non-block variant missing
|
||||
end
|
||||
end
|
||||
|
||||
def protocol
|
||||
|
@ -18,9 +18,13 @@ module HTTPX::Channel
|
||||
|
||||
def_delegator :@io, :to_io
|
||||
|
||||
def initialize(uri, **)
|
||||
@io = TCPSocket.new(uri.host, uri.port)
|
||||
_, @remote_port, _,@remote_ip = @io.peeraddr
|
||||
def initialize(uri, options)
|
||||
@uri = uri
|
||||
@options = HTTPX::Options.new(options)
|
||||
@timeout = options.timeout
|
||||
@timeout.connect do
|
||||
@io = TCPSocket.new(uri.host, uri.port)
|
||||
end
|
||||
@read_buffer = +""
|
||||
@write_buffer = +""
|
||||
@protocol = "http/1.1"
|
||||
|
@ -9,7 +9,7 @@ module HTTPX
|
||||
class Connection
|
||||
def initialize(options)
|
||||
@options = Options.new(options)
|
||||
@operation_timeout = options.operation_timeout
|
||||
@timeout = options.timeout
|
||||
@channels = []
|
||||
@responses = {}
|
||||
end
|
||||
@ -32,7 +32,7 @@ module HTTPX
|
||||
|
||||
def <<(request)
|
||||
channel = bind(request.uri)
|
||||
raise "no channel available" unless channel
|
||||
raise Error, "no channel available" unless channel
|
||||
|
||||
channel.send(request) do |request, response|
|
||||
@responses[request] = response
|
||||
@ -43,11 +43,11 @@ module HTTPX
|
||||
@responses.delete(request)
|
||||
end
|
||||
|
||||
def process_events(timeout: @operation_timeout)
|
||||
def process_events(timeout: @timeout.timeout)
|
||||
rmonitors = @channels
|
||||
wmonitors = rmonitors.reject(&:empty?)
|
||||
readers, writers = IO.select(rmonitors, wmonitors, nil, timeout)
|
||||
raise Timeout::Error, "timed out waiting for data" if readers.nil? && writers.nil?
|
||||
raise TimeoutError, "timed out waiting for data" if readers.nil? && writers.nil?
|
||||
readers.each do |reader|
|
||||
channel = catch(:close) { reader.dread }
|
||||
close(channel) if channel
|
||||
|
7
lib/httpx/errors.rb
Normal file
7
lib/httpx/errors.rb
Normal file
@ -0,0 +1,7 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX
|
||||
Error = Class.new(StandardError)
|
||||
|
||||
TimeoutError = Class.new(Error)
|
||||
end
|
@ -35,9 +35,7 @@ module HTTPX
|
||||
defaults = {
|
||||
:proxy => {},
|
||||
:ssl => {},
|
||||
:keep_alive_timeout => KEEP_ALIVE_TIMEOUT,
|
||||
:operation_timeout => OPERATION_TIMEOUT,
|
||||
:connect_timeout => CONNECT_TIMEOUT,
|
||||
:timeout => Timeout.by(:null),
|
||||
:headers => {},
|
||||
:cookies => {},
|
||||
}
|
||||
@ -58,10 +56,13 @@ module HTTPX
|
||||
end
|
||||
end
|
||||
|
||||
def_option(:timeout) do |type, opts|
|
||||
self.timeout = Timeout.by(type, opts)
|
||||
end
|
||||
|
||||
%w[
|
||||
proxy params form json body follow
|
||||
ssl_context ssl
|
||||
keep_alive_timeout connect_timeout operation_timeout
|
||||
ssl
|
||||
].each do |method_name|
|
||||
def_option(method_name)
|
||||
end
|
||||
|
27
lib/httpx/timeout.rb
Normal file
27
lib/httpx/timeout.rb
Normal file
@ -0,0 +1,27 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX
|
||||
module Timeout
|
||||
class << self
|
||||
def by(type, **opts)
|
||||
case type
|
||||
when :null
|
||||
Null.new(opts)
|
||||
when :per_operation
|
||||
PerOperation.new(opts)
|
||||
when :global
|
||||
Global.new(opts)
|
||||
when Null
|
||||
type
|
||||
else
|
||||
raise "#{type}: unrecognized timeout option"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
require "httpx/timeout/null"
|
||||
require "httpx/timeout/per_operation"
|
||||
require "httpx/timeout/global"
|
55
lib/httpx/timeout/global.rb
Normal file
55
lib/httpx/timeout/global.rb
Normal file
@ -0,0 +1,55 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX::Timeout
|
||||
class Global < PerOperation
|
||||
|
||||
attr_reader :total_timeout
|
||||
|
||||
def initialize(**options)
|
||||
@total_timeout = options.values.reduce(:+, 0)
|
||||
reset_counter
|
||||
end
|
||||
|
||||
def ==(other)
|
||||
other.is_a?(Global) &&
|
||||
@total_timeout == other.total_timeout
|
||||
end
|
||||
|
||||
def connect(&blk)
|
||||
return yield if @connecting
|
||||
reset_timer
|
||||
::Timeout.timeout(@time_left, HTTPX::TimeoutError) do
|
||||
@connecting = true
|
||||
yield
|
||||
end
|
||||
log_time
|
||||
ensure
|
||||
@connecting = false
|
||||
end
|
||||
|
||||
def timeout
|
||||
log_time
|
||||
@time_left
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def reset_counter
|
||||
@time_left = @total_timeout
|
||||
end
|
||||
|
||||
def reset_timer
|
||||
@started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
end
|
||||
|
||||
def log_time
|
||||
@time_left -= (Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started)
|
||||
if @time_left <= 0
|
||||
raise HTTPX::TimeoutError, "Timed out after using the allocated #{@total_timeout} seconds"
|
||||
end
|
||||
|
||||
reset_timer
|
||||
end
|
||||
|
||||
end
|
||||
end
|
20
lib/httpx/timeout/null.rb
Normal file
20
lib/httpx/timeout/null.rb
Normal file
@ -0,0 +1,20 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX::Timeout
|
||||
class Null
|
||||
def initialize(**)
|
||||
end
|
||||
|
||||
def ==(other)
|
||||
other.is_a?(Null)
|
||||
end
|
||||
|
||||
def connect
|
||||
yield
|
||||
end
|
||||
|
||||
def timeout
|
||||
nil
|
||||
end
|
||||
end
|
||||
end
|
39
lib/httpx/timeout/per_operation.rb
Normal file
39
lib/httpx/timeout/per_operation.rb
Normal file
@ -0,0 +1,39 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
module HTTPX::Timeout
|
||||
class PerOperation < Null
|
||||
KEEP_ALIVE_TIMEOUT = 5
|
||||
OPERATION_TIMEOUT = 5
|
||||
CONNECT_TIMEOUT = 5
|
||||
|
||||
attr_reader :connect_timeout, :operation_timeout, :keep_alive_timeout
|
||||
|
||||
alias :timeout :operation_timeout
|
||||
|
||||
def initialize(connect: CONNECT_TIMEOUT,
|
||||
operation: OPERATION_TIMEOUT,
|
||||
keep_alive: KEEP_ALIVE_TIMEOUT)
|
||||
@connect_timeout = connect
|
||||
@operation_timeout = operation
|
||||
@keep_alive_timeout = keep_alive
|
||||
end
|
||||
|
||||
def ==(other)
|
||||
other.is_a?(PerOperation) &&
|
||||
@connect_timeout == other.connect_timeout &&
|
||||
@operation_timeout == other.operation_timeout &&
|
||||
@keep_alive_timeout == other.keep_alive_timeout
|
||||
end
|
||||
|
||||
def connect
|
||||
return yield if @connecting
|
||||
::Timeout.timeout(@connect_timeout, HTTPX::TimeoutError) do
|
||||
@connecting = true
|
||||
yield
|
||||
end
|
||||
ensure
|
||||
@connecting = false
|
||||
end
|
||||
end
|
||||
end
|
||||
|
82
test/options_test.rb
Normal file
82
test/options_test.rb
Normal file
@ -0,0 +1,82 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require_relative "test_helper"
|
||||
|
||||
class OptionsSpec < Minitest::Test
|
||||
include HTTPX
|
||||
|
||||
def test_options_body
|
||||
opt1 = Options.new
|
||||
assert opt1.body.nil?, "body shouldn't be set by default"
|
||||
opt2 = Options.new(:body => "fat")
|
||||
assert opt2.body == "fat", "body was not set"
|
||||
opt3 = opt1.with_body("fat")
|
||||
assert opt3.body == "fat", "body was not set"
|
||||
end
|
||||
|
||||
%i[form json].each do |meth|
|
||||
define_method :"test_options_#{meth}" do
|
||||
opt1 = Options.new
|
||||
assert opt1.public_send(meth).nil?, "#{meth} shouldn't be set by default"
|
||||
opt2 = Options.new(meth => {"foo" => "bar"})
|
||||
assert opt2.public_send(meth) == {"foo" => "bar"}, "#{meth} was not set"
|
||||
opt3 = opt1.public_send(:"with_#{meth}", {"foo" => "bar"})
|
||||
assert opt3.public_send(meth) == {"foo" => "bar"}, "option was not set"
|
||||
end
|
||||
end
|
||||
|
||||
def test_options_headers
|
||||
opt1 = Options.new
|
||||
assert opt1.headers.to_a.empty?, "headers should be empty"
|
||||
opt2 = Options.new({:headers => {"accept" => "*/*"}})
|
||||
assert opt2.headers.to_a == [%w[accept */*]], "headers are unexpected"
|
||||
opt3 = opt1.with_headers({"accept" => "*/*"})
|
||||
assert opt3.headers.to_a == [%w[accept */*]], "headers are unexpected"
|
||||
end
|
||||
|
||||
def test_options_merge
|
||||
opts = Options.new(body: "fat")
|
||||
assert opts.merge(body: "thin").body == "thin", "parameter hasn't been merged"
|
||||
assert opts.body == "fat", "original parameter has been mutated after merge"
|
||||
|
||||
opt2 = Options.new(body: "short")
|
||||
assert opts.merge(opt2).body == "short", "options parameter hasn't been merged"
|
||||
|
||||
foo = Options.new(
|
||||
:form => {:foo => "foo"},
|
||||
:headers => {:accept => "json", :foo => "foo"},
|
||||
:proxy => {},
|
||||
)
|
||||
|
||||
bar = Options.new(
|
||||
:form => {:bar => "bar"},
|
||||
:headers => {:accept => "xml", :bar => "bar"},
|
||||
:ssl => {:foo => "bar"},
|
||||
:proxy => {:proxy_address => "127.0.0.1", :proxy_port => 8080}
|
||||
)
|
||||
|
||||
|
||||
assert foo.merge(bar).to_hash == {
|
||||
:params => nil,
|
||||
:json => nil,
|
||||
:body => nil,
|
||||
:follow => nil,
|
||||
:form => {:bar => "bar"},
|
||||
:timeout => Timeout::Null.new,
|
||||
:ssl => {:foo => "bar"},
|
||||
:headers => {"Foo" => "foo", "Accept" => "xml", "Bar" => "bar"},
|
||||
:proxy => {:proxy_address => "127.0.0.1", :proxy_port => 8080},
|
||||
:cookies => {},
|
||||
}, "options haven't merged correctly"
|
||||
end
|
||||
|
||||
def test_options_new
|
||||
opts = Options.new
|
||||
assert Options.new(opts) == opts, "it should have kept the same reference"
|
||||
end
|
||||
|
||||
def test_options_to_hash
|
||||
opts = Options.new
|
||||
assert opts.to_hash.is_a?(Hash)
|
||||
end
|
||||
end
|
Loading…
x
Reference in New Issue
Block a user