Use external em_http and em_synchrony adapters (#1274)

This commit is contained in:
Matt 2021-04-26 10:57:10 +01:00 committed by GitHub
parent a582495311
commit 67c5f7b239
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 5 additions and 578 deletions

View File

@ -15,6 +15,8 @@ Gem::Specification.new do |spec|
spec.required_ruby_version = '>= 2.4'
spec.add_dependency 'faraday-em_http', '~> 1.0'
spec.add_dependency 'faraday-em_synchrony', '~> 1.0'
spec.add_dependency 'faraday-excon', '~> 1.1'
spec.add_dependency 'faraday-net_http', '~> 1.0'
spec.add_dependency 'faraday-net_http_persistent', '~> 1.1'

View File

@ -27,9 +27,11 @@ require 'faraday/error'
require 'faraday/file_part'
require 'faraday/param_part'
require 'faraday/em_http'
require 'faraday/em_synchrony'
require 'faraday/excon'
require 'faraday/net_http'
require 'faraday/net_http_persistent'
require 'faraday/excon'
# This is the main namespace for Faraday.
#

View File

@ -13,8 +13,6 @@ module Faraday
test: [:Test, 'test'],
typhoeus: [:Typhoeus, 'typhoeus'],
patron: [:Patron, 'patron'],
em_synchrony: [:EMSynchrony, 'em_synchrony'],
em_http: [:EMHttp, 'em_http'],
rack: [:Rack, 'rack'],
httpclient: [:HTTPClient, 'httpclient']

View File

@ -1,289 +0,0 @@
# frozen_string_literal: true
module Faraday
class Adapter
# EventMachine adapter. This adapter is useful for either asynchronous
# requests when in an EM reactor loop, or for making parallel requests in
# synchronous code.
class EMHttp < Faraday::Adapter
# Options is a module containing helpers to convert the Faraday env object
# into options hashes for EMHTTP method calls.
module Options
# @return [Hash]
def connection_config(env)
options = {}
configure_proxy(options, env)
configure_timeout(options, env)
configure_socket(options, env)
configure_ssl(options, env)
options
end
def request_config(env)
options = {
body: read_body(env),
head: env[:request_headers]
# keepalive: true,
# file: 'path/to/file', # stream data off disk
}
configure_compression(options, env)
options
end
def read_body(env)
body = env[:body]
body.respond_to?(:read) ? body.read : body
end
# Reads out proxy settings from env into options
def configure_proxy(options, env)
proxy = request_options(env)[:proxy]
return unless proxy
options[:proxy] = {
host: proxy[:uri].host,
port: proxy[:uri].port,
authorization: [proxy[:user], proxy[:password]]
}
end
# Reads out host and port settings from env into options
def configure_socket(options, env)
bind = request_options(env)[:bind]
return unless bind
options[:bind] = {
host: bind[:host],
port: bind[:port]
}
end
# Reads out SSL certificate settings from env into options
def configure_ssl(options, env)
return unless env[:url].scheme == 'https' && env[:ssl]
options[:ssl] = {
cert_chain_file: env[:ssl][:ca_file],
verify_peer: env[:ssl].fetch(:verify, true)
}
end
# Reads out timeout settings from env into options
def configure_timeout(options, env)
req = request_options(env)
options[:inactivity_timeout] = request_timeout(:read, req)
options[:connect_timeout] = request_timeout(:open, req)
end
# Reads out compression header settings from env into options
def configure_compression(options, env)
return unless (env[:method] == :get) &&
!options[:head].key?('accept-encoding')
options[:head]['accept-encoding'] = 'gzip, compressed'
end
def request_options(env)
env[:request]
end
end
include Options
dependency do
require 'em-http'
begin
require 'openssl'
rescue LoadError
warn 'Warning: no such file to load -- openssl. ' \
'Make sure it is installed if you want HTTPS support'
else
require 'em-http/version'
if EventMachine::HttpRequest::VERSION < '1.1.6'
require 'faraday/adapter/em_http_ssl_patch'
end
end
end
self.supports_parallel = true
# @return [Manager]
def self.setup_parallel_manager(_options = nil)
Manager.new
end
def call(env)
super
perform_request env
@app.call env
end
def perform_request(env)
if parallel?(env)
manager = env[:parallel_manager]
manager.add do
perform_single_request(env)
.callback { env[:response].finish(env) }
end
elsif EventMachine.reactor_running?
# EM is running: instruct upstream that this is an async request
env[:parallel_manager] = true
perform_single_request(env)
.callback { env[:response].finish(env) }
.errback do
# TODO: no way to communicate the error in async mode
raise NotImplementedError
end
else
error = nil
# start EM, block until request is completed
EventMachine.run do
perform_single_request(env)
.callback { EventMachine.stop }
.errback do |client|
error = error_message(client)
EventMachine.stop
end
end
raise_error(error) if error
end
rescue EventMachine::Connectify::CONNECTError => e
if e.message.include?('Proxy Authentication Required')
raise Faraday::ConnectionFailed,
%(407 "Proxy Authentication Required ")
end
raise Faraday::ConnectionFailed, e
rescue StandardError => e
if defined?(::OpenSSL::SSL::SSLError) && \
e.is_a?(::OpenSSL::SSL::SSLError)
raise Faraday::SSLError, e
end
raise
end
# TODO: reuse the connection to support pipelining
def perform_single_request(env)
req = create_request(env)
req = req.setup_request(env[:method], request_config(env))
req.callback do |client|
if env[:request].stream_response?
warn "Streaming downloads for #{self.class.name} " \
'are not yet implemented.'
env[:request].on_data.call(
client.response,
client.response.bytesize
)
end
status = client.response_header.status
reason = client.response_header.http_reason
save_response(env, status, client.response, nil, reason) do |headers|
client.response_header.each do |name, value|
headers[name.to_sym] = value
end
end
end
end
def create_request(env)
EventMachine::HttpRequest.new(
env[:url], connection_config(env).merge(@connection_options)
)
end
def error_message(client)
client.error || 'request failed'
end
def raise_error(msg)
error_class = Faraday::ClientError
if timeout_message?(msg)
error_class = Faraday::TimeoutError
msg = 'request timed out'
elsif msg == Errno::ECONNREFUSED
error_class = Faraday::ConnectionFailed
msg = 'connection refused'
elsif msg == 'connection closed by server'
error_class = Faraday::ConnectionFailed
end
raise error_class, msg
end
def timeout_message?(msg)
msg == Errno::ETIMEDOUT ||
(msg.is_a?(String) && msg.include?('timeout error'))
end
# @return [Boolean]
def parallel?(env)
!!env[:parallel_manager]
end
# This parallel manager is designed to start an EventMachine loop
# and block until all registered requests have been completed.
class Manager
# @see reset
def initialize
reset
end
# Re-initializes instance variables
def reset
@registered_procs = []
@num_registered = 0
@num_succeeded = 0
@errors = []
@running = false
end
# @return [Boolean]
def running?
@running
end
def add(&block)
if running?
perform_request(&block)
else
@registered_procs << block
end
@num_registered += 1
end
def run
if @num_registered.positive?
@running = true
EventMachine.run do
@registered_procs.each do |proc|
perform_request(&proc)
end
end
unless @errors.empty?
raise Faraday::ClientError, @errors.first || 'connection failed'
end
end
ensure
reset
end
def perform_request
client = yield
client.callback do
@num_succeeded += 1
check_finished
end
client.errback do
@errors << client.error
check_finished
end
end
def check_finished
EventMachine.stop if @num_succeeded + @errors.size == @num_registered
end
end
end
end
end

View File

@ -1,62 +0,0 @@
# frozen_string_literal: true
require 'openssl'
require 'em-http'
# EventMachine patch to make SSL work.
module EmHttpSslPatch
def ssl_verify_peer(cert_string)
begin
@last_seen_cert = OpenSSL::X509::Certificate.new(cert_string)
rescue OpenSSL::X509::CertificateError
return false
end
unless certificate_store.verify(@last_seen_cert)
raise OpenSSL::SSL::SSLError,
%(unable to verify the server certificate for "#{host}")
end
begin
certificate_store.add_cert(@last_seen_cert)
rescue OpenSSL::X509::StoreError => e
raise e unless e.message == 'cert already in hash table'
end
true
end
def ssl_handshake_completed
return true unless verify_peer?
unless verified_cert_identity?
raise OpenSSL::SSL::SSLError,
%(host "#{host}" does not match the server certificate)
end
true
end
def verify_peer?
parent.connopts.tls[:verify_peer]
end
def verified_cert_identity?
OpenSSL::SSL.verify_certificate_identity(@last_seen_cert, host)
end
def host
parent.uri.host
end
def certificate_store
@certificate_store ||= begin
store = OpenSSL::X509::Store.new
store.set_default_paths
ca_file = parent.connopts.tls[:cert_chain_file]
store.add_file(ca_file) if ca_file
store
end
end
end
EventMachine::HttpStubConnection.include(EmHttpSslPatch)

View File

@ -1,153 +0,0 @@
# frozen_string_literal: true
require 'uri'
module Faraday
class Adapter
# EventMachine Synchrony adapter.
class EMSynchrony < Faraday::Adapter
include EMHttp::Options
dependency do
require 'em-synchrony/em-http'
require 'em-synchrony/em-multi'
require 'fiber'
require 'faraday/adapter/em_synchrony/parallel_manager'
if Faraday::Adapter::EMSynchrony.loaded?
begin
require 'openssl'
rescue LoadError
warn 'Warning: no such file to load -- openssl. ' \
'Make sure it is installed if you want HTTPS support'
else
require 'em-http/version'
if EventMachine::HttpRequest::VERSION < '1.1.6'
require 'faraday/adapter/em_http_ssl_patch'
end
end
end
end
self.supports_parallel = true
# @return [ParallelManager]
def self.setup_parallel_manager(_options = nil)
ParallelManager.new
end
def call(env)
super
request = create_request(env)
http_method = env[:method].to_s.downcase.to_sym
if env[:parallel_manager]
# Queue requests for parallel execution.
execute_parallel_request(env, request, http_method)
else
# Execute single request.
execute_single_request(env, request, http_method)
end
@app.call env
rescue Errno::ECONNREFUSED
raise Faraday::ConnectionFailed, $ERROR_INFO
rescue EventMachine::Connectify::CONNECTError => e
if e.message.include?('Proxy Authentication Required')
raise Faraday::ConnectionFailed,
%(407 "Proxy Authentication Required")
end
raise Faraday::ConnectionFailed, e
rescue Errno::ETIMEDOUT => e
raise Faraday::TimeoutError, e
rescue RuntimeError => e
if e.message == 'connection closed by server'
raise Faraday::ConnectionFailed, e
end
raise Faraday::TimeoutError, e if e.message.include?('timeout error')
raise
rescue StandardError => e
if defined?(OpenSSL) && e.is_a?(OpenSSL::SSL::SSLError)
raise Faraday::SSLError, e
end
raise
end
def create_request(env)
EventMachine::HttpRequest.new(
Utils::URI(env[:url].to_s),
connection_config(env).merge(@connection_options)
)
end
private
def execute_parallel_request(env, request, http_method)
env[:parallel_manager].add(request, http_method,
request_config(env)) do |resp|
if (req = env[:request]).stream_response?
warn "Streaming downloads for #{self.class.name} " \
'are not yet implemented.'
req.on_data.call(resp.response, resp.response.bytesize)
end
save_response(env, resp.response_header.status,
resp.response) do |resp_headers|
resp.response_header.each do |name, value|
resp_headers[name.to_sym] = value
end
end
# Finalize the response object with values from `env`.
env[:response].finish(env)
end
end
def execute_single_request(env, request, http_method)
block = -> { request.send(http_method, request_config(env)) }
client = call_block(block)
raise client.error if client&.error
if env[:request].stream_response?
warn "Streaming downloads for #{self.class.name} " \
'are not yet implemented.'
env[:request].on_data.call(
client.response,
client.response.bytesize
)
end
status = client.response_header.status
reason = client.response_header.http_reason
save_response(env, status, client.response, nil, reason) do |headers|
client.response_header.each do |name, value|
headers[name.to_sym] = value
end
end
end
def call_block(block)
client = nil
if EM.reactor_running?
client = block.call
else
EM.run do
Fiber.new do
client = block.call
EM.stop
end.resume
end
end
client
end
end
end
end

View File

@ -1,69 +0,0 @@
# frozen_string_literal: true
module Faraday
class Adapter
class EMSynchrony < Faraday::Adapter
# A parallel manager for EMSynchrony.
class ParallelManager
# Add requests to queue.
#
# @param request [EM::HttpRequest]
# @param method [Symbol, String] HTTP method
# @param args [Array] the rest of the positional arguments
def add(request, method, *args, &block)
queue << {
request: request,
method: method,
args: args,
block: block
}
end
# Run all requests on queue with `EM::Synchrony::Multi`, wrapping
# it in a reactor and fiber if needed.
def run
result = nil
if !EM.reactor_running?
EM.run do
Fiber.new do
result = perform
EM.stop
end.resume
end
else
result = perform
end
result
end
private
# The request queue.
def queue
@queue ||= []
end
# Main `EM::Synchrony::Multi` performer.
def perform
multi = ::EM::Synchrony::Multi.new
queue.each do |item|
method = "a#{item[:method]}".to_sym
req = item[:request].send(method, *item[:args])
req.callback(&item[:block])
req_name = "req_#{multi.requests.size}".to_sym
multi.add(req_name, req)
end
# Clear the queue, so parallel manager objects can be reused.
@queue = []
# Block fiber until all requests have returned.
multi.perform
end
end
end
end
end

View File

@ -58,8 +58,6 @@ module Faraday
class Adapter
extend AutoloadHelper
autoload_all 'faraday/adapter',
EMSynchrony: 'em_synchrony',
EMHttp: 'em_http',
Typhoeus: 'typhoeus',
Patron: 'patron',
Test: 'test',