spec/support/mock_intake.rb (196 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# frozen_string_literal: true
require 'json'
require 'timeout'
begin
if ::Rack.release >= '3.1'
require 'rack'
end
rescue NoMethodError
require 'rack/chunked'
end
class MockIntake
def initialize
clear!
@span_types = JSON.parse(File.read('./spec/fixtures/span_types.json'))
end
attr_reader(
:errors,
:metadatas,
:metricsets,
:requests,
:spans,
:transactions
)
def self.instance
@instance ||= new
end
class << self
extend Forwardable
def_delegator :instance, :stub!
def_delegator :instance, :stubbed?
def_delegator :instance, :clear!
def_delegator :instance, :reset!
end
def stub!
@cloud_provider_stubs = {
aws: WebMock.stub_request(
:get, ElasticAPM::Metadata::CloudInfo::AWS_URI
).to_timeout,
gcp: WebMock.stub_request(
:get, ElasticAPM::Metadata::CloudInfo::GCP_URI
).to_timeout,
azure: WebMock.stub_request(
:get, ElasticAPM::Metadata::CloudInfo::AZURE_URI
).to_timeout
}
@central_config_stub =
WebMock.stub_request(
:get, %r{^http://localhost:8200/config/v1/agents/?$}
).to_return(body: '{}')
@server_version_stub =
WebMock.stub_request(:get, %r{^http://localhost:8200/$}).
to_return(body: '{"version":8.0}')
@request_stub =
WebMock.stub_request(
:post, %r{^http://localhost:8200/intake/v2/events/?$}
).to_rack(self)
self
end
def stubbed?
!!@request_stub && @central_config_stub
end
def clear!
@requests = []
@errors = []
@metadatas = []
@metricsets = []
@spans = []
@transactions = []
end
def reset!
clear!
@request_stub = nil
@central_config_stub = nil
@cloud_provider_stubs = nil
@server_version_stub = nil
end
def call(env)
request = Rack::Request.new(env)
@requests << request
metadata, *rest = parse_request_body(request)
metadatas << metadata.values.first
rest.each do |obj|
catalog obj
end
[202, {}, ['ok']]
end
def parse_request_body(request)
body =
if request.env['HTTP_CONTENT_ENCODING'].include?('gzip')
gunzip(request.body.read)
else
request.body.read
end
body
.split("\n")
.map { |json| JSON.parse(json) }
end
private
def gunzip(string)
sio = StringIO.new(string)
gz = Zlib::GzipReader.new(sio, encoding: Encoding::ASCII_8BIT)
gz.read
ensure
gz&.close
end
def catalog(obj)
case obj.keys.first
when 'transaction' then transactions << obj.values.first
when 'error' then errors << obj.values.first
when 'metricset' then metricsets << obj.values.first
when 'span'
validate_span!(obj.values.first)
spans << obj.values.first
end
end
def validate_span!(span)
type, subtype, _action = span['type'].split('.')
begin
info = @span_types.fetch(type)
rescue KeyError
puts "Unknown span.type `#{type}'\nPossible types: #{@span_types.keys.join(', ')}"
pp span
raise
end
return unless (allowed_subtypes = info['subtypes'])
if !info['optional_subtype'] && !subtype
msg = "span.subtype missing when required for type `#{type}',\n" \
"Possible subtypes: #{allowed_subtypes}"
puts msg # print because errors are swallowed
pp span
raise msg
end
allowed_subtypes.fetch(subtype)
rescue KeyError => e
puts "Unknown span.subtype `#{subtype.inspect}'\n" \
"Possible subtypes: #{allowed_subtypes}"
pp span
puts e # print because errors are swallowed
raise
end
module WaitFor
# rubocop:disable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def wait_for(timeout: 5, **expected)
if expected.empty? && !block_given?
raise ArgumentError, 'Either args or block required'
end
unless MockIntake.stubbed?
raise 'Not stubbed – did you forget :mock_intake?'
end
Timeout.timeout(timeout) do
loop do
sleep 0.01
missing = expected.reduce(0) do |total, (kind, count)|
total + (count - @mock_intake.send(kind).length)
end
next if missing > 0
unless missing == 0
puts format(
'Expected %s. Got %s',
expected,
missing
)
print_received
end
if block_given?
next unless yield(@mock_intake)
end
break true
end
end
rescue Timeout::Error
puts format('Died waiting for %s', block_given? ? 'block' : expected)
puts '--- Received: ---'
print_received
raise
end
# rubocop:enable Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
def print_received
pp(
transactions: @mock_intake.transactions.map { |o| o['name'] },
spans: @mock_intake.spans.map { |o| o['name'] },
errors: @mock_intake.errors.map { |o| o['culprit'] },
metricsets: @mock_intake.metricsets,
metadatas: @mock_intake.metadatas.count
)
end
end
end
RSpec.configure do |config|
config.before :each, :mock_intake do
MockIntake.stub! unless MockIntake.stubbed?
@mock_intake = MockIntake.instance
end
config.after :each, :mock_intake do
MockIntake.reset!
@mock_intake = nil
end
config.include MockIntake::WaitFor, :mock_intake
end