func NewDataStream()

in packages/datastream.go [121:218]


// NewDataStream loads the data stream stored under basePath in package p.
//
// It reads <basePath>/manifest.yml from the package file system, unpacks it
// into a DataStream and then applies defaults for values the manifest left
// unset:
//   - Dataset becomes "{package name}.{data stream directory name}".
//   - Release becomes the release of the package.
//   - Every stream is enabled and uses the "stream.yml.hbs" template.
//   - The ingest pipeline becomes DefaultPipelineName when a default pipeline
//     file is present in the data stream's ingest pipeline directory.
//
// An error is returned when the manifest is missing, unreadable or invalid,
// when the resulting release is not valid, or when pipeline files exist but no
// ingest pipeline ends up being configured for the data stream.
func NewDataStream(basePath string, p *Package) (*DataStream, error) {
	fsys, err := p.fs()
	if err != nil {
		return nil, err
	}
	defer fsys.Close()

	manifestPath := path.Join(basePath, "manifest.yml")

	// Check up front that the manifest exists so a missing file gets a
	// dedicated error message. Any other stat failure is reported as well
	// instead of being silently dropped.
	if _, err = fsys.Stat(manifestPath); err != nil {
		if os.IsNotExist(err) {
			return nil, fmt.Errorf("manifest does not exist for data stream: %s: %w", p.BasePath, err)
		}
		return nil, fmt.Errorf("failed to stat manifest: %s: %w", manifestPath, err)
	}

	// The data stream is identified by the name of its directory.
	dataStreamPath := path.Base(basePath)

	b, err := ReadAll(fsys, manifestPath)
	if err != nil {
		// Report which manifest could not be read; the cause is attached via %w.
		return nil, fmt.Errorf("failed to read manifest: %s: %w", manifestPath, err)
	}

	manifest, err := yaml.NewConfig(b, ucfg.PathSep("."))
	if err != nil {
		return nil, fmt.Errorf("error creating new manifest config: %w", err)
	}

	d := &DataStream{
		Package:    p.Name,
		packageRef: p,

		// This is the name of the directory of the dataStream
		Path:     dataStreamPath,
		BasePath: basePath,
	}

	// go-ucfg automatically calls the `Validate` method on the DataStream object here
	err = manifest.Unpack(d, ucfg.PathSep("."))
	if err != nil {
		return nil, fmt.Errorf("error building data stream (path: %s) in package: %s: %w", dataStreamPath, p.Name, err)
	}

	// If the dataset is not set, {package}.{dataStreamPath} is the default.
	if d.Dataset == "" {
		d.Dataset = p.Name + "." + dataStreamPath
	}

	// A data stream without its own release inherits the one of the package.
	if d.Release == "" {
		d.Release = p.Release
	}

	for i := range d.Streams {
		// Default for the enabled flags is true. Each stream gets its own
		// bool so that changing one stream's flag later cannot affect the
		// other streams through a shared pointer.
		if d.Streams[i].Enabled == nil {
			enabled := true
			d.Streams[i].Enabled = &enabled
		}

		// TODO: validate that the template path actually exists
		if d.Streams[i].TemplatePath == "" {
			d.Streams[i].TemplatePath = "stream.yml.hbs"
		}
	}

	if !IsValidRelease(d.Release) {
		return nil, fmt.Errorf("invalid release: %q", d.Release)
	}

	pipelineDir := path.Join(d.BasePath, "elasticsearch", DirIngestPipeline)
	paths, err := fsys.Glob(path.Join(pipelineDir, "*"))
	if err != nil {
		return nil, err
	}

	// Look once for a default pipeline file (JSON or YAML); both branches
	// below only need to know whether one exists.
	hasDefaultPipeline := false
	for _, pipelinePath := range paths {
		name := path.Base(pipelinePath)
		if name == DefaultPipelineNameJSON || name == DefaultPipelineNameYAML {
			hasDefaultPipeline = true
			break
		}
	}

	if d.Elasticsearch != nil && d.Elasticsearch.IngestPipelineName == "" {
		if hasDefaultPipeline {
			d.Elasticsearch.IngestPipelineName = DefaultPipelineName
			// TODO: remove because of legacy
			d.IngestPipeline = DefaultPipelineName
		}
		// TODO: Remove, only here for legacy
	} else if d.IngestPipeline == "" && hasDefaultPipeline {
		d.IngestPipeline = DefaultPipelineName
	}

	// Pipeline files that are present while no pipeline is configured would
	// never be used, so treat that as a packaging error.
	if d.IngestPipeline == "" && len(paths) > 0 {
		return nil, fmt.Errorf("unused pipelines in the package (dataset: %s): %s", d.Dataset, strings.Join(paths, ","))
	}
	return d, nil
}