func uploadDatastreams()

in index.go [502:622]


func uploadDatastreams(datastreams []map[string]interface{}) {
	indexName := "integration_data_stream"

	es, err := elasticsearch.NewClient(elasticsearch.Config{})

	if err != nil {
		fmt.Printf("problem generating elasticsearch client")
	}

	log.Println(es.Info())

	_, err = es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true))

	if err != nil {
		fmt.Printf("Problem deleting old index %v", err)
	}

	mapping := `{
	    "mappings": {
			"dynamic_templates": [
				{
				  "default_as_keyword": {
					"match_mapping_type": "*",
					"path_match": "*default",
					"runtime": {
					  "type": "keyword"
					}
				  }
				},
				{
				  "dynamic_as_keyword": {
					"match_mapping_type": "*",
					"path_match": "*mappings.dynamic",
					"runtime": {
					  "type": "keyword"
					}
				  }
				}
			]
		}
	}`

	_, err = es.Indices.Create(indexName, es.Indices.Create.WithBody(strings.NewReader(mapping)))

	if err != nil {
		fmt.Printf("Problem creating index: %v\n", err)
	}

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         indexName,
		Client:        es,
		NumWorkers:    runtime.NumCPU(),
		FlushBytes:    5e+6,
		FlushInterval: 30 * time.Second, // The periodic flush interval
	})

	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}

	var countSuccessful uint64

	for i, vis := range datastreams {
		// Prepare the data payload: encode vis to JSON

		data, err := json.Marshal(vis)
		if err != nil {
			log.Fatalf("Cannot encode datastream %d: %s", i, err)
		}

		// >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
		//
		// Add an item to the BulkIndexer
		//
		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				// Action field configures the operation to perform (index, create, delete, update)
				Action: "index",

				// DocumentID is the (optional) document ID
				// DocumentID: strconv.Itoa(a.ID),

				// Body is an `io.Reader` with the payload
				Body: bytes.NewReader(data),

				// OnSuccess is called for each successful operation
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					atomic.AddUint64(&countSuccessful, 1)
				},

				// OnFailure is called for each failed operation
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						log.Printf("ERROR: %s", err)
					} else {
						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
					}
				},
			},
		)

		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
	}

	// Close the indexer
	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	biStats := bi.Stats()

	log.Printf("%v", biStats)

	// beatsData := collectBeats()
	// for _, vis := range beatsData {
	// 	fmt.Printf("%v\n", vis)
	// }
}