Compare commits

...

4 Commits

Author SHA1 Message Date
HoneyryderChuck
709101cf0f Merge branch 'issue-43' into 'master'
event callbacks

Closes #43

See merge request os85/httpx!229
2023-05-31 19:30:53 +00:00
HoneyryderChuck
0d969a7a3c errors in response chunk handling will now bubble up and force the connection to close 2023-05-31 20:17:27 +01:00
HoneyryderChuck
0f988e3e9f adding session lifecycle callbacks 2023-05-31 20:06:59 +01:00
HoneyryderChuck
9bcae578d7 recover from errors on response chunk processing
first attempt at more granular error handling: during response chunk processing, errors will be handled in a way where current response stops being fetched; for http/1, the connection is fully reset, for http/2, the individual stream is cancelled
2023-05-31 11:24:21 +01:00
18 changed files with 225 additions and 31 deletions

View File

@ -118,6 +118,9 @@ Style/HashSyntax:
Style/AndOr:
Enabled: False
Style/ArgumentsForwarding:
Enabled: False
Naming/MethodParameterName:
Enabled: false

View File

@ -4,6 +4,7 @@ module HTTPX
module Callbacks
def on(type, &action)
callbacks(type) << action
self
end
def once(type, &block)
@ -11,6 +12,7 @@ module HTTPX
block.call(*args, &callback)
:delete
end
self
end
def only(type, &block)

View File

