commands/verify/verify.go (254 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package verify
import (
"context"
"fmt"
"sync"
"time"
"github.com/spf13/cobra"
"github.com/apache/skywalking-infra-e2e/internal/components/verifier"
"github.com/apache/skywalking-infra-e2e/internal/config"
"github.com/apache/skywalking-infra-e2e/internal/logger"
"github.com/apache/skywalking-infra-e2e/internal/util"
"github.com/apache/skywalking-infra-e2e/pkg/output"
)
var (
	// query holds the value of the --query/-q flag: a command whose output
	// is used as the actual data when no actual data file is given.
	query string
	// actual holds the value of the --actual/-a flag: the path of the file
	// containing the actual data.
	actual string
	// expected holds the value of the --expected/-e flag: the path of the
	// file containing the expected data. When it is empty the command falls
	// back to the cases declared in the config file.
	expected string
	// printer reports per-case progress and the final summary. It is assigned
	// in DoVerifyAccordingConfig before any case is verified.
	printer output.Printer
)
// init registers the command-line flags of the verify command and binds them
// to the package-level query, actual and expected variables.
func init() {
	flags := Verify.Flags()

	flags.StringVarP(&query, "query", "q", "", "the query to get the actual data, the result of the query should in YAML format")
	flags.StringVarP(&actual, "actual", "a", "", "the actual data file, only YAML file format is supported")
	flags.StringVarP(&expected, "expected", "e", "", "the expected data file, only YAML file format is supported")
}
// Verify is the "verify" sub-command. It checks that the actual data
// satisfies the expected data pattern.
//
// When the --expected flag is set, a single case is verified from the
// command-line flags; otherwise the cases declared in the config file are
// verified.
var Verify = &cobra.Command{
	Use:   "verify",
	Short: "verify if the actual data match the expected data",
	RunE: func(_ *cobra.Command, _ []string) error {
		// No expected file on the command line: fall back to the config file.
		if expected == "" {
			return DoVerifyAccordingConfig()
		}
		return verifySingleCase(expected, actual, query)
	},
}
// verifyInfo contains necessary information about verification.
// It is built once in DoVerifyAccordingConfig and shared, read-only, by the
// serial and concurrent verification routines in this file.
type verifyInfo struct {
	// caseNumber is the number of cases declared in the config file.
	caseNumber int
	// retryCount is the number of additional attempts made after the first
	// one fails, so a case is tried at most retryCount+1 times.
	retryCount int
	// interval is how long to wait between two attempts of the same case.
	interval time.Duration
	// failFast stops the verification as soon as one case fails.
	failFast bool
}
// verifySingleCase verifies one case: it loads the expected data from
// expectedFile, obtains the actual data, and checks the latter against the
// former with verifier.Verify.
//
// The actual data comes from actualFile when it is non-empty; otherwise from
// the output of executing query when that is non-empty. When both are empty
// the verification runs against empty actual data.
//
// It returns nil when the data match. Every returned error wraps its cause,
// so callers can still inspect it with errors.Is / errors.As (for example to
// detect a *verifier.MismatchError).
func verifySingleCase(expectedFile, actualFile, query string) error {
	expectedData, err := util.ReadFileContent(expectedFile)
	if err != nil {
		return fmt.Errorf("failed to read the expected data file: %w", err)
	}

	var actualData, sourceName string
	switch {
	case actualFile != "":
		sourceName = actualFile
		actualData, err = util.ReadFileContent(actualFile)
		if err != nil {
			return fmt.Errorf("failed to read the actual data file: %w", err)
		}
	case query != "":
		sourceName = query
		var stderr string
		actualData, stderr, err = util.ExecuteCommand(query)
		if err != nil {
			// Report both what the command wrote to stderr and the
			// execution error itself; the latter is the only hint when the
			// command could not produce any stderr output.
			return fmt.Errorf("failed to execute the query: %s, output: %s, stderr: %s, error: %w",
				query, actualData, stderr, err)
		}
	}

	if err = verifier.Verify(actualData, expectedData); err != nil {
		// A mismatch and any other verifier error render identically, so no
		// type-specific branch is needed here.
		return fmt.Errorf("failed to verify the output: %s, error:\n%w", sourceName, err)
	}
	return nil
}
// concurrentlyVerifySingleCase verifies a single case in concurrency mode.
//
// The case is attempted up to verifyInfo.retryCount+1 times, waiting
// verifyInfo.interval between attempts. If ctx is cancelled before an attempt
// or while waiting for the next one, the case is reported as skipped
// immediately instead of finishing the wait.
//
// The returned result is never nil:
//   - on success, Msg describes the success and Err is nil;
//   - on failure, Msg and Err describe the last failed attempt;
//   - on cancellation, Skip is true.
//
// When the case fails and fail-fast is enabled, cancel is called so that the
// other cases stop as well.
func concurrentlyVerifySingleCase(
	ctx context.Context,
	cancel context.CancelFunc,
	v *config.VerifyCase,
	verifyInfo *verifyInfo,
) (res *output.CaseResult) {
	res = &output.CaseResult{}
	defer func() {
		if res.Err != nil && verifyInfo.failFast {
			cancel()
		}
	}()

	if v.GetExpected() == "" {
		res.Msg = fmt.Sprintf("failed to verify %v:", caseName(v))
		res.Err = fmt.Errorf("the expected data file for %v is not specified", caseName(v))
		return res
	}

	for current := 0; current <= verifyInfo.retryCount; current++ {
		// Another case may have failed in fail-fast mode; do not start a new
		// attempt in that situation.
		if ctx.Err() != nil {
			res.Skip = true
			return res
		}

		err := verifySingleCase(v.GetExpected(), v.GetActual(), v.Query)
		if err == nil {
			if current == 0 {
				res.Msg = fmt.Sprintf("verified %v\n", caseName(v))
			} else {
				res.Msg = fmt.Sprintf("verified %v, retried %d time(s)\n", caseName(v), current)
			}
			return res
		}

		if current == verifyInfo.retryCount {
			res.Msg = fmt.Sprintf("failed to verify %v, retried %d time(s):", caseName(v), current)
			res.Err = err
			return res
		}

		// Wait before the next attempt, but give up as soon as the context is
		// cancelled rather than sleeping through the whole interval.
		timer := time.NewTimer(verifyInfo.interval)
		select {
		case <-ctx.Done():
			timer.Stop()
			res.Skip = true
			return res
		case <-timer.C:
		}
	}
	return res
}
// verifyCasesConcurrently runs every case of verify in its own goroutine,
// waits for all of them, prints the collected results and returns an error
// stating how many cases failed (or nil when none did).
func verifyCasesConcurrently(verify *config.Verify, verifyInfo *verifyInfo) error {
	total := len(verify.Cases)

	results := make([]*output.CaseResult, total)
	for i := 0; i < total; i++ {
		results[i] = new(output.CaseResult)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var pending sync.WaitGroup
	pending.Add(total)
	for idx := 0; idx < total; idx++ {
		go func(slot int) {
			defer pending.Done()

			// The context may already be cancelled by a failed case in
			// fail-fast mode; such a case is only marked as skipped.
			if ctx.Err() != nil {
				results[slot].Skip = true
				return
			}

			// No synchronisation is required: every goroutine writes only to
			// its own, distinct slice element.
			results[slot] = concurrentlyVerifySingleCase(ctx, cancel, &verify.Cases[slot], verifyInfo)
		}(idx)
	}
	pending.Wait()

	if _, failed, _ := printer.PrintResult(results); failed > 0 {
		return fmt.Errorf("failed to verify %d case(s)", failed)
	}
	return nil
}
// verifyCasesSerially verifies the cases of verify one after another, in
// declaration order, reporting progress through the package-level printer.
//
// Each case is attempted up to verifyInfo.retryCount+1 times with
// verifyInfo.interval between attempts. In fail-fast mode the first failed
// case stops the run and the remaining cases stay marked as skipped.
//
// The results are always printed on return, and the returned error states how
// many cases failed (nil when none did).
func verifyCasesSerially(verify *config.Verify, verifyInfo *verifyInfo) (err error) {
	// Every case starts out as skipped; a case that is actually run clears
	// the flag. This keeps the summary right when fail-fast aborts early.
	results := make([]*output.CaseResult, 0, len(verify.Cases))
	for range verify.Cases {
		results = append(results, &output.CaseResult{Skip: true})
	}

	defer func() {
		if _, failed, _ := printer.PrintResult(results); failed > 0 {
			err = fmt.Errorf("failed to verify %d case(s)", failed)
		}
	}()

	for i := range verify.Cases {
		printer.Start()

		c := &verify.Cases[i]
		r := results[i]

		if c.GetExpected() == "" {
			r.Skip = false
			r.Msg = fmt.Sprintf("failed to verify %v", caseName(c))
			r.Err = fmt.Errorf("the expected data file for %v is not specified", caseName(c))
			printer.Warning(r.Msg)
			printer.Fail(r.Err.Error())
			if verifyInfo.failFast {
				return nil
			}
			continue
		}

		for attempt := 0; attempt <= verifyInfo.retryCount; attempt++ {
			failure := verifySingleCase(c.GetExpected(), c.GetActual(), c.Query)

			// Success: record it and move on to the next case.
			if failure == nil {
				if attempt == 0 {
					r.Msg = fmt.Sprintf("verified %v \n", caseName(c))
				} else {
					r.Msg = fmt.Sprintf("verified %v, retried %d time(s)\n", caseName(c), attempt)
				}
				r.Skip = false
				printer.Success(r.Msg)
				break
			}

			// Failure with attempts left: report progress, wait, try again.
			if attempt != verifyInfo.retryCount {
				if attempt == 0 {
					printer.UpdateText(fmt.Sprintf("failed to verify %v, will continue retry:", caseName(c)))
				} else {
					printer.UpdateText(fmt.Sprintf("failed to verify %v, retry [%d/%d]", caseName(c), attempt, verifyInfo.retryCount))
				}
				time.Sleep(verifyInfo.interval)
				continue
			}

			// Failure on the last attempt: the case has definitely failed.
			r.Msg = fmt.Sprintf("failed to verify %v, retried %d time(s):", caseName(c), attempt)
			r.Err = failure
			r.Skip = false
			printer.UpdateText(fmt.Sprintf("failed to verify %v, retry [%d/%d]", caseName(c), attempt, verifyInfo.retryCount))
			printer.Warning(r.Msg)
			printer.Fail(r.Err.Error())
			if verifyInfo.failFast {
				return nil
			}
		}
	}
	return nil
}
// caseName returns a human-readable label for the case v: its Name when one
// is set, otherwise "case[...]" built from its Actual field, or from its
// Query field when Actual is empty as well.
func caseName(v *config.VerifyCase) string {
	switch {
	case v.Name != "":
		return v.Name
	case v.Actual != "":
		return fmt.Sprintf("case[%s]", v.Actual)
	default:
		return fmt.Sprintf("case[%s]", v.Query)
	}
}
// DoVerifyAccordingConfig verifies the cases declared in the global config.
//
// It returns the config loading error if there is one, or the error of
// parsing the retry interval. Otherwise it sets up the package-level printer
// and runs the cases concurrently or serially, depending on the config.
// A negative retry count is treated as zero.
func DoVerifyAccordingConfig() error {
	if loadErr := config.GlobalConfig.Error; loadErr != nil {
		return loadErr
	}

	e2e := config.GlobalConfig.E2EConfig

	retries := e2e.Verify.RetryStrategy.Count
	if retries < 0 {
		retries = 0
	}

	wait, err := parseInterval(e2e.Verify.RetryStrategy.Interval)
	if err != nil {
		return err
	}

	info := verifyInfo{
		caseNumber: len(e2e.Verify.Cases),
		retryCount: retries,
		interval:   wait,
		failFast:   e2e.Verify.FailFast,
	}

	if !e2e.Verify.Concurrency {
		printer = output.NewPrinter(util.BatchMode)
		return verifyCasesSerially(&e2e.Verify, &info)
	}

	// Concurrent runs always use the batch output mode.
	printer = output.NewPrinter(true)
	return verifyCasesConcurrently(&e2e.Verify, &info)
}
// parseInterval converts the configured verify.retry.interval value into a
// time.Duration.
//
// Accepted values are:
//   - nil: no interval, i.e. zero;
//   - a string in time.ParseDuration syntax, such as "10s" or "1m";
//   - an int, interpreted as milliseconds (deprecated, logs a warning).
//
// Any other type is rejected with an error. A negative interval is replaced
// by one second.
//
// TODO remove this in 2.0.0
func parseInterval(retryInterval any) (time.Duration, error) {
	var parsed time.Duration

	switch raw := retryInterval.(type) {
	case nil:
		// Nothing configured: keep the zero interval.
	case string:
		d, err := time.ParseDuration(raw)
		if err != nil {
			return 0, err
		}
		parsed = d
	case int:
		logger.Log.Warnf(`configuring verify.retry.interval with number is deprecated
and will be removed in future version, please use Duration style instead, such as 10s, 1m.`)
		parsed = time.Duration(raw) * time.Millisecond
	default:
		return 0, fmt.Errorf("failed to parse verify.retry.interval: %v", retryInterval)
	}

	if parsed >= 0 {
		return parsed, nil
	}
	return time.Second, nil
}