mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-13 00:02:57 -04:00
Compare commits
4 Commits
45c8dcb36b
...
709101cf0f
Author | SHA1 | Date | |
---|---|---|---|
|
709101cf0f | ||
|
0d969a7a3c | ||
|
0f988e3e9f | ||
|
9bcae578d7 |
@ -118,6 +118,9 @@ Style/HashSyntax:
|
||||
Style/AndOr:
|
||||
Enabled: False
|
||||
|
||||
Style/ArgumentsForwarding:
|
||||
Enabled: False
|
||||
|
||||
Naming/MethodParameterName:
|
||||
Enabled: false
|
||||
|
||||
|
@ -4,6 +4,7 @@ module HTTPX
|
||||
module Callbacks
|
||||
def on(type, &action)
|
||||
callbacks(type) << action
|
||||
self
|
||||
end
|
||||
|
||||
def once(type, &block)
|
||||
@ -11,6 +12,7 @@ module HTTPX
|
||||
block.call(*args, &callback)
|
||||
:delete
|
||||
end
|
||||
self
|
||||
end
|
||||
|
||||
def only(type, &block)
|
||||
|
@ -10,6 +10,19 @@ module HTTPX
|
||||
MOD
|
||||
end
|
||||
|
||||
%i[
|
||||
connection_opened connection_closed
|
||||
request_error
|
||||
request_started request_body_chunk request_completed
|
||||
response_started response_body_chunk response_completed
|
||||
].each do |meth|
|
||||
class_eval(<<-MOD, __FILE__, __LINE__ + 1)
|
||||
def on_#{meth}(&blk) # def on_connection_opened(&blk)
|
||||
on(:#{meth}, &blk) # on(:connection_opened, &blk)
|
||||
end # end
|
||||
MOD
|
||||
end
|
||||
|
||||
def request(*args, **options)
|
||||
branch(default_options).request(*args, **options)
|
||||
end
|
||||
@ -56,6 +69,12 @@ module HTTPX
|
||||
branch(default_options.merge(options), &blk)
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def on(*args, &blk)
|
||||
branch(default_options).on(*args, &blk)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def default_options
|
||||
|
@ -133,33 +133,42 @@ module HTTPX
|
||||
end
|
||||
|
||||
def on_data(chunk)
|
||||
return unless @request
|
||||
request = @request
|
||||
|
||||
return unless request
|
||||
|
||||
log(color: :green) { "-> DATA: #{chunk.bytesize} bytes..." }
|
||||
log(level: 2, color: :green) { "-> #{chunk.inspect}" }
|
||||
response = @request.response
|
||||
response = request.response
|
||||
|
||||
response << chunk
|
||||
rescue StandardError => e
|
||||
error_response = ErrorResponse.new(request, e, request.options)
|
||||
request.response = error_response
|
||||
dispatch
|
||||
end
|
||||
|
||||
def on_complete
|
||||
return unless @request
|
||||
request = @request
|
||||
|
||||
return unless request
|
||||
|
||||
log(level: 2) { "parsing complete" }
|
||||
dispatch
|
||||
end
|
||||
|
||||
def dispatch
|
||||
if @request.expects?
|
||||
request = @request
|
||||
|
||||
if request.expects?
|
||||
@parser.reset!
|
||||
return handle(@request)
|
||||
return handle(request)
|
||||
end
|
||||
|
||||
request = @request
|
||||
@request = nil
|
||||
@requests.shift
|
||||
response = request.response
|
||||
response.finish!
|
||||
response.finish! unless response.is_a?(ErrorResponse)
|
||||
emit(:response, request, response)
|
||||
|
||||
if @parser.upgrade?
|
||||
@ -169,7 +178,11 @@ module HTTPX
|
||||
|
||||
@parser.reset!
|
||||
@max_requests -= 1
|
||||
manage_connection(response)
|
||||
if response.is_a?(ErrorResponse)
|
||||
disable
|
||||
else
|
||||
manage_connection(response)
|
||||
end
|
||||
|
||||
send(@pending.shift) unless @pending.empty?
|
||||
end
|
||||
|
@ -314,6 +314,7 @@ module HTTPX
|
||||
ex = Error.new(stream.id, error)
|
||||
ex.set_backtrace(caller)
|
||||
response = ErrorResponse.new(request, ex, request.options)
|
||||
request.response = response
|
||||
emit(:response, request, response)
|
||||
else
|
||||
response = request.response
|
||||
|
@ -38,7 +38,10 @@ module HTTPX
|
||||
add_addresses(addresses)
|
||||
end
|
||||
@ip_index = @addresses.size - 1
|
||||
# @io ||= build_socket
|
||||
end
|
||||
|
||||
def socket
|
||||
@io.to_io
|
||||
end
|
||||
|
||||
def add_addresses(addrs)
|
||||
|
@ -170,11 +170,7 @@ module HTTPX
|
||||
proxy = options.proxy
|
||||
return super unless proxy
|
||||
|
||||
connection = options.connection_class.new("tcp", uri, options)
|
||||
catch(:coalesced) do
|
||||
pool.init_connection(connection, options)
|
||||
connection
|
||||
end
|
||||
init_connection("tcp", uri, options)
|
||||
end
|
||||
|
||||
def fetch_response(request, connections, options)
|
||||
|
@ -95,6 +95,8 @@ module HTTPX
|
||||
return
|
||||
end
|
||||
@response = response
|
||||
|
||||
emit(:response_started, response)
|
||||
end
|
||||
|
||||
def path
|
||||
@ -130,8 +132,10 @@ module HTTPX
|
||||
return nil if @body.nil?
|
||||
|
||||
@drainer ||= @body.each
|
||||
chunk = @drainer.next
|
||||
chunk.dup
|
||||
chunk = @drainer.next.dup
|
||||
|
||||
emit(:body_chunk, chunk)
|
||||
chunk
|
||||
rescue StopIteration
|
||||
nil
|
||||
rescue StandardError => e
|
||||
|
@ -9,6 +9,7 @@ require "forwardable"
|
||||
module HTTPX
|
||||
class Response
|
||||
extend Forwardable
|
||||
include Callbacks
|
||||
|
||||
attr_reader :status, :headers, :body, :version
|
||||
|
||||
@ -144,9 +145,13 @@ module HTTPX
|
||||
def write(chunk)
|
||||
return if @state == :closed
|
||||
|
||||
@length += chunk.bytesize
|
||||
size = chunk.bytesize
|
||||
@length += size
|
||||
transition
|
||||
@buffer.write(chunk)
|
||||
|
||||
@response.emit(:chunk_received, chunk)
|
||||
size
|
||||
end
|
||||
|
||||
def read(*args)
|
||||
|
@ -4,6 +4,7 @@ module HTTPX
|
||||
class Session
|
||||
include Loggable
|
||||
include Chainable
|
||||
include Callbacks
|
||||
|
||||
EMPTY_HASH = {}.freeze
|
||||
|
||||
@ -45,6 +46,31 @@ module HTTPX
|
||||
request = rklass.new(verb, uri, options.merge(persistent: @persistent))
|
||||
request.on(:response, &method(:on_response).curry(2)[request])
|
||||
request.on(:promise, &method(:on_promise))
|
||||
|
||||
request.on(:headers) do
|
||||
emit(:request_started, request)
|
||||
end
|
||||
request.on(:body_chunk) do |chunk|
|
||||
emit(:request_body_chunk, request, chunk)
|
||||
end
|
||||
request.on(:done) do
|
||||
emit(:request_completed, request)
|
||||
end
|
||||
|
||||
request.on(:response_started) do |res|
|
||||
if res.is_a?(Response)
|
||||
emit(:response_started, request, res)
|
||||
res.on(:chunk_received) do |chunk|
|
||||
emit(:response_body_chunk, request, res, chunk)
|
||||
end
|
||||
else
|
||||
emit(:request_error, request, res.error)
|
||||
end
|
||||
end
|
||||
request.on(:response) do |res|
|
||||
emit(:response_completed, request, res)
|
||||
end
|
||||
|
||||
request
|
||||
end
|
||||
|
||||
@ -174,7 +200,16 @@ module HTTPX
|
||||
raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
|
||||
end
|
||||
end
|
||||
init_connection(type, uri, options)
|
||||
end
|
||||
|
||||
def init_connection(type, uri, options)
|
||||
connection = options.connection_class.new(type, uri, options)
|
||||
connection.on(:open) do
|
||||
emit(:connection_opened, connection.origin, connection.io.socket)
|
||||
# only run close callback if it opened
|
||||
connection.on(:close) { emit(:connection_closed, connection.origin, connection.io.socket) }
|
||||
end
|
||||
catch(:coalesced) do
|
||||
pool.init_connection(connection, options)
|
||||
connection
|
||||
@ -252,6 +287,7 @@ module HTTPX
|
||||
super
|
||||
klass.instance_variable_set(:@default_options, @default_options)
|
||||
klass.instance_variable_set(:@plugins, @plugins.dup)
|
||||
klass.instance_variable_set(:@callbacks, @callbacks.dup)
|
||||
end
|
||||
|
||||
def plugin(pl, options = nil, &block)
|
||||
|
@ -4,9 +4,9 @@ module HTTPX
|
||||
end
|
||||
|
||||
module Callbacks
|
||||
def on: (Symbol) { (*untyped) -> void } -> void
|
||||
def once: (Symbol) { (*untyped) -> void } -> void
|
||||
def only: (Symbol) { (*untyped) -> void } -> void
|
||||
def on: (Symbol) { (*untyped) -> void } -> self
|
||||
def once: (Symbol) { (*untyped) -> void } -> self
|
||||
def only: (Symbol) { (*untyped) -> void } -> self
|
||||
def emit: (Symbol, *untyped) -> void
|
||||
|
||||
def callbacks_for?: (Symbol) -> bool
|
||||
|
@ -9,6 +9,7 @@ module HTTPX
|
||||
|
||||
class Response
|
||||
extend Forwardable
|
||||
include Callbacks
|
||||
|
||||
include _Response
|
||||
include _ToS
|
||||
|
@ -27,9 +27,9 @@ module HTTPX
|
||||
def on_promise: (untyped, untyped) -> void
|
||||
def fetch_response: (Request request, Array[Connection] connections, untyped options) -> response?
|
||||
|
||||
def find_connection: (Request, Array[Connection] connections, Options options) -> Connection
|
||||
def find_connection: (Request request, Array[Connection] connections, Options options) -> Connection
|
||||
|
||||
def set_connection_callbacks: (Connection, Array[Connection], Options) -> void
|
||||
def set_connection_callbacks: (Connection connection, Array[Connection] connections, Options options) -> void
|
||||
|
||||
def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options) -> Connection?
|
||||
|
||||
@ -39,13 +39,15 @@ module HTTPX
|
||||
| (verb, _Each[[uri, options]], Options) -> Array[Request]
|
||||
| (verb, _Each[uri], options) -> Array[Request]
|
||||
|
||||
def build_connection: (URI::HTTP | URI::HTTPS uri, Options options) -> Connection
|
||||
def build_connection: (URI::HTTP | URI::HTTP uri, Options options) -> Connection
|
||||
|
||||
def init_connection: (String type, URI::HTTP | URI::HTTP uri, Options options) -> Connection
|
||||
|
||||
def send_requests: (*Request) -> Array[response]
|
||||
|
||||
def _send_requests: (Array[Request]) -> Array[Connection]
|
||||
def _send_requests: (Array[Request] requests) -> Array[Connection]
|
||||
|
||||
def receive_requests: (Array[Request], Array[Connection]) -> Array[response]
|
||||
def receive_requests: (Array[Request] requests, Array[Connection] connections) -> Array[response]
|
||||
|
||||
attr_reader self.default_options: Options
|
||||
end
|
||||
|
@ -13,6 +13,7 @@ class HTTPTest < Minitest::Test
|
||||
include Headers
|
||||
include ResponseBody
|
||||
include IO
|
||||
include Callbacks
|
||||
include Errors if RUBY_ENGINE == "ruby"
|
||||
include AltSvc if ENV.key?("HTTPBIN_ALTSVC_HOST")
|
||||
|
||||
|
@ -11,6 +11,7 @@ class HTTPSTest < Minitest::Test
|
||||
include Headers
|
||||
include ResponseBody
|
||||
include IO
|
||||
include Callbacks
|
||||
include Errors if RUBY_ENGINE == "ruby"
|
||||
include Resolvers if ENV.key?("HTTPX_RESOLVER_URI")
|
||||
# TODO: uncomment as soon as nghttpx supports altsvc for HTTP/2
|
||||
|
@ -29,4 +29,11 @@ module HTTPHelpers
|
||||
def origin(orig = httpbin)
|
||||
"#{scheme}#{orig}"
|
||||
end
|
||||
|
||||
def next_available_port
|
||||
server = TCPServer.new("localhost", 0)
|
||||
server.addr[1]
|
||||
ensure
|
||||
server.close
|
||||
end
|
||||
end
|
||||
|
92
test/support/requests/callbacks.rb
Normal file
92
test/support/requests/callbacks.rb
Normal file
@ -0,0 +1,92 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "resolv"
|
||||
|
||||
module Requests
|
||||
using HTTPX::URIExtensions
|
||||
module Callbacks
|
||||
def test_callbacks_connection_opened
|
||||
uri = URI(build_uri("/get"))
|
||||
origin = ip = nil
|
||||
|
||||
response = HTTPX.on_connection_opened do |o, sock|
|
||||
origin = o
|
||||
ip = sock.remote_address.ip_address
|
||||
end.get(uri)
|
||||
verify_status(response, 200)
|
||||
|
||||
assert !origin.nil?
|
||||
assert origin.to_s == uri.origin
|
||||
assert !ip.nil?
|
||||
|
||||
assert Resolv.getaddresses(uri.host).include?(ip)
|
||||
end
|
||||
|
||||
def test_callbacks_connection_closed
|
||||
uri = URI(build_uri("/get"))
|
||||
origin = nil
|
||||
|
||||
response = HTTPX.on_connection_closed do |o|
|
||||
origin = o
|
||||
end.get(uri)
|
||||
verify_status(response, 200)
|
||||
|
||||
assert !origin.nil?
|
||||
assert origin.to_s == uri.origin
|
||||
end
|
||||
|
||||
def test_callbacks_request_error
|
||||
uri = URI(build_uri("/get"))
|
||||
error = nil
|
||||
|
||||
http = HTTPX.on_request_error { |_, err| error = err }
|
||||
|
||||
response = http.get(uri)
|
||||
verify_status(response, 200)
|
||||
|
||||
assert error.nil?
|
||||
|
||||
unavailable_host = URI(origin("localhost"))
|
||||
unavailable_host.port = next_available_port
|
||||
response = http.get(unavailable_host.to_s)
|
||||
verify_error_response(response, /Connection refused| not available/)
|
||||
|
||||
assert !error.nil?
|
||||
assert error == response.error
|
||||
end
|
||||
|
||||
def test_callbacks_request
|
||||
uri = URI(build_uri("/post"))
|
||||
started = completed = false
|
||||
chunks = 0
|
||||
|
||||
http = HTTPX.on_request_started { |_| started = true }
|
||||
.on_request_body_chunk { |_, _chunk| chunks += 1 }
|
||||
.on_request_completed { |_| completed = true }
|
||||
|
||||
response = http.post(uri, body: "data")
|
||||
verify_status(response, 200)
|
||||
|
||||
assert started
|
||||
assert completed
|
||||
assert chunks.positive?
|
||||
end
|
||||
|
||||
def test_callbacks_response
|
||||
uri = URI(build_uri("/get"))
|
||||
started = completed = false
|
||||
chunks = 0
|
||||
|
||||
http = HTTPX.on_response_started { |_, _| started = true }
|
||||
.on_response_body_chunk { |_, _, _chunk| chunks += 1 }
|
||||
.on_response_completed { |_, _| completed = true }
|
||||
|
||||
response = http.get(uri)
|
||||
verify_status(response, 200)
|
||||
|
||||
assert started
|
||||
assert completed
|
||||
assert chunks.positive?
|
||||
end
|
||||
end
|
||||
end
|
@ -41,13 +41,21 @@ module Requests
|
||||
# end
|
||||
# end
|
||||
|
||||
private
|
||||
ResponseErrorEmitter = Module.new do
|
||||
self::ResponseMethods = Module.new do
|
||||
def <<(_)
|
||||
raise "done with it"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def next_available_port
|
||||
server = TCPServer.new("localhost", 0)
|
||||
server.addr[1]
|
||||
ensure
|
||||
server.close
|
||||
def test_errors_mid_response_buffering
|
||||
uri = URI(build_uri("/get"))
|
||||
HTTPX.plugin(SessionWithPool).plugin(ResponseErrorEmitter).wrap do |http|
|
||||
response = http.get(uri)
|
||||
verify_error_response(response, "done with it")
|
||||
assert http.pool.connections.empty?
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
x
Reference in New Issue
Block a user