diff --git a/lib/httpx/plugins/stream.rb b/lib/httpx/plugins/stream.rb index f7bfa58c..7754d7cf 100644 --- a/lib/httpx/plugins/stream.rb +++ b/lib/httpx/plugins/stream.rb @@ -7,27 +7,107 @@ module HTTPX # module Stream module InstanceMethods - def stream - headers("accept" => "text/event-stream", - "cache-control" => "no-cache") + private + + def request(*args, stream: false, **options) + return super(*args, **options) unless stream + + requests = build_requests(*args, options) + + raise Error, "only 1 response at a time is supported for streaming requests" unless requests.size == 1 + + StreamResponse.new(requests.first, self) end end + module RequestMethods + attr_accessor :stream + end + module ResponseMethods - def complete? - super || - stream? && - @stream_complete + def stream + @request.stream + end + end + + module ResponseBodyMethods + def initialize(*) + super + @stream = @response.stream end - def stream? - @headers["content-type"].start_with?("text/event-stream") + def write(chunk) + return super unless @stream + + @stream.on_chunk(chunk) end - def <<(data) - res = super - @stream_complete = true if String(data).end_with?("\n\n") - res + private + + def transition(*) + return if @stream + + super + end + end + + class StreamResponse + def initialize(request, session) + @request = request + @session = session + @options = @request.options + end + + def each_line + raise Error, "response already streamed" if @response + + Enumerator.new do |yielder| + @request.stream = self + + @chunk_fiber = Fiber.new do + response + :done + end + + loop do + chunk = @chunk_fiber.resume + + break if chunk == :done + + yielder << chunk + end + end + end + + # This is a ghost method. It's to be used ONLY internally, when processing streams + def on_chunk(chunk) + raise NoMethodError unless @chunk_fiber + + @on_chunk.call(chunk.dup) + end + + # :nocov: + def inspect + "#" + end + # :nocov: + + private + + def response + @response ||= @session.__send__(:send_requests, @request, @options).first + end + + def respond_to_missing?(*args) + @options.response_class.respond_to?(*args) || super + end + + def method_missing(meth, *args, &block) + if @options.response_class.public_method_defined?(meth) + response.__send__(meth, *args, &block) + else + super + end end end end diff --git a/lib/httpx/response.rb b/lib/httpx/response.rb index f218c709..12042830 100644 --- a/lib/httpx/response.rb +++ b/lib/httpx/response.rb @@ -278,7 +278,7 @@ module HTTPX # rubocop:disable Style/MissingRespondToMissing def method_missing(meth, *, &block) - raise NoMethodError, "undefined response method `#{meth}' for error response" if Response.public_method_defined?(meth) + raise NoMethodError, "undefined response method `#{meth}' for error response" if @options.response_class.public_method_defined?(meth) super end