in index.go [502:622]
// uploadDatastreams recreates the "integration_data_stream" index and bulk-indexes
// every element of datastreams into it as one JSON document.
//
// Steps:
//  1. Build an Elasticsearch client from an empty elasticsearch.Config.
//  2. Delete any existing index of that name (missing index is ignored).
//  3. Create the index with dynamic templates that map fields whose path matches
//     "*default" or "*mappings.dynamic" as runtime keyword fields.
//  4. Feed every datastream to an esutil.BulkIndexer and wait for it to finish.
//
// Failures while deleting or creating the index are reported and the upload
// continues (best effort). A failure to build the client aborts the upload;
// failures to create the indexer, encode a datastream, enqueue an item, or
// close the indexer terminate the process via log.Fatalf.
func uploadDatastreams(datastreams []map[string]interface{}) {
	indexName := "integration_data_stream"

	es, err := elasticsearch.NewClient(elasticsearch.Config{})
	if err != nil {
		// Every call below needs a working client, so there is nothing useful
		// left to do here.
		fmt.Printf("problem generating elasticsearch client: %v\n", err)
		return
	}
	log.Println(es.Info())

	// Drop the previous index so each run starts from a clean state.
	delRes, err := es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true))
	if err != nil {
		fmt.Printf("Problem deleting old index %v\n", err)
	} else {
		// A nil error only means the request was sent; the HTTP status still
		// has to be checked, and the body closed so the connection is released.
		if delRes.IsError() {
			fmt.Printf("Problem deleting old index %v\n", delRes)
		}
		delRes.Body.Close()
	}

	mapping := `{
"mappings": {
"dynamic_templates": [
{
"default_as_keyword": {
"match_mapping_type": "*",
"path_match": "*default",
"runtime": {
"type": "keyword"
}
}
},
{
"dynamic_as_keyword": {
"match_mapping_type": "*",
"path_match": "*mappings.dynamic",
"runtime": {
"type": "keyword"
}
}
}
]
}
}`
	createRes, err := es.Indices.Create(indexName, es.Indices.Create.WithBody(strings.NewReader(mapping)))
	if err != nil {
		fmt.Printf("Problem creating index: %v\n", err)
	} else {
		// Surface a rejected mapping instead of silently indexing without it.
		if createRes.IsError() {
			fmt.Printf("Problem creating index: %v\n", createRes)
		}
		createRes.Body.Close()
	}

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         indexName,
		Client:        es,
		NumWorkers:    runtime.NumCPU(),
		FlushBytes:    5e+6,
		FlushInterval: 30 * time.Second, // The periodic flush interval
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}

	// Incremented from the indexer's OnSuccess callbacks, hence the atomic access.
	var countSuccessful uint64
	for i, vis := range datastreams {
		// Prepare the data payload: encode vis to JSON
		data, err := json.Marshal(vis)
		if err != nil {
			log.Fatalf("Cannot encode datastream %d: %s", i, err)
		}

		// Add an item to the BulkIndexer
		err = bi.Add(
			context.Background(),
			esutil.BulkIndexerItem{
				// Action field configures the operation to perform (index, create, delete, update)
				Action: "index",
				// DocumentID is the (optional) document ID
				// DocumentID: strconv.Itoa(a.ID),
				// Body is an `io.Reader` with the payload
				Body: bytes.NewReader(data),
				// OnSuccess is called for each successful operation
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					atomic.AddUint64(&countSuccessful, 1)
				},
				// OnFailure is called for each failed operation
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						log.Printf("ERROR: %s", err)
					} else {
						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
					}
				},
			},
		)
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
	}

	// Close the indexer
	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	biStats := bi.Stats()
	log.Printf("%v", biStats)
	// Report how many documents the OnSuccess callback counted.
	log.Printf("Indexed %d of %d datastreams", atomic.LoadUint64(&countSuccessful), len(datastreams))
	// beatsData := collectBeats()
	// for _, vis := range beatsData {
	// fmt.Printf("%v\n", vis)
	// }
}