dev/import-beats/datasets.go (108 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"github.com/pkg/errors"
"github.com/elastic/package-registry/packages"
)
type dataStreamContent struct {
name string
beatType string
manifest packages.DataStream
agent agentContent
elasticsearch elasticsearchContent
fields fieldsContent
}
type dataStreamContentArray []dataStreamContent
func (dca dataStreamContentArray) names() []string {
var names []string
for _, dc := range dca {
names = append(names, dc.name)
}
return names
}
type dataStreamManifestMultiplePipelines struct {
IngestPipeline []string `yaml:"ingest_pipeline"`
}
type dataStreamManifestSinglePipeline struct {
IngestPipeline string `yaml:"ingest_pipeline"`
}
func createDataStreams(beatType, modulePath, moduleName, moduleTitle string, moduleFields []fieldDefinition,
filteredEcsModuleFieldNames []string, ecsFields fieldDefinitionArray) (dataStreamContentArray, error) {
dataStreamDirs, err := ioutil.ReadDir(modulePath)
if err != nil {
return nil, errors.Wrapf(err, "cannot read module directory %s", modulePath)
}
var contents []dataStreamContent
for _, dataStreamDir := range dataStreamDirs {
if !dataStreamDir.IsDir() {
continue
}
dataStreamName := dataStreamDir.Name()
if dataStreamName == "_meta" {
continue
}
dataStreamPath := filepath.Join(modulePath, dataStreamName)
_, err := os.Stat(filepath.Join(dataStreamPath, "_meta"))
if os.IsNotExist(err) {
_, err = os.Stat(filepath.Join(dataStreamPath, "manifest.yml"))
if os.IsNotExist(err) {
log.Printf("\t%s: not a valid dataStream, skipped", dataStreamName)
continue
}
}
log.Printf("\t%s: dataStream found", dataStreamName)
// fields
dataStreamFields, err := loadDataStreamFields(modulePath, moduleName, dataStreamName)
if err != nil {
return nil, errors.Wrapf(err, "loading dataStream fields failed (modulePath: %s, dataStreamName: %s)",
modulePath, dataStreamName)
}
dataStreamFields, filteredEcsDataStreamFieldNames, err := filterMigratedFields(dataStreamFields, ecsFields.names())
if err != nil {
return nil, errors.Wrapf(err, "filtering uncommon migrated failed (modulePath: %s, dataStreamName: %s)",
modulePath, dataStreamName)
}
foundEcsFieldNames := uniqueStringValues(append(filteredEcsModuleFieldNames, filteredEcsDataStreamFieldNames...))
ecsFields := filterEcsFields(ecsFields, foundEcsFieldNames)
fieldsFiles := map[string]fieldDefinitionArray{}
if len(ecsFields) > 0 {
fieldsFiles["ecs.yml"] = ecsFields
}
if len(moduleFields) > 0 && len(moduleFields[0].Fields) > 0 {
fieldsFiles["package-fields.yml"] = moduleFields
}
if len(dataStreamFields) > 0 {
fieldsFiles["fields.yml"] = dataStreamFields
}
fieldsFiles["base-fields.yml"] = baseFields
fields := fieldsContent{
files: fieldsFiles,
}
// elasticsearch
elasticsearch, err := loadElasticsearchContent(dataStreamPath)
if err != nil {
return nil, errors.Wrapf(err, "loading elasticsearch content failed (dataStreamPath: %s)", dataStreamPath)
}
// streams and agents
streams, agent, err := createStreams(modulePath, moduleName, moduleTitle, dataStreamName, beatType)
if err != nil {
return nil, errors.Wrapf(err, "creating streams failed (dataStreamPath: %s)", dataStreamPath)
}
// manifest
manifest := packages.DataStream{
Title: fmt.Sprintf("%s %s %s", moduleTitle, dataStreamName, beatType),
Release: "experimental",
Type: beatType,
Streams: streams,
}
contents = append(contents, dataStreamContent{
name: dataStreamName,
beatType: beatType,
manifest: manifest,
agent: agent,
elasticsearch: elasticsearch,
fields: fields,
})
}
return contents, nil
}