in packages/datastream.go [121:218]
func NewDataStream(basePath string, p *Package) (*DataStream, error) {
fsys, err := p.fs()
if err != nil {
return nil, err
}
defer fsys.Close()
manifestPath := path.Join(basePath, "manifest.yml")
// Check if manifest exists
_, err = fsys.Stat(manifestPath)
if err != nil && os.IsNotExist(err) {
return nil, fmt.Errorf("manifest does not exist for data stream: %s: %w", p.BasePath, err)
}
dataStreamPath := path.Base(basePath)
b, err := ReadAll(fsys, manifestPath)
if err != nil {
return nil, fmt.Errorf("failed to read manifest: %s: %w", err, err)
}
manifest, err := yaml.NewConfig(b, ucfg.PathSep("."))
if err != nil {
return nil, fmt.Errorf("error creating new manifest config: %w", err)
}
var d = &DataStream{
Package: p.Name,
packageRef: p,
// This is the name of the directory of the dataStream
Path: dataStreamPath,
BasePath: basePath,
}
// go-ucfg automatically calls the `Validate` method on the DataStream object here
err = manifest.Unpack(d, ucfg.PathSep("."))
if err != nil {
return nil, fmt.Errorf("error building data stream (path: %s) in package: %s: %w", dataStreamPath, p.Name, err)
}
// if id is not set, {package}.{dataStreamPath} is the default
if d.Dataset == "" {
d.Dataset = p.Name + "." + dataStreamPath
}
if d.Release == "" {
d.Release = p.Release
}
// Default for the enabled flags is true.
trueValue := true
for i := range d.Streams {
if d.Streams[i].Enabled == nil {
d.Streams[i].Enabled = &trueValue
}
// TODO: validate that the template path actually exists
if d.Streams[i].TemplatePath == "" {
d.Streams[i].TemplatePath = "stream.yml.hbs"
}
}
if !IsValidRelease(d.Release) {
return nil, fmt.Errorf("invalid release: %q", d.Release)
}
pipelineDir := path.Join(d.BasePath, "elasticsearch", DirIngestPipeline)
paths, err := fsys.Glob(path.Join(pipelineDir, "*"))
if err != nil {
return nil, err
}
if d.Elasticsearch != nil && d.Elasticsearch.IngestPipelineName == "" {
// Check that no ingest pipeline exists in the directory except default
for _, p := range paths {
if path.Base(p) == DefaultPipelineNameJSON || path.Base(p) == DefaultPipelineNameYAML {
d.Elasticsearch.IngestPipelineName = DefaultPipelineName
// TODO: remove because of legacy
d.IngestPipeline = DefaultPipelineName
break
}
}
// TODO: Remove, only here for legacy
} else if d.IngestPipeline == "" {
// Check that no ingest pipeline exists in the directory except default
for _, p := range paths {
if path.Base(p) == DefaultPipelineNameJSON || path.Base(p) == DefaultPipelineNameYAML {
d.IngestPipeline = DefaultPipelineName
break
}
}
}
if d.IngestPipeline == "" && len(paths) > 0 {
return nil, fmt.Errorf("unused pipelines in the package (dataset: %s): %s", d.Dataset, strings.Join(paths, ","))
}
return d, nil
}