in dev/import-beats/elasticsearch.go [37:112]
// loadElasticsearchContent reads "manifest.yml" from dataStreamPath, collects
// the ingest pipelines it references, and loads each pipeline file into an
// ingestPipelineContent entry of the returned elasticsearchContent.
//
// A missing manifest is not an error: an empty elasticsearchContent and a nil
// error are returned. Any other read failure, a manifest that cannot be
// unmarshalled in either supported form, a failure to derive a target file
// name, an unreadable pipeline file, or a pipeline rejected by
// validateIngestPipeline yields an empty elasticsearchContent together with a
// wrapped error.
func loadElasticsearchContent(dataStreamPath string) (elasticsearchContent, error) {
	manifestPath := filepath.Join(dataStreamPath, "manifest.yml")
	manifest, err := ioutil.ReadFile(manifestPath)
	switch {
	case os.IsNotExist(err):
		// No manifest.yml present: nothing to import for this data stream.
		return elasticsearchContent{}, nil
	case err != nil:
		return elasticsearchContent{}, errors.Wrapf(err, "reading dataStream manifest file failed (path: %s)", manifestPath)
	}

	// The manifest is first decoded in its single-pipeline form; only when
	// that decoding fails is the multiple-pipeline form attempted.
	var pipelines []string
	var single dataStreamManifestSinglePipeline
	if yaml.Unmarshal(manifest, &single) == nil {
		if len(single.IngestPipeline) > 0 {
			pipelines = append(pipelines, single.IngestPipeline)
		}
	} else {
		var multiple dataStreamManifestMultiplePipelines
		if err := yaml.Unmarshal(manifest, &multiple); err != nil {
			return elasticsearchContent{}, errors.Wrapf(err, "unmarshalling dataStream manifest file failed (path: %s)", manifestPath)
		}
		// Appending zero elements leaves the slice untouched, so no length
		// guard is needed here.
		pipelines = append(pipelines, multiple.IngestPipeline...)
	}

	// The target-name strategy depends only on how many pipelines the
	// manifest declares, so it is decided once up front.
	onlyOne := len(pipelines) == 1

	var content elasticsearchContent
	for _, pipeline := range pipelines {
		pipeline = ensurePipelineFormat(pipeline)
		log.Printf("\tingest-pipeline found: %s", pipeline)

		var targetName string
		if onlyOne {
			name, err := buildSingleIngestPipelineTargetName(pipeline)
			if err != nil {
				return elasticsearchContent{}, errors.Wrapf(err, "can't build single ingest pipeline target name (path: %s)", pipeline)
			}
			targetName = name
		} else {
			name, err := determineIngestPipelineTargetName(pipeline)
			if err != nil {
				return elasticsearchContent{}, errors.Wrapf(err, "can't determine ingest pipeline target name (path: %s)", pipeline)
			}
			targetName = name
		}

		sourcePath := filepath.Join(dataStreamPath, pipeline)
		raw, err := ioutil.ReadFile(sourcePath)
		if err != nil {
			return elasticsearchContent{}, errors.Wrapf(err, "reading pipeline body failed (path: %s)", sourcePath)
		}

		// YAML pipelines that do not start with the "---" document marker
		// get one prepended.
		isYAML := strings.HasSuffix(targetName, ".yml")
		if isYAML && !bytes.HasPrefix(raw, []byte("---")) {
			raw = append([]byte("---\n"), raw...)
		}

		entry := ingestPipelineContent{
			targetFileName: targetName,
			body:           adjustUnsupportedStructuresInPipeline(raw),
		}
		// Validation runs on the adjusted body, not on the file as read.
		if err := validateIngestPipeline(entry); err != nil {
			return elasticsearchContent{},
				errors.Wrapf(err, "validation of modified ingest pipeline failed (original path: %s)", sourcePath)
		}
		content.ingestPipelines = append(content.ingestPipelines, entry)
	}
	return content, nil
}