func createDataStreams()

in dev/import-beats/datasets.go [48:139]


// createDataStreams builds one dataStreamContent per data stream directory
// found directly under modulePath.
//
// Every sub-directory of modulePath except "_meta" is a candidate. A candidate
// is skipped (with a log line) only when os.Stat reports that both its "_meta"
// entry and its "manifest.yml" file do not exist.
//
// For each accepted data stream the function:
//   - loads the data stream fields and filters them against the names of ecsFields;
//   - selects the ECS fields named by either filteredEcsModuleFieldNames or the
//     names filtered out of the data stream's own fields;
//   - assembles the fields files ("ecs.yml", "package-fields.yml", "fields.yml"
//     when non-empty, and always "base-fields.yml");
//   - loads the Elasticsearch content and creates the streams and agent content;
//   - builds the data stream manifest.
//
// filteredEcsModuleFieldNames is only read; its backing array is never written to.
//
// It returns the collected contents, or nil and a wrapped error as soon as any
// step fails. The result is nil when no data stream is found.
func createDataStreams(beatType, modulePath, moduleName, moduleTitle string, moduleFields []fieldDefinition,
	filteredEcsModuleFieldNames []string, ecsFields fieldDefinitionArray) (dataStreamContentArray, error) {
	dataStreamDirs, err := ioutil.ReadDir(modulePath)
	if err != nil {
		return nil, errors.Wrapf(err, "cannot read module directory %s", modulePath)
	}

	// Clip the capacity of the caller's slice so that the append performed for
	// each data stream copies into fresh storage instead of writing into spare
	// capacity of the caller's backing array.
	moduleEcsFieldNames := filteredEcsModuleFieldNames[:len(filteredEcsModuleFieldNames):len(filteredEcsModuleFieldNames)]

	var contents []dataStreamContent
	for _, dataStreamDir := range dataStreamDirs {
		if !dataStreamDir.IsDir() {
			continue
		}
		dataStreamName := dataStreamDir.Name()

		// "_meta" holds module-level metadata, it is not a data stream.
		if dataStreamName == "_meta" {
			continue
		}

		dataStreamPath := filepath.Join(modulePath, dataStreamName)
		// Only a definite "does not exist" for both markers disqualifies the
		// directory; any other stat outcome lets processing continue.
		_, err := os.Stat(filepath.Join(dataStreamPath, "_meta"))
		if os.IsNotExist(err) {
			_, err = os.Stat(filepath.Join(dataStreamPath, "manifest.yml"))
			if os.IsNotExist(err) {
				log.Printf("\t%s: not a valid dataStream, skipped", dataStreamName)
				continue
			}
		}

		log.Printf("\t%s: dataStream found", dataStreamName)

		// fields
		dataStreamFields, err := loadDataStreamFields(modulePath, moduleName, dataStreamName)
		if err != nil {
			return nil, errors.Wrapf(err, "loading dataStream fields failed (modulePath: %s, dataStreamName: %s)",
				modulePath, dataStreamName)
		}
		dataStreamFields, filteredEcsDataStreamFieldNames, err := filterMigratedFields(dataStreamFields, ecsFields.names())
		if err != nil {
			return nil, errors.Wrapf(err, "filtering uncommon migrated failed (modulePath: %s, dataStreamName: %s)",
				modulePath, dataStreamName)
		}

		foundEcsFieldNames := uniqueStringValues(append(moduleEcsFieldNames, filteredEcsDataStreamFieldNames...))
		// Distinct name: the ecsFields parameter must stay the full, unfiltered
		// set for the next iteration.
		dataStreamEcsFields := filterEcsFields(ecsFields, foundEcsFieldNames)

		fieldsFiles := map[string]fieldDefinitionArray{}
		if len(dataStreamEcsFields) > 0 {
			fieldsFiles["ecs.yml"] = dataStreamEcsFields
		}
		// Module fields are only emitted when the first definition has nested fields.
		if len(moduleFields) > 0 && len(moduleFields[0].Fields) > 0 {
			fieldsFiles["package-fields.yml"] = moduleFields
		}
		if len(dataStreamFields) > 0 {
			fieldsFiles["fields.yml"] = dataStreamFields
		}
		fieldsFiles["base-fields.yml"] = baseFields

		fields := fieldsContent{
			files: fieldsFiles,
		}

		// elasticsearch
		elasticsearch, err := loadElasticsearchContent(dataStreamPath)
		if err != nil {
			return nil, errors.Wrapf(err, "loading elasticsearch content failed (dataStreamPath: %s)", dataStreamPath)
		}

		// streams and agents
		streams, agent, err := createStreams(modulePath, moduleName, moduleTitle, dataStreamName, beatType)
		if err != nil {
			return nil, errors.Wrapf(err, "creating streams failed (dataStreamPath: %s)", dataStreamPath)
		}

		// manifest
		manifest := packages.DataStream{
			Title:   fmt.Sprintf("%s %s %s", moduleTitle, dataStreamName, beatType),
			Release: "experimental",
			Type:    beatType,
			Streams: streams,
		}

		contents = append(contents, dataStreamContent{
			name:          dataStreamName,
			beatType:      beatType,
			manifest:      manifest,
			agent:         agent,
			elasticsearch: elasticsearch,
			fields:        fields,
		})
	}
	return contents, nil
}