mirror of
https://github.com/HoneyryderChuck/httpx.git
synced 2025-08-13 00:02:57 -04:00
added custom upgrade plugin as example (for websockets)
This commit is contained in:
parent
a03e93e531
commit
7fca78ad23
1
Gemfile
1
Gemfile
@ -13,6 +13,7 @@ group :test do
|
||||
gem "minitest"
|
||||
gem "minitest-proveit"
|
||||
gem "webmock"
|
||||
gem "websocket-driver"
|
||||
|
||||
if RUBY_VERSION < "2.2"
|
||||
gem "net-ssh", "~> 4.2.0"
|
||||
|
@ -29,6 +29,7 @@ services:
|
||||
- sshproxy
|
||||
- nghttp2
|
||||
- aws
|
||||
- ws-echo-server
|
||||
volumes:
|
||||
- ./:/home
|
||||
links:
|
||||
@ -109,3 +110,10 @@ services:
|
||||
- 4566:4566
|
||||
volumes:
|
||||
- ./test/support/ci/aws:/docker-entrypoint-initaws.d
|
||||
|
||||
ws-echo-server:
|
||||
environment:
|
||||
- PORT=80
|
||||
ports:
|
||||
- 8083:80
|
||||
image: jmalloc/echo-server
|
||||
|
@ -28,6 +28,7 @@ class HTTPTest < Minitest::Test
|
||||
include Plugins::RateLimiter
|
||||
include Plugins::Stream
|
||||
include Plugins::AWSAuthentication
|
||||
include Plugins::Upgrade
|
||||
|
||||
def test_verbose_log
|
||||
log = StringIO.new
|
||||
|
@ -4,6 +4,8 @@ module Requests
|
||||
module Plugins
|
||||
module Upgrade
|
||||
def test_plugin_upgrade_h2
|
||||
return unless origin.start_with?("https://")
|
||||
|
||||
http = HTTPX.plugin(SessionWithPool)
|
||||
|
||||
if OpenSSL::SSL::SSLContext.instance_methods.include?(:alpn_protocols)
|
||||
@ -27,6 +29,36 @@ module Requests
|
||||
assert response2.version == "2.0", "second request should already be in HTTP/2"
|
||||
response2.close
|
||||
end
|
||||
end unless RUBY_ENGINE == "jruby"
|
||||
|
||||
def test_plugin_upgrade_websockets
|
||||
return unless origin.start_with?("http://")
|
||||
|
||||
http = HTTPX.plugin(SessionWithPool).plugin(:upgrade)
|
||||
|
||||
response = http.get("http://ws-echo-server")
|
||||
verify_status(response, 200)
|
||||
|
||||
http = http.plugin(WSTestPlugin)
|
||||
|
||||
response = http.get("http://ws-echo-server")
|
||||
verify_status(response, 101)
|
||||
|
||||
websocket = response.websocket
|
||||
|
||||
assert !websocket.nil?, "websocket wasn't created"
|
||||
|
||||
websocket.send("ping")
|
||||
websocket.send("pong")
|
||||
|
||||
sleep 2
|
||||
|
||||
echo_messages = websocket.messages
|
||||
assert echo_messages.size >= 3
|
||||
assert echo_messages.include?("handshake")
|
||||
assert echo_messages.include?("ping")
|
||||
assert echo_messages.include?("pong")
|
||||
websocket.close
|
||||
end
|
||||
end
|
||||
end
|
||||
|
164
test/support/websocket_test_plugin.rb
Normal file
164
test/support/websocket_test_plugin.rb
Normal file
@ -0,0 +1,164 @@
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "base64"
|
||||
require "forwardable"
|
||||
require "websocket/driver"
|
||||
|
||||
# have to roll our own, as the default client bundles its own
|
||||
# HTTP client handshake logic
|
||||
class WSDriver < WebSocket::Driver::Hybi
|
||||
include WebSocket
|
||||
|
||||
def initialize(*, opts)
|
||||
h = opts.delete(:headers)
|
||||
super
|
||||
@headers = h
|
||||
|
||||
@key = Base64.strict_encode64(SecureRandom.random_bytes(16))
|
||||
@headers["upgrade"] = "websocket"
|
||||
@headers["connection"] = "Upgrade"
|
||||
@headers["sec-websocket-key"] = @key
|
||||
@headers["sec-websocket-version"] = VERSION
|
||||
|
||||
@headers["Sec-WebSocket-Protocol"] = @protocols * ", " if @protocols.size.positive?
|
||||
|
||||
extensions = @extensions.generate_offer
|
||||
@headers["Sec-WebSocket-Extensions"] = extensions if extensions
|
||||
end
|
||||
|
||||
def start(bytes)
|
||||
open
|
||||
parse(bytes)
|
||||
end
|
||||
|
||||
def validate(headers)
|
||||
accept = headers["sec-websocket-accept"]
|
||||
protocol = headers["sec-websocket-protocol"]
|
||||
|
||||
return fail_handshake("Sec-WebSocket-Accept mismatch") unless accept == Driver::Hybi.generate_accept(@key)
|
||||
|
||||
if protocol && !protocol.empty?
|
||||
return fail_handshake("Sec-WebSocket-Protocol mismatch") unless @protocols.include?(protocol)
|
||||
|
||||
@protocol = protocol
|
||||
end
|
||||
|
||||
begin
|
||||
@extensions.activate(@headers["Sec-WebSocket-Extensions"])
|
||||
rescue ::WebSocket::Extensions::ExtensionError => e
|
||||
return fail_handshake(e.message)
|
||||
end
|
||||
true
|
||||
end
|
||||
|
||||
def fail_handshake(message)
|
||||
message = "Error during WebSocket handshake: #{message}"
|
||||
@ready_state = 3
|
||||
emit(:error, message)
|
||||
emit(:close, Driver::CloseEvent.new(Driver::Hybi::ERRORS[:protocol_error], message))
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
class WSCLient
|
||||
extend Forwardable
|
||||
|
||||
def_delegator :@driver, :headers, :close
|
||||
|
||||
attr_reader :messages
|
||||
|
||||
def initialize(io, headers)
|
||||
@io = io
|
||||
@closed = false
|
||||
@messages = []
|
||||
|
||||
@driver = WSDriver.new(self, masking: true, headers: headers)
|
||||
@driver.on(:open) { |_event| send("handshake") }
|
||||
@driver.on(:message) { |event| @messages << event.data }
|
||||
@driver.on(:error) { |error| warn("ws error: #{error}") }
|
||||
@driver.on(:close) { |event| finalize(event) }
|
||||
end
|
||||
|
||||
def start(bytes)
|
||||
@driver.start(bytes)
|
||||
@thread = Thread.new do
|
||||
until @closed
|
||||
bytes = @io.read(1)
|
||||
@driver.parse(bytes)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def validate(*args)
|
||||
@driver.validate(*args)
|
||||
end
|
||||
|
||||
def send(message)
|
||||
@driver.text(message)
|
||||
end
|
||||
|
||||
def write(data)
|
||||
@io.write(data)
|
||||
end
|
||||
|
||||
def close
|
||||
@driver.close
|
||||
end
|
||||
|
||||
def finalize(_event)
|
||||
@closed = true
|
||||
end
|
||||
end
|
||||
|
||||
module WSTestPlugin
|
||||
class << self
|
||||
def load_dependencies(klass, *)
|
||||
klass.plugin(:upgrade)
|
||||
end
|
||||
|
||||
def configure(*)
|
||||
HTTPX::Plugins::Upgrade.register("websocket", self)
|
||||
end
|
||||
|
||||
def call(connection, request, response)
|
||||
return unless (ws = request.websocket)
|
||||
|
||||
return unless ws.validate(response.headers)
|
||||
|
||||
connection.hijack_io
|
||||
response.websocket = ws
|
||||
ws.start(response.body.to_s)
|
||||
end
|
||||
|
||||
def extra_options(options)
|
||||
options.merge(max_concurrent_requests: 1)
|
||||
end
|
||||
end
|
||||
|
||||
module InstanceMethods
|
||||
def find_connection(request, *)
|
||||
return super if request.websocket
|
||||
|
||||
conn = super
|
||||
|
||||
return conn unless conn && !conn.upgrade_protocol
|
||||
|
||||
request.init_websocket(conn)
|
||||
|
||||
conn
|
||||
end
|
||||
end
|
||||
|
||||
module RequestMethods
|
||||
attr_reader :websocket
|
||||
|
||||
def init_websocket(connection)
|
||||
socket = connection.to_io
|
||||
@websocket = WSCLient.new(socket, @headers)
|
||||
end
|
||||
end
|
||||
|
||||
module ResponseMethods
|
||||
attr_accessor :websocket
|
||||
end
|
||||
end
|
Loading…
x
Reference in New Issue
Block a user