func uploadVisualizations()

in index.go [319:442]


// uploadVisualizations recreates the "legacy_vis" Elasticsearch index and
// bulk-indexes every element of visualizations into it.
//
// The function:
//  1. builds a client from the default elasticsearch.Config,
//  2. logs the cluster info response,
//  3. deletes any existing index (a missing index is ignored),
//  4. creates the index with an explicit mapping,
//  5. JSON-encodes each visualization and queues it on a bulk indexer,
//  6. closes the indexer and logs its statistics.
//
// Failing to create the client or the bulk indexer, to encode a
// visualization, to queue an item, or to close the indexer terminates the
// process via log.Fatalf. Problems deleting or creating the index are
// reported but do not stop the upload. Per-document indexing failures are
// logged from the OnFailure callback.
func uploadVisualizations(visualizations []Visualization) {
	indexName := "legacy_vis"
	ctx := context.Background()

	es, err := elasticsearch.NewClient(elasticsearch.Config{})
	if err != nil {
		// Nothing below can work without a client, so stop here instead of
		// continuing with an unusable value.
		log.Fatalf("Error creating the elasticsearch client: %s", err)
	}

	// Log the cluster info. The response body must be closed so the
	// underlying HTTP connection can be reused.
	if info, err := es.Info(); err != nil {
		log.Printf("Problem fetching cluster info: %v", err)
	} else {
		log.Println(info)
		info.Body.Close()
	}

	// Drop the previous index so stale documents do not survive the upload.
	// A transport error and an error HTTP status are both reported; neither
	// is fatal because the index may simply not exist yet.
	if res, err := es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true)); err != nil {
		fmt.Printf("Problem deleting old index %v\n", err)
	} else {
		if res.IsError() {
			fmt.Printf("Problem deleting old index %v\n", res)
		}
		res.Body.Close()
	}

	mapping := `{
	    "mappings": {
		"properties": {
			"doc": { "type": "flattened", "depth_limit": 50 },
			"manifest": { "type": "flattened" },
			"soType": { "type": "keyword" },
			"app": { "type": "keyword" },
			"source": { "type": "keyword" }, 
			"link": { "type": "keyword" }, 
			"dashboard": { "type": "keyword" }, 
			"path": { "type": "keyword" },
			"vis_type": { "type": "keyword" },
			"vis_tsvb_type": { "type": "keyword" },
			"vis_title": { "type": "keyword" },
			"gh_owner": { "type": "keyword" },
			"owning_group": { "type": "keyword" },
			"is_legacy": { "type": "boolean" },
			"commit": {
				"properties": {
					"hash": { "type": "keyword" },
					"author": { "type": "keyword" },
					"date": { "type": "date" }
				}
			}
		 }
	    }
	}`

	// Create the index with the explicit mapping. An error status (for
	// example a rejected mapping) arrives with a nil err, so the response
	// itself has to be inspected as well.
	if res, err := es.Indices.Create(indexName, es.Indices.Create.WithBody(strings.NewReader(mapping))); err != nil {
		fmt.Printf("Problem creating index: %v\n", err)
	} else {
		if res.IsError() {
			fmt.Printf("Problem creating index: %v\n", res)
		}
		res.Body.Close()
	}

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Index:         indexName,
		Client:        es,
		NumWorkers:    runtime.NumCPU(),
		FlushBytes:    5e+6,
		FlushInterval: 30 * time.Second, // The periodic flush interval
	})
	if err != nil {
		log.Fatalf("Error creating the indexer: %s", err)
	}

	// Incremented from the indexer's OnSuccess callbacks, hence the atomic
	// access.
	var countSuccessful uint64

	for i, vis := range visualizations {
		// Prepare the data payload: encode vis to JSON.
		data, err := json.Marshal(vis)
		if err != nil {
			log.Fatalf("Cannot encode visualization %d: %s", i, err)
		}

		// Queue the document on the bulk indexer.
		err = bi.Add(
			ctx,
			esutil.BulkIndexerItem{
				// Action field configures the operation to perform (index, create, delete, update)
				Action: "index",

				// Body is an `io.Reader` with the payload
				Body: bytes.NewReader(data),

				// OnSuccess is called for each successful operation
				OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
					atomic.AddUint64(&countSuccessful, 1)
				},

				// OnFailure is called for each failed operation; err is set
				// for client-side failures, otherwise the reason is carried
				// in the response item.
				OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
					if err != nil {
						log.Printf("ERROR: %s", err)
					} else {
						log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason)
					}
				},
			},
		)
		if err != nil {
			log.Fatalf("Unexpected error: %s", err)
		}
	}

	// Close the indexer.
	if err := bi.Close(ctx); err != nil {
		log.Fatalf("Unexpected error: %s", err)
	}

	biStats := bi.Stats()

	log.Printf("%v", biStats)
	log.Printf("Indexed %d of %d visualizations", atomic.LoadUint64(&countSuccessful), len(visualizations))

	// beatsData := collectBeats()
	// for _, vis := range beatsData {
	// 	fmt.Printf("%v\n", vis)
	// }
}