exporter/collector/integrationtest/testcases/testcase.go (448 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testcases
import (
"bytes"
"encoding/json"
"fmt"
"os"
"sort"
"testing"
"time"
"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"go.opentelemetry.io/collector/pdata/ptrace"
distributionpb "google.golang.org/genproto/googleapis/api/distribution"
googlemetricpb "google.golang.org/genproto/googleapis/api/metric"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/integrationtest/protos"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector"
"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector/internal/logsutil"
)
var (
// selfObsMetricsToNormalize is the set of self-observability metrics which may not record
// the same value every time due to side effects. The values of these metrics get cleared
// and are not checked in the fixture. Their labels and types are still checked.
selfObsMetricsToNormalize = map[string]struct{}{
"workload.googleapis.com/grpc.client.attempt.duration": {},
"workload.googleapis.com/grpc.client.attempt.rcvd_total_compressed_message_size": {},
"workload.googleapis.com/grpc.client.attempt.sent_total_compressed_message_size": {},
"workload.googleapis.com/grpc.client.call.duration": {},
}
// selfObsLabelsToNormalize is the set of labels on self-observability metrics which change
// each run of the fixture. When a time series carries one of these label keys, the label
// value is overridden with the value provided here.
selfObsLabelsToNormalize = map[string]string{
"grpc_target": "dns:///127.0.0.1:40441",
}
)
// SecondProjectEnv is the constant string "SECOND_PROJECT_ID".
// NOTE(review): presumably the name of an environment variable that holds a
// second project ID; nothing in this file reads it, so confirm against callers.
const SecondProjectEnv = "SECOND_PROJECT_ID"
// TestCase describes a single fixture-driven test: the locations of its input
// and expectation fixtures, optional hooks that adjust the exporter
// configuration, and flags for skipping or for expected errors/retries.
type TestCase struct {
// ConfigureCollector will be called to modify the default configuration for this test case. Optional.
ConfigureCollector func(cfg *collector.Config)
// ConfigureLogsExporter uses internal types to add extra post-init config to an exporter object.
ConfigureLogsExporter *logsutil.ExporterConfig
// Name of the test case
Name string
// OTLPInputFixturePath is the path to the JSON encoded OTLP input fixture.
// It is read by LoadOTLPTracesInput, LoadOTLPLogsInput and LoadOTLPMetricsInput,
// so it must hold traces, logs or metrics matching the loader the test uses.
OTLPInputFixturePath string
// ExpectFixturePath is the path to the JSON encoded expectation fixture
// (see fixtures.proto) that contains request messages the exporter is expected to send.
// LoadTraceExpectFixture and LoadLogExpectFixture read it, and the
// SaveRecorded*Fixtures methods overwrite it with freshly recorded output.
ExpectFixturePath string
// CompareFixturePath is a second output fixture that should be equal to this test's output fixture.
// Used for cross-referencing multiple tests that should have the same output, without overwriting the same fixtures.
CompareFixturePath string
// When testing the SDK metrics exporter (not collector), this is the options to use. Optional.
MetricSDKExporterOptions []metric.Option
// Skip, if true, skips this test case
Skip bool
// SkipForSDK, if true, skips this test case when testing the SDK
SkipForSDK bool
// ExpectErr sets whether the test is expected to fail
ExpectErr bool
// ExpectRetries sets whether the test expects the server to report multiple attempts
ExpectRetries bool
}
// LoadOTLPTracesInput reads the JSON encoded OTLP traces fixture at
// tc.OTLPInputFixturePath and returns it with span timestamps rewritten.
//
// Every span whose start timestamp is non-zero gets startTimestamp, and every
// span whose end timestamp is non-zero gets endTimestamp; zero timestamps are
// left untouched. Any read or decode error fails the test through t.
func (tc *TestCase) LoadOTLPTracesInput(
	t testing.TB,
	startTimestamp time.Time,
	endTimestamp time.Time,
) ptrace.Traces {
	raw, err := os.ReadFile(tc.OTLPInputFixturePath)
	require.NoError(t, err)

	decoder := new(ptrace.JSONUnmarshaler)
	traces, err := decoder.UnmarshalTraces(raw)
	require.NoError(t, err)

	// Convert once; the same values are applied to every span.
	start := pcommon.NewTimestampFromTime(startTimestamp)
	end := pcommon.NewTimestampFromTime(endTimestamp)

	resourceSpans := traces.ResourceSpans()
	for r := 0; r < resourceSpans.Len(); r++ {
		scopeSpans := resourceSpans.At(r).ScopeSpans()
		for s := 0; s < scopeSpans.Len(); s++ {
			spans := scopeSpans.At(s).Spans()
			for n := 0; n < spans.Len(); n++ {
				span := spans.At(n)
				if span.StartTimestamp() != 0 {
					span.SetStartTimestamp(start)
				}
				if span.EndTimestamp() != 0 {
					span.SetEndTimestamp(end)
				}
			}
		}
	}
	return traces
}
// LoadTraceExpectFixture reads the protojson encoded TraceExpectFixture at
// tc.ExpectFixturePath and stamps every expected span with the given times.
//
// Unlike the input loader, the start and end times are overwritten on every
// span unconditionally. Any read or decode error fails the test through t.
func (tc *TestCase) LoadTraceExpectFixture(
	t testing.TB,
	startTimestamp time.Time,
	endTimestamp time.Time,
) *protos.TraceExpectFixture {
	raw, err := os.ReadFile(tc.ExpectFixturePath)
	require.NoError(t, err)

	expected := new(protos.TraceExpectFixture)
	err = protojson.Unmarshal(raw, expected)
	require.NoError(t, err)

	for _, batch := range expected.BatchWriteSpansRequest {
		for _, sp := range batch.Spans {
			// A fresh Timestamp message per span so that spans never share pointers.
			sp.StartTime = timestamppb.New(startTimestamp)
			sp.EndTime = timestamppb.New(endTimestamp)
		}
	}
	return expected
}
// SaveRecordedTraceFixtures normalizes the recorded trace fixture and writes
// it, as indented JSON followed by a trailing newline, to tc.ExpectFixturePath.
//
// The fixture is mutated in place by NormalizeTraceFixture. Any marshal or
// write error fails the test through t.
func (tc *TestCase) SaveRecordedTraceFixtures(
	t testing.TB,
	fixture *protos.TraceExpectFixture,
) {
	NormalizeTraceFixture(t, fixture)

	compact, err := protojson.Marshal(fixture)
	require.NoError(t, err)

	var out bytes.Buffer
	err = json.Indent(&out, compact, "", " ")
	require.NoError(t, err)
	_, err = out.WriteString("\n")
	require.NoError(t, err)

	err = os.WriteFile(tc.ExpectFixturePath, out.Bytes(), 0640)
	require.NoError(t, err)
	t.Logf("Updated fixture %v", tc.ExpectFixturePath)
}
func NormalizeTraceFixture(t testing.TB, fixture *protos.TraceExpectFixture) {
normalizeSelfObs(t, fixture.SelfObservabilityMetrics)
for _, req := range fixture.BatchWriteSpansRequest {
for _, span := range req.Spans {
if span.GetStartTime() != nil {
span.StartTime = ×tamppb.Timestamp{}
}
if span.GetEndTime() != nil {
span.EndTime = ×tamppb.Timestamp{}
}
}
}
}
// CreateTraceConfig builds the collector configuration used for trace tests:
// the default config with ProjectID set to "fake-project", then passed through
// tc.ConfigureCollector when that hook is set.
func (tc *TestCase) CreateTraceConfig() collector.Config {
	traceCfg := collector.DefaultConfig()
	traceCfg.ProjectID = "fake-project"
	if configure := tc.ConfigureCollector; configure != nil {
		configure(&traceCfg)
	}
	return traceCfg
}
// LoadOTLPLogsInput reads the JSON encoded OTLP logs fixture at
// tc.OTLPInputFixturePath and returns it with log record timestamps rewritten.
//
// Every log record whose timestamp is non-zero gets the given timestamp; zero
// timestamps are left untouched. Any read or decode error fails the test
// through t.
func (tc *TestCase) LoadOTLPLogsInput(
	t testing.TB,
	timestamp time.Time,
) plog.Logs {
	raw, err := os.ReadFile(tc.OTLPInputFixturePath)
	require.NoError(t, err)

	decoder := new(plog.JSONUnmarshaler)
	logs, err := decoder.UnmarshalLogs(raw)
	require.NoError(t, err)

	// Convert once; the same value is applied to every record.
	stamp := pcommon.NewTimestampFromTime(timestamp)

	resourceLogs := logs.ResourceLogs()
	for r := 0; r < resourceLogs.Len(); r++ {
		scopeLogs := resourceLogs.At(r).ScopeLogs()
		for s := 0; s < scopeLogs.Len(); s++ {
			records := scopeLogs.At(s).LogRecords()
			for n := 0; n < records.Len(); n++ {
				record := records.At(n)
				if record.Timestamp() == 0 {
					continue
				}
				record.SetTimestamp(stamp)
			}
		}
	}
	return logs
}
// CreateLogConfig builds the collector configuration used for log tests:
// the default config with ProjectID set to "fake-project", then passed through
// tc.ConfigureCollector when that hook is set.
func (tc *TestCase) CreateLogConfig() collector.Config {
	logCfg := collector.DefaultConfig()
	logCfg.ProjectID = "fake-project"
	if configure := tc.ConfigureCollector; configure != nil {
		configure(&logCfg)
	}
	return logCfg
}
// LoadLogExpectFixture reads the protojson encoded LogExpectFixture at
// tc.ExpectFixturePath and stamps every expected log entry with timestamp.
//
// The timestamp is overwritten on every entry unconditionally. Any read or
// decode error fails the test through t.
func (tc *TestCase) LoadLogExpectFixture(
	t testing.TB,
	timestamp time.Time,
) *protos.LogExpectFixture {
	raw, err := os.ReadFile(tc.ExpectFixturePath)
	require.NoError(t, err)

	expected := new(protos.LogExpectFixture)
	err = protojson.Unmarshal(raw, expected)
	require.NoError(t, err)

	for _, writeReq := range expected.WriteLogEntriesRequests {
		for _, logEntry := range writeReq.Entries {
			// A fresh Timestamp message per entry so that entries never share pointers.
			logEntry.Timestamp = timestamppb.New(timestamp)
		}
	}
	return expected
}
// SaveRecordedLogFixtures normalizes the recorded log fixture and writes it,
// as indented JSON followed by a trailing newline, to tc.ExpectFixturePath.
//
// The fixture is mutated in place by NormalizeLogFixture. Any marshal or
// write error fails the test through t.
func (tc *TestCase) SaveRecordedLogFixtures(
	t testing.TB,
	fixture *protos.LogExpectFixture,
) {
	NormalizeLogFixture(t, fixture)

	compact, err := protojson.Marshal(fixture)
	require.NoError(t, err)

	var out bytes.Buffer
	err = json.Indent(&out, compact, "", " ")
	require.NoError(t, err)
	_, err = out.WriteString("\n")
	require.NoError(t, err)

	err = os.WriteFile(tc.ExpectFixturePath, out.Bytes(), 0640)
	require.NoError(t, err)
	t.Logf("Updated fixture %v", tc.ExpectFixturePath)
}
// Normalizes timestamps which create noise in the fixture because they can
// vary each test run.
func NormalizeLogFixture(t testing.TB, fixture *protos.LogExpectFixture) {
normalizeSelfObs(t, fixture.SelfObservabilityMetrics)
for listIndex, req := range fixture.WriteLogEntriesRequests {
// sort the entries in each request
sort.Slice(fixture.WriteLogEntriesRequests[listIndex].Entries, func(i, j int) bool {
if fixture.WriteLogEntriesRequests[listIndex].Entries[i].LogName < fixture.WriteLogEntriesRequests[listIndex].Entries[j].LogName {
return fixture.WriteLogEntriesRequests[listIndex].Entries[i].LogName < fixture.WriteLogEntriesRequests[listIndex].Entries[j].LogName
}
return fixture.WriteLogEntriesRequests[listIndex].Entries[i].String() < fixture.WriteLogEntriesRequests[listIndex].Entries[j].String()
})
for _, entry := range req.Entries {
// Normalize timestamps if they are set
if entry.GetTimestamp() != nil {
entry.Timestamp = ×tamppb.Timestamp{}
}
}
}
// sort each request. if the requests have the same name (or just as likely, they both have no name set at the request level),
// peek at the first entry's logname in the request
sort.Slice(fixture.WriteLogEntriesRequests, func(i, j int) bool {
if fixture.WriteLogEntriesRequests[i].LogName != fixture.WriteLogEntriesRequests[j].LogName {
return fixture.WriteLogEntriesRequests[i].LogName < fixture.WriteLogEntriesRequests[j].LogName
}
return fixture.WriteLogEntriesRequests[i].Entries[0].LogName < fixture.WriteLogEntriesRequests[j].Entries[0].LogName
})
}
// LoadOTLPMetricsInput reads the JSON encoded OTLP metrics fixture at
// tc.OTLPInputFixturePath and returns it with data point timestamps rewritten.
//
// For every data point, a non-zero start timestamp becomes startTime and a
// non-zero timestamp becomes endTime. Exemplars on gauge, sum, histogram and
// exponential histogram points always get endTime. Any read or decode error
// fails the test through t.
//
//nolint:revive
func (tc *TestCase) LoadOTLPMetricsInput(
	t testing.TB,
	startTime time.Time,
	endTime time.Time,
) pmetric.Metrics {
	raw, err := os.ReadFile(tc.OTLPInputFixturePath)
	require.NoError(t, err)

	decoder := new(pmetric.JSONUnmarshaler)
	metrics, err := decoder.UnmarshalMetrics(raw)
	require.NoError(t, err)

	// Convert once; the same values are applied to every point.
	start := pcommon.NewTimestampFromTime(startTime)
	end := pcommon.NewTimestampFromTime(endTime)

	// timestamped captures the timestamp accessors shared by all pdata point types.
	type timestamped interface {
		StartTimestamp() pcommon.Timestamp
		Timestamp() pcommon.Timestamp
		SetStartTimestamp(pcommon.Timestamp)
		SetTimestamp(pcommon.Timestamp)
	}
	// timestampedWithExemplars is a point that additionally carries exemplars.
	type timestampedWithExemplars interface {
		timestamped
		Exemplars() pmetric.ExemplarSlice
	}

	retime := func(p timestamped) {
		if p.StartTimestamp() != 0 {
			p.SetStartTimestamp(start)
		}
		if p.Timestamp() != 0 {
			p.SetTimestamp(end)
		}
	}
	retimeWithExemplars := func(p timestampedWithExemplars) {
		retime(p)
		exemplars := p.Exemplars()
		for e := 0; e < exemplars.Len(); e++ {
			exemplars.At(e).SetTimestamp(end)
		}
	}

	resourceMetrics := metrics.ResourceMetrics()
	for r := 0; r < resourceMetrics.Len(); r++ {
		scopeMetrics := resourceMetrics.At(r).ScopeMetrics()
		for s := 0; s < scopeMetrics.Len(); s++ {
			metricSlice := scopeMetrics.At(s).Metrics()
			for k := 0; k < metricSlice.Len(); k++ {
				m := metricSlice.At(k)
				switch m.Type() {
				case pmetric.MetricTypeGauge:
					points := m.Gauge().DataPoints()
					for n := 0; n < points.Len(); n++ {
						retimeWithExemplars(points.At(n))
					}
				case pmetric.MetricTypeSum:
					points := m.Sum().DataPoints()
					for n := 0; n < points.Len(); n++ {
						retimeWithExemplars(points.At(n))
					}
				case pmetric.MetricTypeHistogram:
					points := m.Histogram().DataPoints()
					for n := 0; n < points.Len(); n++ {
						retimeWithExemplars(points.At(n))
					}
				case pmetric.MetricTypeSummary:
					// Summary points have no exemplars.
					points := m.Summary().DataPoints()
					for n := 0; n < points.Len(); n++ {
						retime(points.At(n))
					}
				case pmetric.MetricTypeExponentialHistogram:
					points := m.ExponentialHistogram().DataPoints()
					for n := 0; n < points.Len(); n++ {
						retimeWithExemplars(points.At(n))
					}
				}
			}
		}
	}
	return metrics
}
// LoadMetricFixture reads the protojson encoded MetricExpectFixture at path
// and rewrites its point timestamps via updateMetricExpectFixture so they line
// up with startTime and endTime. Any read or decode error fails the test
// through t.
func (tc *TestCase) LoadMetricFixture(
	t testing.TB,
	path string,
	startTime time.Time,
	endTime time.Time,
) *protos.MetricExpectFixture {
	raw, err := os.ReadFile(path)
	require.NoError(t, err)

	expected := new(protos.MetricExpectFixture)
	err = protojson.Unmarshal(raw, expected)
	require.NoError(t, err)

	tc.updateMetricExpectFixture(t, startTime, endTime, expected)
	return expected
}
// updateMetricExpectFixture rewrites the timestamps of an expected metric
// fixture in place so they match the times used for the test input.
//
// For every point in both the CreateTimeSeries and CreateServiceTimeSeries
// requests, an interval start time that is set becomes startTime and an
// interval end time that is set becomes endTime. Exemplars of DISTRIBUTION
// valued series always get endTime.
func (tc *TestCase) updateMetricExpectFixture(
	t testing.TB,
	startTime time.Time,
	endTime time.Time,
	fixture *protos.MetricExpectFixture,
) {
	update := func(reqs []*monitoringpb.CreateTimeSeriesRequest) {
		for _, req := range reqs {
			for _, ts := range req.GetTimeSeries() {
				for _, p := range ts.GetPoints() {
					if p.GetInterval().GetStartTime() != nil {
						p.GetInterval().StartTime = timestamppb.New(startTime)
					}
					if p.GetInterval().GetEndTime() != nil {
						p.GetInterval().EndTime = timestamppb.New(endTime)
					}
					if ts.GetValueType() == googlemetricpb.MetricDescriptor_DISTRIBUTION {
						for _, ex := range p.GetValue().GetDistributionValue().GetExemplars() {
							ex.Timestamp = timestamppb.New(endTime)
						}
					}
				}
			}
		}
	}

	// The two request lists are walked separately instead of being joined with
	// append: appending to a slice owned by the fixture could write into spare
	// capacity of its backing array.
	update(fixture.GetCreateTimeSeriesRequests())
	update(fixture.GetCreateServiceTimeSeriesRequests())
}
// SaveRecordedMetricFixtures normalizes the recorded metric fixture and writes
// it, as indented JSON followed by a trailing newline, to tc.ExpectFixturePath.
//
// The fixture is mutated in place by NormalizeMetricFixture. Any marshal or
// write error fails the test through t.
func (tc *TestCase) SaveRecordedMetricFixtures(
	t testing.TB,
	fixture *protos.MetricExpectFixture,
) {
	NormalizeMetricFixture(t, fixture)

	compact, err := protojson.Marshal(fixture)
	require.NoError(t, err)

	var out bytes.Buffer
	err = json.Indent(&out, compact, "", " ")
	require.NoError(t, err)
	_, err = out.WriteString("\n")
	require.NoError(t, err)

	err = os.WriteFile(tc.ExpectFixturePath, out.Bytes(), 0640)
	require.NoError(t, err)
	t.Logf("Updated fixture %v", tc.ExpectFixturePath)
}
// NormalizeMetricFixture removes run-to-run noise from a metric fixture in place.
//
// It normalizes the time series requests (both regular and service), the
// metric descriptor requests and the self-observability metrics, then sorts
// each request list: time series requests by Name, and descriptor requests by
// Name with the descriptor's own Name as tie-breaker.
func NormalizeMetricFixture(t testing.TB, fixture *protos.MetricExpectFixture) {
	normalizeTimeSeriesReqs(t, fixture.CreateTimeSeriesRequests...)
	normalizeTimeSeriesReqs(t, fixture.CreateServiceTimeSeriesRequests...)
	normalizeMetricDescriptorReqs(t, fixture.CreateMetricDescriptorRequests...)
	normalizeSelfObs(t, fixture.SelfObservabilityMetrics)

	tsReqs := fixture.CreateTimeSeriesRequests
	sort.Slice(tsReqs, func(i, j int) bool {
		return tsReqs[i].GetName() < tsReqs[j].GetName()
	})

	mdReqs := fixture.CreateMetricDescriptorRequests
	sort.Slice(mdReqs, func(i, j int) bool {
		if mdReqs[i].GetName() != mdReqs[j].GetName() {
			return mdReqs[i].GetName() < mdReqs[j].GetName()
		}
		// Getters are nil-safe: a request without a MetricDescriptor is
		// tolerated by normalizeMetricDescriptorReqs, so it must not panic here.
		return mdReqs[i].GetMetricDescriptor().GetName() < mdReqs[j].GetMetricDescriptor().GetName()
	})

	svcReqs := fixture.CreateServiceTimeSeriesRequests
	sort.Slice(svcReqs, func(i, j int) bool {
		return svcReqs[i].GetName() < svcReqs[j].GetName()
	})
}
func normalizeTimeSeriesReqs(t testing.TB, reqs ...*monitoringpb.CreateTimeSeriesRequest) {
for _, req := range reqs {
for _, ts := range req.TimeSeries {
for _, p := range ts.Points {
// Normalize timestamps if they are set
if p.GetInterval().GetStartTime() != nil {
p.GetInterval().StartTime = ×tamppb.Timestamp{}
}
if p.GetInterval().GetEndTime() != nil {
p.GetInterval().EndTime = ×tamppb.Timestamp{}
}
if ts.GetValueType() == googlemetricpb.MetricDescriptor_DISTRIBUTION {
for _, ex := range p.GetValue().GetDistributionValue().GetExemplars() {
ex.Timestamp = ×tamppb.Timestamp{}
}
}
}
// clear project ID from monitored resource
delete(ts.GetResource().GetLabels(), "project_id")
}
}
}
// normalizeMetricDescriptorReqs sorts, in place, the labels of each request's
// metric descriptor by label key. Requests without a metric descriptor are
// skipped. The t parameter is currently unused.
func normalizeMetricDescriptorReqs(t testing.TB, reqs ...*monitoringpb.CreateMetricDescriptorRequest) {
	for _, req := range reqs {
		if descriptor := req.MetricDescriptor; descriptor != nil {
			sort.Slice(descriptor.Labels, func(a, b int) bool {
				return descriptor.Labels[a].Key < descriptor.Labels[b].Key
			})
		}
	}
}
// normalizeSelfObs removes run-to-run noise from recorded self-observability
// metrics in place. A nil selfObs is a no-op.
//
// For each CreateTimeSeries request it:
//   - applies normalizeTimeSeriesReqs;
//   - zeroes the value of the first point of every series whose metric type is
//     in selfObsMetricsToNormalize (distributions keep only their bucket
//     options and zeroed bucket counts);
//   - overrides labels listed in selfObsLabelsToNormalize when present;
//   - sorts the series by (metric type, label set).
//
// Metric descriptor requests get their labels sorted and are then ordered by
// descriptor type.
func normalizeSelfObs(t testing.TB, selfObs *protos.SelfObservabilityMetric) {
	if selfObs == nil {
		return
	}
	for _, req := range selfObs.CreateTimeSeriesRequests {
		normalizeTimeSeriesReqs(t, req)
		tss := req.TimeSeries
		for _, ts := range tss {
			_, volatile := selfObsMetricsToNormalize[ts.GetMetric().GetType()]
			// A series without points has no value to clear; skip it rather
			// than indexing into an empty slice.
			if volatile && len(ts.GetPoints()) > 0 {
				// zero out the specific value type
				switch value := ts.GetPoints()[0].GetValue().GetValue().(type) {
				case *monitoringpb.TypedValue_Int64Value:
					value.Int64Value = 0
				case *monitoringpb.TypedValue_DoubleValue:
					value.DoubleValue = 0
				case *monitoringpb.TypedValue_DistributionValue:
					// Only preserve the bucket options and zeroed out counts
					counts := value.DistributionValue.GetBucketCounts()
					for i := range counts {
						counts[i] = 0
					}
					value.DistributionValue = &distributionpb.Distribution{
						BucketOptions: value.DistributionValue.GetBucketOptions(),
						BucketCounts:  counts,
					}
				default:
					t.Logf("Do not know how to normalize typed value type %T", value)
				}
			}
			// Lookups on a nil label map simply miss, so no write is attempted.
			labels := ts.GetMetric().GetLabels()
			for k, v := range selfObsLabelsToNormalize {
				if _, ok := labels[k]; ok {
					labels[k] = v
				}
			}
		}
		// sort time series by (type, labelset)
		sort.Slice(tss, func(i, j int) bool {
			iMetric := tss[i].GetMetric()
			jMetric := tss[j].GetMetric()
			if iMetric.GetType() == jMetric.GetType() {
				// Doesn't need to be sorted correctly, just consistently
				return fmt.Sprint(iMetric.GetLabels()) < fmt.Sprint(jMetric.GetLabels())
			}
			return iMetric.GetType() < jMetric.GetType()
		})
	}
	normalizeMetricDescriptorReqs(t, selfObs.CreateMetricDescriptorRequests...)
	// sort descriptors by type; getters keep a request without a descriptor
	// (which normalizeMetricDescriptorReqs tolerates) from panicking here.
	mdReqs := selfObs.CreateMetricDescriptorRequests
	sort.Slice(mdReqs, func(i, j int) bool {
		return mdReqs[i].GetMetricDescriptor().GetType() <
			mdReqs[j].GetMetricDescriptor().GetType()
	})
}
// SkipIfNeeded skips the running test through t when tc.Skip is true.
func (tc *TestCase) SkipIfNeeded(t testing.TB) {
	if !tc.Skip {
		return
	}
	t.Skip("Test case is marked to skip")
}
// SkipIfNeededForSDK skips the running test through t when tc.SkipForSDK is true.
func (tc *TestCase) SkipIfNeededForSDK(t testing.TB) {
	if !tc.SkipForSDK {
		return
	}
	t.Skip("Test case is marked to skip")
}
// CreateCollectorMetricConfig builds the collector configuration used for
// metric tests. It starts from the default config, sets ProjectID to
// "fakeprojectid", disables instrumentation library labels and enlarges the
// CreateMetricDescriptor buffer, then applies tc.ConfigureCollector when set.
func (tc *TestCase) CreateCollectorMetricConfig() collector.Config {
	metricCfg := collector.DefaultConfig()
	metricCfg.ProjectID = "fakeprojectid"
	metricCfg.MetricConfig.InstrumentationLibraryLabels = false
	// Set a big buffer to capture all CMD requests without dropping
	metricCfg.MetricConfig.CreateMetricDescriptorBufferSize = 500
	if configure := tc.ConfigureCollector; configure != nil {
		configure(&metricCfg)
	}
	return metricCfg
}