lib/anthropic/internal/stream.rb (40 lines of code) (raw):

# frozen_string_literal: true module Anthropic module Internal # @generic Elem # # @example # stream.each do |event| # puts(event) # end class Stream include Anthropic::Internal::Type::BaseStream # @api private # # @return [Enumerable<generic<Elem>>] private def iterator # rubocop:disable Metrics/BlockLength # rubocop:disable Layout/LineLength # rubocop:disable Lint/DuplicateBranch @iterator ||= Anthropic::Internal::Util.chain_fused(@stream) do |y| @stream.each do |msg| case msg in {event: "completion", data: String => data} decoded = JSON.parse(data, symbolize_names: true) y << Anthropic::Internal::Type::Converter.coerce(@model, decoded) in { event: "message_start" | "message_delta" | "message_stop" | "content_block_start" | "content_block_delta" | "content_block_stop", data: String => data } decoded = JSON.parse(data, symbolize_names: true) y << Anthropic::Internal::Type::Converter.coerce(@model, decoded) in {event: "ping"} next in {event: "error", data: String => data} decoded = Kernel.then do JSON.parse(data, symbolize_names: true) rescue JSON::ParserError data end Anthropic::Errors::APIStatusError.for( url: @url, status: @status, body: decoded, request: nil, response: @response ) else end end end # rubocop:enable Lint/DuplicateBranch # rubocop:enable Layout/LineLength # rubocop:enable Metrics/BlockLength end end end end