turned the channel into a state machine: this had the nice side-effect of solving a lot of API inconsistencies from the proxies

This commit is contained in:
HoneyryderChuck 2018-01-13 14:21:34 +00:00
parent 539bb3c7d0
commit 8797eebbf2
5 changed files with 83 additions and 60 deletions

View File

@ -64,6 +64,7 @@ module HTTPX
@write_buffer = Buffer.new(BUFFER_SIZE)
@pending = []
@on_response = on_response
@state = :idle
end
def match?(uri)
@ -75,18 +76,17 @@ module HTTPX
end
def to_io
connect
case @state
when :idle
transition(:open)
when :open
end
@io.to_io
end
def close(hard=false)
if pr = @parser
pr.close
@parser = nil
end
@io.close
@read_buffer.clear
@write_buffer.clear
pr = @parser
transition(:closed)
return true if hard
unless pr && pr.empty?
connect
@ -106,7 +106,7 @@ module HTTPX
end
def call
return if closed?
return if @state == :closed
catch(:called) do
dread
dwrite
@ -121,11 +121,6 @@ module HTTPX
end
private
def connect
@io.connect
send_pending
end
def dread(wsize = @window_size)
loop do
@ -148,7 +143,6 @@ module HTTPX
end
def send_pending
return if @io.closed?
while !@write_buffer.full? && (req_args = @pending.shift)
request, args = req_args
parser.send(request, **args)
@ -165,5 +159,27 @@ module HTTPX
parser.on(:close) { throw(:close, self) }
parser
end
def transition(nextstate)
case nextstate
when :idle
when :open
return if @state == :closed
@io.connect
return if @io.closed?
send_pending
when :closed
return if @state == :idle
if pr = @parser
pr.close
@parser = nil
end
@io.close
@read_buffer.clear
@write_buffer.clear
end
@state = nextstate
end
end
end

View File

@ -81,25 +81,15 @@ module HTTPX
def initialize(io, parameters, options, &blk)
super(io, options, &blk)
@parameters = parameters
@state = :idle
end
def match?(*)
true
end
def send_pending
return if @pending.empty?
case @state
when :open
# normal flow after connection
return super
when :connecting
# do NOT enqueue requests if proxy is connecting
return
when :idle
proxy_connect
end
def to_io
transition(:connecting) if @state == :idle
super
end
end

View File

@ -17,7 +17,6 @@ module HTTPX
# and therefore, will share the connection.
#
if req.uri.scheme == "https"
transition(:connecting)
connect_request = ConnectRequest.new(req.uri)
if @parameters.authenticated?
connect_request.headers["proxy-authentication"] = "Basic #{@parameters.token_authentication}"
@ -25,18 +24,20 @@ module HTTPX
parser.send(connect_request)
else
transition(:open)
send_pending
end
end
def transition(nextstate)
case nextstate
when :idle
when :connecting
return unless @state == :idle
@io.connect
return if @io.closed?
@parser = ConnectProxyParser.new(@write_buffer, @options.merge(max_concurrent_requests: 1))
@parser.once(:response, &method(:on_connect))
@parser.on(:close) { throw(:close, self) }
proxy_connect
return if @state == :open
when :open
case @state
when :connecting
@ -46,19 +47,17 @@ module HTTPX
@parser = ProxyParser.new(@write_buffer, @options)
@parser.on(:response, &@on_response)
@parser.on(:close) { throw(:close, self) }
else
return
end
end
@state = nextstate
super
end
def on_connect(request, response)
if response.status == 200
transition(:open)
req, _ = @pending.first
request_uri = req.uri
@io = ProxySSL.new(@io, request_uri, @options)
transition(:open)
throw(:called)
else
pending = @parser.pending

View File

@ -19,22 +19,21 @@ module HTTPX
def proxy_connect
@parser = SocksParser.new(@write_buffer, @options)
@parser.once(:packet, &method(:on_packet))
transition(:connecting)
end
def on_packet(packet)
version, status, port, ip = packet.unpack("CCnN")
if status == GRANTED
transition(:open)
req, _ = @pending.first
request_uri = req.uri
if request_uri.scheme == "https"
@io = ProxySSL.new(@io, request_uri, @options)
end
transition(:open)
throw(:called)
else
pending = @parser.instance_variable_get(:@pending)
while req = pending.shift
response = ErrorResponse.new("socks error: #{status}", 0)
while (req, _ = @pending.shift)
@on_response.call(req, response)
end
end
@ -42,18 +41,20 @@ module HTTPX
def transition(nextstate)
case nextstate
when :idle
when :connecting
return unless @state == :idle
@io.connect
return if @io.closed?
req, _ = @pending.first
request_uri = req.uri
@write_buffer << Packet.connect(@parameters, request_uri)
proxy_connect
when :open
return unless :connecting
return unless @state == :connecting
@parser = nil
end
log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" }
@state = nextstate
super
end
end
Parameters.register("socks4", Socks4ProxyChannel)
@ -73,6 +74,10 @@ module HTTPX
def consume(*)
end
def empty?
true
end
def <<(packet)
emit(:packet, packet)
end

View File

@ -28,7 +28,7 @@ module HTTPX
def on_packet(packet)
case @state
when :negotiating
when :connecting
version, method = packet.unpack("CC")
check_version(version)
case method
@ -36,54 +36,63 @@ module HTTPX
transition(:authenticating)
return
when NONE
raise Error, "no supported authorization methods"
on_error_response("no supported authorization methods")
else
transition(:connecting)
transition(:negotiating)
end
when :authenticating
version, status = packet.unpack("CC")
check_version(version)
raise Error, "could not authorize" if status != SUCCESS
transition(:connecting)
when :connecting
return transition(:negotiating) if status == SUCCESS
on_error_response("socks authentication error: #{status}")
when :negotiating
version, reply, = packet.unpack("CC")
check_version(version)
raise Error, "Illegal response type" unless reply == SUCCESS
transition(:open)
return on_error_response("socks5 negotiation error: #{reply}") unless reply == SUCCESS
req, _ = @pending.first
request_uri = req.uri
if request_uri.scheme == "https"
@io = ProxySSL.new(@io, request_uri, @options)
end
transition(:open)
throw(:called)
end
end
def transition(nextstate)
case nextstate
when :idle
when :negotiating
return unless @state == :idle
@write_buffer << Packet.negotiate(@parameters)
when :authenticating
return unless @state == :negotiating
@write_buffer << Packet.authenticate(@parameters)
when :connecting
return unless @state == :negotiating || @state == :authenticating
return unless @state == :idle
@io.connect
return if @io.closed?
@write_buffer << Packet.negotiate(@parameters)
proxy_connect
when :authenticating
return unless @state == :connecting
@write_buffer << Packet.authenticate(@parameters)
when :negotiating
return unless @state == :connecting || @state == :authenticating
req, _ = @pending.first
request_uri = req.uri
@write_buffer << Packet.connect(request_uri)
when :open
return unless :connecting
return unless @state == :negotiating
@parser = nil
end
log { "#{nextstate.to_s}: #{@write_buffer.to_s.inspect}" }
@state = nextstate
super
end
def check_version(version)
raise Error, "invalid SOCKS version (#{version})" if version != 5
end
def on_error_response(error)
response = ErrorResponse.new(error, 0)
while (req, _ = @pending.shift)
@on_response.call(req, response)
end
end
end
Parameters.register("socks5", Socks5ProxyChannel)
@ -101,6 +110,10 @@ module HTTPX
def consume(*)
end
def empty?
true
end
def <<(packet)
emit(:packet, packet)
end