func StreamChecker()

in internal/pkg/agent/application/filters/stream_checker.go [25:151]


// StreamChecker validates the data stream settings of every entry in the
// top-level "inputs" list of ast.
//
// For each input the following checks run in order, and the first failure is
// returned immediately:
//   - namespace (default "default") must satisfy matchesNamespaceContraints,
//     otherwise ErrInvalidNamespace is returned;
//   - type (default "metrics") must satisfy matchesTypeConstraints, otherwise
//     ErrInvalidIndex is returned;
//   - dataset (default "generic"), taken from the first stream of the input
//     that declares one, must satisfy matchesDatasetConstraints, otherwise
//     ErrInvalidDataset is returned.
//
// Values that are not provided keep their defaults and therefore only fail if
// the default itself is rejected by the matching constraint function. A nil
// error is returned when "inputs" is absent or is not a *transpiler.List.
//
// The log parameter is currently unused; it is kept so the signature stays
// unchanged for callers.
func StreamChecker(log *logger.Logger, ast *transpiler.AST) error {
	inputsNode, found := transpiler.Lookup(ast, "inputs")
	if !found {
		return nil
	}

	inputsNodeList, ok := inputsNode.Value().(*transpiler.List)
	if !ok {
		return nil
	}

	inputsNodeListCollection, ok := inputsNodeList.Value().([]transpiler.Node)
	if !ok {
		return errors.New("inputs is not a list", errors.TypeConfig)
	}

	for _, inputNode := range inputsNodeListCollection {
		// Fail only if a namespace is found and invalid; values that are not
		// provided are ok and will be fixed by rules.
		namespace := "default"
		if value, ok := dataStreamField(inputNode, "namespace"); ok {
			namespace = value
		}
		if !matchesNamespaceContraints(namespace) {
			return ErrInvalidNamespace
		}

		// Get the type; the longest type for now is metrics.
		datasetType := "metrics"
		if value, ok := dataStreamField(inputNode, "type"); ok {
			datasetType = value
		}
		if !matchesTypeConstraints(datasetType) {
			return ErrInvalidIndex
		}

		datasetName := "generic"
		if streamsNode, found := inputNode.Find("streams"); found {
			if streamsList, ok := streamsNode.Value().(*transpiler.List); ok {
				streamNodes, ok := streamsList.Value().([]transpiler.Node)
				if !ok {
					return errors.New("streams is not a list", errors.TypeConfig)
				}

				for _, streamNode := range streamNodes {
					streamMap, ok := streamNode.(*transpiler.Dict)
					if !ok {
						continue
					}

					// Only the first stream that declares a dataset is
					// validated; later streams are not inspected.
					// NOTE(review): confirm this is intended rather than
					// validating every stream's dataset.
					if value, ok := dataStreamField(streamMap, "dataset"); ok {
						datasetName = value
						break
					}
				}
			}
		}
		if !matchesDatasetConstraints(datasetName) {
			return ErrInvalidDataset
		}
	}

	return nil
}

// dataStreamField looks up the data_stream setting named field on node and
// returns its string form.
//
// The compact key "data_stream.<field>" is tried first. Only when that key is
// absent is the nested form consulted, i.e. a "data_stream" entry whose value
// is a *transpiler.Dict containing <field>. The boolean result is false when
// no usable value exists: the setting is missing, the located node is not a
// *transpiler.Key, or the key's value is not a transpiler.Node (for example a
// key without a value). The last case is checked explicitly so that such
// configuration is treated as "not provided" instead of causing a panic.
func dataStreamField(node transpiler.Node, field string) (string, bool) {
	fieldNode, found := node.Find("data_stream." + field)
	if !found {
		dsNode, dsFound := node.Find("data_stream")
		if !dsFound {
			return "", false
		}

		dsDict, ok := dsNode.Value().(*transpiler.Dict)
		if !ok {
			return "", false
		}

		fieldNode, found = dsDict.Find(field)
		if !found {
			return "", false
		}
	}

	key, ok := fieldNode.(*transpiler.Key)
	if !ok {
		return "", false
	}

	valueNode, ok := key.Value().(transpiler.Node)
	if !ok {
		return "", false
	}

	return valueNode.String(), true
}