func loadElasticsearchContent()

in dev/import-beats/elasticsearch.go [37:112]


func loadElasticsearchContent(dataStreamPath string) (elasticsearchContent, error) {
	var esc elasticsearchContent

	dataStreamManifestPath := filepath.Join(dataStreamPath, "manifest.yml")
	dataStreamManifestFile, err := ioutil.ReadFile(dataStreamManifestPath)
	if os.IsNotExist(err) {
		return elasticsearchContent{}, nil // no manifest.yml file found,
	}
	if err != nil {
		return elasticsearchContent{}, errors.Wrapf(err, "reading dataStream manifest file failed (path: %s)", dataStreamManifestPath)
	}

	var ingestPipelines []string
	var dmsp dataStreamManifestSinglePipeline
	err = yaml.Unmarshal(dataStreamManifestFile, &dmsp)
	if err == nil {
		if len(dmsp.IngestPipeline) > 0 {
			ingestPipelines = append(ingestPipelines, dmsp.IngestPipeline)
		}
	} else {
		var dmmp dataStreamManifestMultiplePipelines
		err = yaml.Unmarshal(dataStreamManifestFile, &dmmp)
		if err != nil {
			return elasticsearchContent{}, errors.Wrapf(err, "unmarshalling dataStream manifest file failed (path: %s)", dataStreamManifestPath)
		}

		if len(dmmp.IngestPipeline) > 0 {
			ingestPipelines = append(ingestPipelines, dmmp.IngestPipeline...)
		}
	}

	for _, ingestPipeline := range ingestPipelines {
		ingestPipeline = ensurePipelineFormat(ingestPipeline)

		log.Printf("\tingest-pipeline found: %s", ingestPipeline)

		var targetFileName string
		if len(ingestPipelines) == 1 {
			targetFileName, err = buildSingleIngestPipelineTargetName(ingestPipeline)
			if err != nil {
				return elasticsearchContent{}, errors.Wrapf(err, "can't build single ingest pipeline target name (path: %s)", ingestPipeline)
			}
		} else {
			targetFileName, err = determineIngestPipelineTargetName(ingestPipeline)
			if err != nil {
				return elasticsearchContent{}, errors.Wrapf(err, "can't determine ingest pipeline target name (path: %s)", ingestPipeline)
			}
		}

		pipelinePath := filepath.Join(dataStreamPath, ingestPipeline)
		body, err := ioutil.ReadFile(pipelinePath)
		if err != nil {
			return elasticsearchContent{}, errors.Wrapf(err, "reading pipeline body failed (path: %s)", pipelinePath)
		}

		// Fix missing "---" at the beginning of the YAML pipeline.
		if strings.HasSuffix(targetFileName, ".yml") && bytes.Index(body, []byte("---")) != 0 {
			body = append([]byte("---\n"), body...)
		}

		ipc := ingestPipelineContent{
			targetFileName: targetFileName,
			body:           adjustUnsupportedStructuresInPipeline(body),
		}

		err = validateIngestPipeline(ipc)
		if err != nil {
			return elasticsearchContent{},
				errors.Wrapf(err, "validation of modified ingest pipeline failed (original path: %s)", pipelinePath)
		}

		esc.ingestPipelines = append(esc.ingestPipelines, ipc)
	}

	return esc, nil
}