func()

in packages/datastream.go [220:280]


func (d *DataStream) Validate() error {
	if ValidationDisabled {
		return nil
	}

	if strings.Contains(d.Dataset, "-") {
		return fmt.Errorf("data stream name is not allowed to contain `-`: %s", d.Dataset)
	}

	if !d.validType() {
		return fmt.Errorf("type is not valid: %s", d.Type)
	}

	fsys, err := d.packageRef.fs()
	if err != nil {
		return err
	}
	defer fsys.Close()

	// In case an ingest pipeline is set, check if it is around
	pipelineDir := path.Join(d.BasePath, "elasticsearch", DirIngestPipeline)
	if d.IngestPipeline != "" {
		var validFound bool

		jsonPipelinePath := path.Join(pipelineDir, d.IngestPipeline+".json")
		_, errJSON := fsys.Stat(jsonPipelinePath)
		if errJSON != nil && !os.IsNotExist(errJSON) {
			return fmt.Errorf("stat ingest pipeline JSON file failed (path: %s): %w", jsonPipelinePath, errJSON)
		}
		if !os.IsNotExist(errJSON) {
			err := validateIngestPipelineFile(fsys, jsonPipelinePath)
			if err != nil {
				return fmt.Errorf("validating ingest pipeline JSON file failed (path: %s): %w", jsonPipelinePath, err)
			}
			validFound = true
		}

		yamlPipelinePath := path.Join(pipelineDir, d.IngestPipeline+".yml")
		_, errYAML := fsys.Stat(yamlPipelinePath)
		if errYAML != nil && !os.IsNotExist(errYAML) {
			return fmt.Errorf("stat ingest pipeline YAML file failed (path: %s): %w", jsonPipelinePath, errYAML)
		}
		if !os.IsNotExist(errYAML) {
			err := validateIngestPipelineFile(fsys, yamlPipelinePath)
			if err != nil {
				return fmt.Errorf("validating ingest pipeline YAML file failed (path: %s): %w", jsonPipelinePath, err)
			}
			validFound = true
		}

		if !validFound {
			return fmt.Errorf("defined ingest_pipeline does not exist: %s", pipelineDir+d.IngestPipeline)
		}
	}

	err = d.validateRequiredFields(fsys)
	if err != nil {
		return fmt.Errorf("validating required fields failed: %w", err)
	}
	return nil
}