@ -10,6 +10,19 @@ module HTTPX
MOD
end
%i[
connection_opened connection_closed
request_error
request_started request_body_chunk request_completed
response_started response_body_chunk response_completed
].each do |meth|
class_eval(<<-MOD, __FILE__, __LINE__ + 1)
def on_#{meth}(&blk) # def on_connection_opened(&blk)
on(:#{meth}, &blk) # on(:connection_opened, &blk)
end # end
MOD
end
def request(*args, **options)
branch(default_options).request(*args, **options)
end
@ -56,6 +69,12 @@ module HTTPX
branch(default_options.merge(options), &blk)
end
protected
def on(*args, &blk)
branch(default_options).on(*args, &blk)
end
private
def default_options

View File

@ -133,33 +133,42 @@ module HTTPX
end
def on_data(chunk)
return unless @request
request = @request
return unless request
log(color: :green) { "-> DATA: #{chunk.bytesize} bytes..." }
log(level: 2, color: :green) { "-> #{chunk.inspect}" }
response = @request.response
response = request.response
response << chunk
rescue StandardError => e
error_response = ErrorResponse.new(request, e, request.options)
request.response = error_response
dispatch
end
def on_complete
return unless @request
request = @request
return unless request
log(level: 2) { "parsing complete" }
dispatch
end
def dispatch
if @request.expects?
request = @request
if request.expects?
@parser.reset!
return handle(@request)
return handle(request)
end
request = @request
@request = nil
@requests.shift
response = request.response
response.finish!
response.finish! unless response.is_a?(ErrorResponse)
emit(:response, request, response)
if @parser.upgrade?
@ -169,7 +178,11 @@ module HTTPX
@parser.reset!
@max_requests -= 1
manage_connection(response)
if response.is_a?(ErrorResponse)
disable
else
manage_connection(response)
end
send(@pending.shift) unless @pending.empty?
end

View File

@ -314,6 +314,7 @@ module HTTPX
ex = Error.new(stream.id, error)
ex.set_backtrace(caller)
response = ErrorResponse.new(request, ex, request.options)
request.response = response
emit(:response, request, response)
else
response = request.response

View File

@ -38,7 +38,10 @@ module HTTPX
add_addresses(addresses)
end
@ip_index = @addresses.size - 1
# @io ||= build_socket
end
def socket
@io.to_io
end
def add_addresses(addrs)

View File

@ -170,11 +170,7 @@ module HTTPX
proxy = options.proxy
return super unless proxy
connection = options.connection_class.new("tcp", uri, options)
catch(:coalesced) do
pool.init_connection(connection, options)
connection
end
init_connection("tcp", uri, options)
end
def fetch_response(request, connections, options)

View File

@ -95,6 +95,8 @@ module HTTPX
return
end
@response = response
emit(:response_started, response)
end
def path
@ -130,8 +132,10 @@ module HTTPX
return nil if @body.nil?
@drainer ||= @body.each
chunk = @drainer.next
chunk.dup
chunk = @drainer.next.dup
emit(:body_chunk, chunk)
chunk
rescue StopIteration
nil
rescue StandardError => e

View File

@ -9,6 +9,7 @@ require "forwardable"
module HTTPX
class Response
extend Forwardable
include Callbacks
attr_reader :status, :headers, :body, :version
@ -144,9 +145,13 @@ module HTTPX
def write(chunk)
return if @state == :closed
@length += chunk.bytesize
size = chunk.bytesize
@length += size
transition
@buffer.write(chunk)
@response.emit(:chunk_received, chunk)
size
end
def read(*args)

View File

@ -4,6 +4,7 @@ module HTTPX
class Session
include Loggable
include Chainable
include Callbacks
EMPTY_HASH = {}.freeze
@ -45,6 +46,31 @@ module HTTPX
request = rklass.new(verb, uri, options.merge(persistent: @persistent))
request.on(:response, &method(:on_response).curry(2)[request])
request.on(:promise, &method(:on_promise))
request.on(:headers) do
emit(:request_started, request)
end
request.on(:body_chunk) do |chunk|
emit(:request_body_chunk, request, chunk)
end
request.on(:done) do
emit(:request_completed, request)
end
request.on(:response_started) do |res|
if res.is_a?(Response)
emit(:response_started, request, res)
res.on(:chunk_received) do |chunk|
emit(:response_body_chunk, request, res, chunk)
end
else
emit(:request_error, request, res.error)
end
end
request.on(:response) do |res|
emit(:response_completed, request, res)
end
request
end
@ -174,7 +200,16 @@ module HTTPX
raise UnsupportedSchemeError, "#{uri}: #{uri.scheme}: unsupported URI scheme"
end
end
init_connection(type, uri, options)
end
def init_connection(type, uri, options)
connection = options.connection_class.new(type, uri, options)
connection.on(:open) do
emit(:connection_opened, connection.origin, connection.io.socket)
# only run close callback if it opened
connection.on(:close) { emit(:connection_closed, connection.origin, connection.io.socket) }
end
catch(:coalesced) do
pool.init_connection(connection, options)
connection
@ -252,6 +287,7 @@ module HTTPX
super
klass.instance_variable_set(:@default_options, @default_options)
klass.instance_variable_set(:@plugins, @plugins.dup)
klass.instance_variable_set(:@callbacks, @callbacks.dup)
end
def plugin(pl, options = nil, &block)

View File

@ -4,9 +4,9 @@ module HTTPX
end
module Callbacks
def on: (Symbol) { (*untyped) -> void } -> void
def once: (Symbol) { (*untyped) -> void } -> void
def only: (Symbol) { (*untyped) -> void } -> void
def on: (Symbol) { (*untyped) -> void } -> self
def once: (Symbol) { (*untyped) -> void } -> self
def only: (Symbol) { (*untyped) -> void } -> self
def emit: (Symbol, *untyped) -> void
def callbacks_for?: (Symbol) -> bool

View File

@ -9,6 +9,7 @@ module HTTPX
class Response
extend Forwardable
include Callbacks
include _Response
include _ToS

View File

@ -27,9 +27,9 @@ module HTTPX
def on_promise: (untyped, untyped) -> void
def fetch_response: (Request request, Array[Connection] connections, untyped options) -> response?
def find_connection: (Request, Array[Connection] connections, Options options) -> Connection
def find_connection: (Request request, Array[Connection] connections, Options options) -> Connection
def set_connection_callbacks: (Connection, Array[Connection], Options) -> void
def set_connection_callbacks: (Connection connection, Array[Connection] connections, Options options) -> void
def build_altsvc_connection: (Connection existing_connection, Array[Connection] connections, URI::Generic alt_origin, String origin, Hash[String, String] alt_params, Options options) -> Connection?
@ -39,13 +39,15 @@ module HTTPX
| (verb, _Each[[uri, options]], Options) -> Array[Request]
| (verb, _Each[uri], options) -> Array[Request]
def build_connection: (URI::HTTP | URI::HTTPS uri, Options options) -> Connection
def build_connection: (URI::HTTP | URI::HTTP uri, Options options) -> Connection
def init_connection: (String type, URI::HTTP | URI::HTTP uri, Options options) -> Connection
def send_requests: (*Request) -> Array[response]
def _send_requests: (Array[Request]) -> Array[Connection]
def _send_requests: (Array[Request] requests) -> Array[Connection]
def receive_requests: (Array[Request], Array[Connection]) -> Array[response]
def receive_requests: (Array[Request] requests, Array[Connection] connections) -> Array[response]
attr_reader self.default_options: Options
end

View File

@ -13,6 +13,7 @@ class HTTPTest < Minitest::Test
include Headers
include ResponseBody
include IO
include Callbacks
include Errors if RUBY_ENGINE == "ruby"
include AltSvc if ENV.key?("HTTPBIN_ALTSVC_HOST")

View File

@ -11,6 +11,7 @@ class HTTPSTest < Minitest::Test
include Headers
include ResponseBody
include IO
include Callbacks
include Errors if RUBY_ENGINE == "ruby"
include Resolvers if ENV.key?("HTTPX_RESOLVER_URI")
# TODO: uncomment as soon as nghttpx supports altsvc for HTTP/2

View File

@ -29,4 +29,11 @@ module HTTPHelpers
def origin(orig = httpbin)
"#{scheme}#{orig}"
end
def next_available_port
server = TCPServer.new("localhost", 0)
server.addr[1]
ensure
server.close
end
end

View File

@ -0,0 +1,92 @@
# frozen_string_literal: true
require "resolv"
module Requests
using HTTPX::URIExtensions
module Callbacks
def test_callbacks_connection_opened
uri = URI(build_uri("/get"))
origin = ip = nil
response = HTTPX.on_connection_opened do |o, sock|
origin = o
ip = sock.remote_address.ip_address
end.get(uri)
verify_status(response, 200)
assert !origin.nil?
assert origin.to_s == uri.origin
assert !ip.nil?
assert Resolv.getaddresses(uri.host).include?(ip)
end
def test_callbacks_connection_closed
uri = URI(build_uri("/get"))
origin = nil
response = HTTPX.on_connection_closed do |o|
origin = o
end.get(uri)
verify_status(response, 200)
assert !origin.nil?
assert origin.to_s == uri.origin
end
def test_callbacks_request_error
uri = URI(build_uri("/get"))
error = nil
http = HTTPX.on_request_error { |_, err| error = err }
response = http.get(uri)
verify_status(response, 200)
assert error.nil?
unavailable_host = URI(origin("localhost"))
unavailable_host.port = next_available_port
response = http.get(unavailable_host.to_s)
verify_error_response(response, /Connection refused| not available/)
assert !error.nil?
assert error == response.error
end
def test_callbacks_request
uri = URI(build_uri("/post"))
started = completed = false
chunks = 0
http = HTTPX.on_request_started { |_| started = true }
.on_request_body_chunk { |_, _chunk| chunks += 1 }
.on_request_completed { |_| completed = true }
response = http.post(uri, body: "data")
verify_status(response, 200)
assert started
assert completed
assert chunks.positive?
end
def test_callbacks_response
uri = URI(build_uri("/get"))
started = completed = false
chunks = 0
http = HTTPX.on_response_started { |_, _| started = true }
.on_response_body_chunk { |_, _, _chunk| chunks += 1 }
.on_response_completed { |_, _| completed = true }
response = http.get(uri)
verify_status(response, 200)
assert started
assert completed
assert chunks.positive?
end
end
end

View File

@ -41,13 +41,21 @@ module Requests
# end
# end
private
ResponseErrorEmitter = Module.new do
self::ResponseMethods = Module.new do
def <<(_)
raise "done with it"
end
end
end
def next_available_port
server = TCPServer.new("localhost", 0)
server.addr[1]
ensure
server.close
def test_errors_mid_response_buffering
uri = URI(build_uri("/get"))
HTTPX.plugin(SessionWithPool).plugin(ResponseErrorEmitter).wrap do |http|
response = http.get(uri)
verify_error_response(response, "done with it")
assert http.pool.connections.empty?
end
end
end
end