in index.go [319:442]
// uploadVisualizations rebuilds the "legacy_vis" Elasticsearch index and
// bulk-indexes every element of visualizations into it.
//
// Steps, in order:
//  1. create a client from an empty elasticsearch.Config;
//  2. log the cluster info response;
//  3. delete the existing index (best effort: failures are printed and ignored);
//  4. recreate the index with an explicit mapping;
//  5. JSON-encode each visualization and push it through an esutil.BulkIndexer;
//  6. close the indexer and log its statistics.
//
// The function has no return value. Failures that make the upload pointless
// (no client, index cannot be created, encoding or indexer errors) terminate
// the process via log.Fatalf. Per-document indexing failures are logged and
// do not stop the run.
func uploadVisualizations(visualizations []Visualization) {
	indexName := "legacy_vis"

	es, err := elasticsearch.NewClient(elasticsearch.Config{})
	if err != nil {
		// Without a client nothing below can work, so stop here with a
		// clear message instead of carrying on with an unusable client.
		log.Fatalf("problem generating elasticsearch client: %s", err)
	}

	// Log the cluster info; a failure here is reported but not fatal.
	if infoRes, err := es.Info(); err != nil {
		log.Printf("Problem fetching cluster info: %v", err)
	} else {
		log.Println(infoRes)
		infoRes.Body.Close()
	}

	// Drop the previous index. This is best effort: both transport errors and
	// HTTP error responses are reported, then the run continues.
	delRes, err := es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true))
	if err != nil {
		fmt.Printf("Problem deleting old index %v\n", err)
	} else {
		if delRes.IsError() {
			fmt.Printf("Problem deleting old index %v\n", delRes)
		}
		delRes.Body.Close()
	}

	mapping := `{
"mappings": {
"properties": {
"doc": { "type": "flattened", "depth_limit": 50 },
"manifest": { "type": "flattened" },
"soType": { "type": "keyword" },
"app": { "type": "keyword" },
"source": { "type": "keyword" },
"link": { "type": "keyword" },
"dashboard": { "type": "keyword" },
"path": { "type": "keyword" },
"vis_type": { "type": "keyword" },
"vis_tsvb_type": { "type": "keyword" },
"vis_title": { "type": "keyword" },
"gh_owner": { "type": "keyword" },
"owning_group": { "type": "keyword" },
"is_legacy": { "type": "boolean" },
"commit": {
"properties": {
"hash": { "type": "keyword" },
"author": { "type": "keyword" },
"date": { "type": "date" }
}
}
}
}
}`

	// Create the index with the explicit mapping. If this fails, indexing
	// would target an index that is missing or lacks this mapping, so abort
	// rather than upload documents under the wrong schema.
	createRes, err := es.Indices.Create(indexName, es.Indices.Create.WithBody(strings.NewReader(mapping)))
	if err != nil {
		log.Fatalf("Problem creating index: %v", err)
	}
	if createRes.IsError() {
		log.Fatalf("Problem creating index: %s", createRes)
	}
	createRes.Body.Close()

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         indexName,
		Client:        es,
		NumWorkers:    runtime.NumCPU(),
		FlushBytes:    5e+6,
		FlushInterval: 30 * time.Second, // The periodic flush interval
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}

	ctx := context.Background()

	// The callbacks capture nothing that varies per item, so build them once
	// instead of on every iteration. Updates to countSuccessful go through
	// sync/atomic.
	var countSuccessful uint64
	onSuccess := func(_ context.Context, _ esutil.BulkIndexerItem, _ esutil.BulkIndexerResponseItem) {
		atomic.AddUint64(&countSuccessful, 1)
	}
	onFailure := func(_ context.Context, _ esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
		if err != nil {
			log.Printf("ERROR: %s", err)
		} else {
			log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
		}
	}

	for i, vis := range visualizations {
		// Prepare the data payload: encode vis to JSON.
		data, err := json.Marshal(vis)
		if err != nil {
			log.Fatalf("Cannot encode visualization %d: %s", i, err)
		}

		// Queue the document. No DocumentID is set on the item.
		err = bi.Add(ctx, esutil.BulkIndexerItem{
			// Action field configures the operation to perform (index, create, delete, update)
			Action: "index",
			// Body is an `io.Reader` with the payload
			Body: bytes.NewReader(data),
			// OnSuccess is called for each successful operation
			OnSuccess: onSuccess,
			// OnFailure is called for each failed operation
			OnFailure: onFailure,
		})
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
	}

	// Close the indexer before reading the final statistics and counter.
	if err := bi.Close(ctx); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	biStats := bi.Stats()
	log.Printf("%v", biStats)
	log.Printf("Indexed %d of %d visualizations", atomic.LoadUint64(&countSuccessful), len(visualizations))
	// beatsData := collectBeats()
	// for _, vis := range beatsData {
	// fmt.Printf("%v\n", vis)
	// }
}