From 8797eebbf25a4fc83e7e5e7d49b8af76529884c0 Mon Sep 17 00:00:00 2001 From: HoneyryderChuck Date: Sat, 13 Jan 2018 14:21:34 +0000 Subject: [PATCH] turned the channel into a state machine: this had the nice side-effect of solving a lot of API inconsistencies from the proxies --- lib/httpx/channel.rb | 46 +++++++++++++++++++---------- lib/httpx/plugins/proxy.rb | 16 ++-------- lib/httpx/plugins/proxy/http.rb | 13 ++++---- lib/httpx/plugins/proxy/socks4.rb | 19 +++++++----- lib/httpx/plugins/proxy/socks5.rb | 49 +++++++++++++++++++------------ 5 files changed, 83 insertions(+), 60 deletions(-) diff --git a/lib/httpx/channel.rb b/lib/httpx/channel.rb index 5f1008ef..640e36c7 100644 --- a/lib/httpx/channel.rb +++ b/lib/httpx/channel.rb @@ -64,6 +64,7 @@ module HTTPX @write_buffer = Buffer.new(BUFFER_SIZE) @pending = [] @on_response = on_response + @state = :idle end def match?(uri) @@ -75,18 +76,17 @@ module HTTPX end def to_io - connect + case @state + when :idle + transition(:open) + when :open + end @io.to_io end def close(hard=false) - if pr = @parser - pr.close - @parser = nil - end - @io.close - @read_buffer.clear - @write_buffer.clear + pr = @parser + transition(:closed) return true if hard unless pr && pr.empty? connect @@ -106,7 +106,7 @@ module HTTPX end def call - return if closed? + return if @state == :closed catch(:called) do dread dwrite @@ -121,11 +121,6 @@ module HTTPX end private - - def connect - @io.connect - send_pending - end def dread(wsize = @window_size) loop do @@ -148,7 +143,6 @@ module HTTPX end def send_pending - return if @io.closed? while !@write_buffer.full? && (req_args = @pending.shift) request, args = req_args parser.send(request, **args) @@ -165,5 +159,27 @@ module HTTPX parser.on(:close) { throw(:close, self) } parser end + + def transition(nextstate) + case nextstate + when :idle + + when :open + return if @state == :closed + @io.connect + return if @io.closed? + send_pending + when :closed + return if @state == :idle + if pr = @parser + pr.close + @parser = nil + end + @io.close + @read_buffer.clear + @write_buffer.clear + end + @state = nextstate + end end end diff --git a/lib/httpx/plugins/proxy.rb b/lib/httpx/plugins/proxy.rb index 4ffd2191..da933e4b 100644 --- a/lib/httpx/plugins/proxy.rb +++ b/lib/httpx/plugins/proxy.rb @@ -81,25 +81,15 @@ module HTTPX def initialize(io, parameters, options, &blk) super(io, options, &blk) @parameters = parameters - @state = :idle end def match?(*) true end - def send_pending - return if @pending.empty? - case @state - when :open - # normal flow after connection - return super - when :connecting - # do NOT enqueue requests if proxy is connecting - return - when :idle - proxy_connect - end + def to_io + transition(:connecting) if @state == :idle + super end end diff --git a/lib/httpx/plugins/proxy/http.rb b/lib/httpx/plugins/proxy/http.rb index d1466fa3..e3267ffd 100644 --- a/lib/httpx/plugins/proxy/http.rb +++ b/lib/httpx/plugins/proxy/http.rb @@ -17,7 +17,6 @@ module HTTPX # and therefore, will share the connection. # if req.uri.scheme == "https" - transition(:connecting) connect_request = ConnectRequest.new(req.uri) if @parameters.authenticated? connect_request.headers["proxy-authentication"] = "Basic #{@parameters.token_authentication}" @@ -25,18 +24,20 @@ module HTTPX parser.send(connect_request) else transition(:open) - send_pending end end def transition(nextstate) case nextstate - when :idle when :connecting return unless @state == :idle + @io.connect + return if @io.closed? @parser = ConnectProxyParser.new(@write_buffer, @options.merge(max_concurrent_requests: 1)) @parser.once(:response, &method(:on_connect)) @parser.on(:close) { throw(:close, self) } + proxy_connect + return if @state == :open when :open case @state when :connecting @@ -46,19 +47,17 @@ module HTTPX @parser = ProxyParser.new(@write_buffer, @options) @parser.on(:response, &@on_response) @parser.on(:close) { throw(:close, self) } - else - return end end - @state = nextstate + super end def on_connect(request, response) if response.status == 200 - transition(:open) req, _ = @pending.first request_uri = req.uri @io = ProxySSL.new(@io, request_uri, @options) + transition(:open) throw(:called) else pending = @parser.pending diff --git a/lib/httpx/plugins/proxy/socks4.rb b/lib/httpx/plugins/proxy/socks4.rb index 05e55711..ab5ca8b4 100644 --- a/lib/httpx/plugins/proxy/socks4.rb +++ b/lib/httpx/plugins/proxy/socks4.rb @@ -19,22 +19,21 @@ module HTTPX def proxy_connect @parser = SocksParser.new(@write_buffer, @options) @parser.once(:packet, &method(:on_packet)) - transition(:connecting) end def on_packet(packet) version, status, port, ip = packet.unpack("CCnN") if status == GRANTED - transition(:open) req, _ = @pending.first request_uri = req.uri if request_uri.scheme == "https" @io = ProxySSL.new(@io, request_uri, @options) end + transition(:open) throw(:called) else - pending = @parser.instance_variable_get(:@pending) - while req = pending.shift + response = ErrorResponse.new("socks error: #{status}", 0) + while (req, _ = @pending.shift) @on_response.call(req, response) end end @@ -42,18 +41,20 @@ module HTTPX def transition(nextstate) case nextstate - when :idle when :connecting return unless @state == :idle + @io.connect + return if @io.closed? req, _ = @pending.first request_uri = req.uri @write_buffer << Packet.connect(@parameters, request_uri) + proxy_connect when :open - return unless :connecting + return unless @state == :connecting @parser = nil end log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" } - @state = nextstate + super end end Parameters.register("socks4", Socks4ProxyChannel) @@ -73,6 +74,10 @@ module HTTPX def consume(*) end + def empty? + true + end + def <<(packet) emit(:packet, packet) end diff --git a/lib/httpx/plugins/proxy/socks5.rb b/lib/httpx/plugins/proxy/socks5.rb index de26b846..510a6423 100644 --- a/lib/httpx/plugins/proxy/socks5.rb +++ b/lib/httpx/plugins/proxy/socks5.rb @@ -28,7 +28,7 @@ module HTTPX def on_packet(packet) case @state - when :negotiating + when :connecting version, method = packet.unpack("CC") check_version(version) case method @@ -36,54 +36,63 @@ module HTTPX transition(:authenticating) return when NONE - raise Error, "no supported authorization methods" + on_error_response("no supported authorization methods") else - transition(:connecting) + transition(:negotiating) end when :authenticating version, status = packet.unpack("CC") check_version(version) - raise Error, "could not authorize" if status != SUCCESS - transition(:connecting) - when :connecting + return transition(:negotiating) if status == SUCCESS + on_error_response("socks authentication error: #{status}") + when :negotiating version, reply, = packet.unpack("CC") check_version(version) - raise Error, "Illegal response type" unless reply == SUCCESS - transition(:open) + return on_error_response("socks5 negotiation error: #{reply}") unless reply == SUCCESS req, _ = @pending.first request_uri = req.uri if request_uri.scheme == "https" @io = ProxySSL.new(@io, request_uri, @options) end + transition(:open) throw(:called) end end def transition(nextstate) case nextstate - when :idle - when :negotiating - return unless @state == :idle - @write_buffer << Packet.negotiate(@parameters) - when :authenticating - return unless @state == :negotiating - @write_buffer << Packet.authenticate(@parameters) when :connecting - return unless @state == :negotiating || @state == :authenticating + return unless @state == :idle + @io.connect + return if @io.closed? + @write_buffer << Packet.negotiate(@parameters) + proxy_connect + when :authenticating + return unless @state == :connecting + @write_buffer << Packet.authenticate(@parameters) + when :negotiating + return unless @state == :connecting || @state == :authenticating req, _ = @pending.first request_uri = req.uri @write_buffer << Packet.connect(request_uri) when :open - return unless :connecting + return unless @state == :negotiating @parser = nil end log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" } - @state = nextstate + super end def check_version(version) raise Error, "invalid SOCKS version (#{version})" if version != 5 end + + def on_error_response(error) + response = ErrorResponse.new(error, 0) + while (req, _ = @pending.shift) + @on_response.call(req, response) + end + end end Parameters.register("socks5", Socks5ProxyChannel) @@ -101,6 +110,10 @@ module HTTPX def consume(*) end + def empty? + true + end + def <<(packet) emit(:packet, packet) end