in packages/datastream.go [220:280]
// Validate checks that the data stream definition is consistent.
//
// It returns nil immediately when ValidationDisabled is set. Otherwise it
// verifies, in order, that:
//   - the dataset name does not contain "-",
//   - the data stream type is accepted by validType,
//   - if IngestPipeline is set, at least one of "<pipeline>.json" or
//     "<pipeline>.yml" exists in the data stream's ingest pipeline directory,
//     and every file that exists passes validateIngestPipelineFile,
//   - the required fields are present (validateRequiredFields).
//
// The first failing check is returned as an error; underlying errors are
// wrapped with %w.
func (d *DataStream) Validate() error {
	if ValidationDisabled {
		return nil
	}

	if strings.Contains(d.Dataset, "-") {
		return fmt.Errorf("data stream name is not allowed to contain `-`: %s", d.Dataset)
	}

	if !d.validType() {
		return fmt.Errorf("type is not valid: %s", d.Type)
	}

	fsys, err := d.packageRef.fs()
	if err != nil {
		return err
	}
	defer fsys.Close()

	// In case an ingest pipeline is set, check if it is around.
	if d.IngestPipeline != "" {
		pipelineDir := path.Join(d.BasePath, "elasticsearch", DirIngestPipeline)

		// A pipeline may be provided as JSON, as YAML, or as both. Each file
		// that exists must be valid, and at least one of them must exist.
		candidates := []struct {
			label string
			ext   string
		}{
			{label: "JSON", ext: ".json"},
			{label: "YAML", ext: ".yml"},
		}

		var validFound bool
		for _, c := range candidates {
			pipelinePath := path.Join(pipelineDir, d.IngestPipeline+c.ext)

			_, statErr := fsys.Stat(pipelinePath)
			if os.IsNotExist(statErr) {
				// A missing file is fine here; the other format may exist.
				// NOTE(review): os.IsNotExist does not unwrap wrapped errors;
				// confirm fsys.Stat returns unwrapped os/fs errors.
				continue
			}
			if statErr != nil {
				return fmt.Errorf("stat ingest pipeline %s file failed (path: %s): %w", c.label, pipelinePath, statErr)
			}

			if err := validateIngestPipelineFile(fsys, pipelinePath); err != nil {
				return fmt.Errorf("validating ingest pipeline %s file failed (path: %s): %w", c.label, pipelinePath, err)
			}
			validFound = true
		}

		if !validFound {
			// Join with a separator so the reported location is a real path.
			return fmt.Errorf("defined ingest_pipeline does not exist: %s", path.Join(pipelineDir, d.IngestPipeline))
		}
	}

	if err := d.validateRequiredFields(fsys); err != nil {
		return fmt.Errorf("validating required fields failed: %w", err)
	}
	return nil
}