in internal/pkg/agent/application/filters/stream_checker.go [25:151]
// StreamChecker validates the data stream settings of every entry found under
// the top-level "inputs" list of ast.
//
// For each input it resolves three values and checks each one against its
// constraint function:
//   - namespace, defaulting to "default";
//   - type, defaulting to "metrics";
//   - dataset, defaulting to "generic" and taken from the first stream of the
//     input that declares one.
//
// Each value may be written in compact form (a "data_stream.<field>" key) or in
// nested form (a "data_stream" dict holding a "<field>" key). When the compact
// key is present the nested form is not consulted.
//
// It returns nil when "inputs" is missing or is not a *transpiler.List, a
// config error when the value of the inputs or streams list is not a slice of
// nodes, and ErrInvalidNamespace, ErrInvalidIndex or ErrInvalidDataset for the
// first value that violates its constraints. The log parameter is not used.
func StreamChecker(log *logger.Logger, ast *transpiler.AST) error {
	inputsNode, found := transpiler.Lookup(ast, "inputs")
	if !found {
		return nil
	}

	inputsNodeList, ok := inputsNode.Value().(*transpiler.List)
	if !ok {
		return nil
	}

	inputNodes, ok := inputsNodeList.Value().([]transpiler.Node)
	if !ok {
		return errors.New("inputs is not a list", errors.TypeConfig)
	}

	for _, inputNode := range inputNodes {
		// Fail only if a value is present and invalid; values that are not
		// provided keep their defaults and are expected to be fixed by rules.
		namespace := "default"
		if v, ok := dataStreamField(inputNode, "namespace"); ok {
			namespace = v
		}
		if !matchesNamespaceContraints(namespace) {
			return ErrInvalidNamespace
		}

		// The longest type for now is metrics, so it is used as the default.
		datasetType := "metrics"
		if v, ok := dataStreamField(inputNode, "type"); ok {
			datasetType = v
		}
		if !matchesTypeConstraints(datasetType) {
			return ErrInvalidIndex
		}

		datasetName := "generic"
		if streamsNode, found := inputNode.Find("streams"); found {
			if streamsList, ok := streamsNode.Value().(*transpiler.List); ok {
				streamNodes, ok := streamsList.Value().([]transpiler.Node)
				if !ok {
					return errors.New("streams is not a list", errors.TypeConfig)
				}

				for _, streamNode := range streamNodes {
					streamMap, ok := streamNode.(*transpiler.Dict)
					if !ok {
						continue
					}

					// Only the first stream that declares a dataset is used.
					// NOTE(review): datasets of later streams are never
					// validated - confirm this is intended.
					if v, ok := dataStreamField(streamMap, "dataset"); ok {
						datasetName = v
						break
					}
				}
			}
		}
		if !matchesDatasetConstraints(datasetName) {
			return ErrInvalidDataset
		}
	}

	return nil
}

// dataStreamField looks up the data stream setting named field on node.
//
// The compact key "data_stream.<field>" is tried first; only when that key is
// absent is the nested "data_stream" dict searched for "<field>". The boolean
// result is false when the setting is absent or is not a key holding a node
// value.
func dataStreamField(node transpiler.Node, field string) (string, bool) {
	if compact, found := node.Find("data_stream." + field); found {
		return dataStreamKeyValue(compact)
	}

	nested, found := node.Find("data_stream")
	if !found {
		return "", false
	}

	dict, ok := nested.Value().(*transpiler.Dict)
	if !ok {
		return "", false
	}

	entry, found := dict.Find(field)
	if !found {
		return "", false
	}

	return dataStreamKeyValue(entry)
}

// dataStreamKeyValue returns the string form of the value held by node when
// node is a *transpiler.Key whose value is a transpiler.Node.
//
// The value assertion is checked so that a key without a node value (for
// example, presumably, a key set to null - TODO confirm against the transpiler
// loader) is reported as not provided instead of causing a panic.
func dataStreamKeyValue(node transpiler.Node) (string, bool) {
	key, ok := node.(*transpiler.Key)
	if !ok {
		return "", false
	}

	value, ok := key.Value().(transpiler.Node)
	if !ok {
		return "", false
	}

	return value.String(), true
}