// index.go

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"runtime"
	"strings"
	"sync/atomic"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"
	"github.com/elastic/kbncontent"
	"sigs.k8s.io/yaml"
)

// CommitData holds git metadata for the most recent commit touching a file.
type CommitData struct {
	Hash   string `json:"hash"`
	Author string `json:"author"`
	Date   string `json:"date"`
}

// commitLogPattern extracts the hash, author, and date from `git log` output.
// Compiled once at package scope instead of on every call.
var commitLogPattern = regexp.MustCompile("commit (.+)\nAuthor: (.*)\nDate: (.*)")

// getCommitData returns metadata for the most recent matching commit of path.
// It shells out to git in the file's directory; --date=iso-strict keeps the
// date parseable by the Elasticsearch "date" mapping used downstream.
func getCommitData(path string) (CommitData, error) {
	dir, file := filepath.Split(path)
	// Invoke git directly with the file as an argument instead of
	// interpolating the path into an `sh -c` string — avoids quoting and
	// shell-injection issues with unusual file names.
	cmd := exec.Command("git", "log", "--date=iso-strict", "--", file)
	cmd.Dir = dir
	var out bytes.Buffer
	cmd.Stdout = &out
	if err := cmd.Run(); err != nil {
		return CommitData{}, err
	}
	// FindSubmatch returns the first (most recent) matching commit block.
	submatches := commitLogPattern.FindSubmatch(out.Bytes())
	if submatches == nil {
		// Previously this indexed into a nil result and panicked for files
		// with no git history (e.g. untracked files).
		return CommitData{}, fmt.Errorf("no commit data found for %q", path)
	}
	return CommitData{
		Hash:   string(submatches[1]),
		Author: strings.TrimSpace(string(submatches[2])),
		Date:   strings.TrimSpace(string(submatches[3])),
	}, nil
}

// Visualization is the document shape indexed into the "legacy_vis" index.
type Visualization struct {
	Doc         map[string]interface{} `json:"doc"`
	SoType      string                 `json:"soType"`
	App         string                 `json:"app"`
	Source      string                 `json:"source"`
	Link        string                 `json:"link"`
	Dashboard   string                 `json:"dashboard"`
	Path        string                 `json:"path"`
	Commit      CommitData             `json:"commit"`
	Manifest    map[string]interface{} `json:"manifest"`
	GithubOwner string                 `json:"gh_owner"`
	VisType     string                 `json:"vis_type,omitempty"`
	TSVBType    string                 `json:"vis_tsvb_type,omitempty"`
	VisTitle    string                 `json:"vis_title,omitempty"`
	IsLegacy    bool                   `json:"is_legacy"`
	OwningGroup string                 `json:"owning_group"`
}

// getGithubOwner extracts manifest["owner"]["github"], returning "" when the
// owner block or the github field is missing or has an unexpected type.
func getGithubOwner(manifest map[string]interface{}) string {
	owner, ok := manifest["owner"].(map[string]interface{})
	if !ok {
		// Previously this assertion was unchecked and panicked on manifests
		// without an "owner" block.
		return ""
	}
	if githubOwner, ok := owner["github"].(string); ok {
		return githubOwner
	}
	return ""
}

// getOwningGroup maps a github owner team to the org-level group used in
// reporting. Unknown non-empty owners default to "observability".
func getOwningGroup(githubOwner string) string {
	switch githubOwner {
	case "":
		return ""
	case "elastic/security-external-integrations", "elastic/security-asset-management":
		return "security"
	case "elastic/ml-ui":
		return "platform"
	default:
		return "observability"
	}
}

// collectVisualizationFolder reads every saved object under path/<folderName>
// and describes it via kbncontent. dashboards maps saved-object ID to the
// title of a dashboard referencing it, used to attribute each visualization.
// Returns nil when the folder does not exist or cannot be read; unreadable or
// unparseable files are skipped.
func collectVisualizationFolder(app, path, source string, dashboards map[string]string, folderName string) []Visualization {
	visPath := filepath.Join(path, folderName)
	if _, err := os.Stat(visPath); os.IsNotExist(err) {
		return nil
	}
	files, err := ioutil.ReadDir(visPath)
	if err != nil {
		return nil
	}
	var visualizations []Visualization
	for _, file := range files {
		visFilePath := filepath.Join(visPath, file.Name())
		contents, err := ioutil.ReadFile(visFilePath)
		if err != nil {
			continue
		}
		var doc map[string]interface{}
		if err := json.Unmarshal(contents, &doc); err != nil {
			continue
		}
		// Commit metadata is best-effort; on error the zero value is kept.
		commit, _ := getCommitData(visFilePath)
		// Guarded assertion: a missing or non-string "id" previously panicked.
		id, _ := doc["id"].(string)
		dashboardTitle := dashboards[id]
		desc, err := kbncontent.DescribeVisualizationSavedObject(doc)
		if err != nil {
			// The describe error was previously ignored and a possibly
			// unusable description was indexed; skip the file instead.
			log.Printf("Error describing visualization %s: %v", visFilePath, err)
			continue
		}
		visualizations = append(visualizations, Visualization{
			Doc:       desc.Doc,
			SoType:    desc.SavedObjectType,
			Link:      desc.Link,
			VisType:   desc.Type(),
			TSVBType:  desc.TSVBType(),
			VisTitle:  desc.Title(),
			IsLegacy:  desc.IsLegacy(),
			Path:      visFilePath,
			App:       app,
			Source:    source,
			Dashboard: dashboardTitle,
			Commit:    commit,
		})
	}
	return visualizations
}

// collectDashboardFolder reads path/dashboard, describing every by-value panel
// and recording which saved-object IDs each dashboard references.
// It returns the panel visualizations plus a map of referenced saved-object
// ID -> dashboard title, consumed by collectVisualizationFolder.
func collectDashboardFolder(app, path, source string) ([]Visualization, map[string]string) {
	dashboardPath := filepath.Join(path, "dashboard")
	if _, err := os.Stat(dashboardPath); os.IsNotExist(err) {
		return nil, nil
	}
	files, err := os.ReadDir(dashboardPath)
	if err != nil {
		fmt.Printf("Error reading dashboards directory: %v\n", err)
		return nil, nil
	}
	var visualizations []Visualization
	dashboards := make(map[string]string)
	for _, file := range files {
		dashboardFilePath := filepath.Join(dashboardPath, file.Name())
		contents, err := ioutil.ReadFile(dashboardFilePath)
		if err != nil {
			fmt.Printf("Error reading dashboard file: %v\n", err)
			continue
		}
		var dashboard map[string]interface{}
		if err := json.Unmarshal(contents, &dashboard); err != nil {
			continue
		}
		// Commit metadata is best-effort; on error the zero value is kept.
		commit, _ := getCommitData(dashboardFilePath)
		// Guarded assertions: malformed dashboards (missing "attributes",
		// "title", or "references") previously caused panics.
		var title string
		if attributes, ok := dashboard["attributes"].(map[string]interface{}); ok {
			title, _ = attributes["title"].(string)
		}
		references, _ := dashboard["references"].([]interface{})
		for _, reference := range references {
			if ref, ok := reference.(map[string]interface{}); ok {
				if id, ok := ref["id"].(string); ok {
					dashboards[id] = title
				}
			}
		}
		panels, err := kbncontent.DescribeByValueDashboardPanels(dashboard)
		if err != nil {
			fmt.Printf("Issue parsing dashboard panels for %s: %v\n", dashboardFilePath, err)
		}
		for _, panel := range panels {
			visualizations = append(visualizations, Visualization{
				Doc:       panel.Doc,
				SoType:    panel.SavedObjectType,
				Link:      panel.Link,
				VisType:   panel.Type(),
				TSVBType:  panel.TSVBType(),
				VisTitle:  panel.Title(),
				IsLegacy:  panel.IsLegacy(),
				App:       app,
				Source:    source,
				Dashboard: title,
				Path:      dashboardFilePath,
				Commit:    commit,
			})
		}
	}
	return visualizations, dashboards
}

// TODO I think some of this logic is superfluous. It may just serve to coerce the manifest into the correct type.
func collectManifest(manifestPath string) map[string]interface{} { manifestContents, err := ioutil.ReadFile(manifestPath) if err != nil { log.Printf("Error reading manifest: %v\n", err) } bytes, jsonErr := yaml.YAMLToJSON(manifestContents) if jsonErr != nil { log.Printf("Error converting manifest YAML to JSON: %v\n", jsonErr) } var manifest map[string]interface{} marshalErr := json.Unmarshal(bytes, &manifest) if marshalErr != nil { log.Printf("Error marshalling manifest JSON: %v\n", marshalErr) } return manifest } func CollectIntegrationsVisualizations(integrationsPath string) []Visualization { var allVis []Visualization packages, err := os.ReadDir(filepath.Join(integrationsPath, "packages")) fmt.Printf("Collecting integrations\n") if err != nil { fmt.Printf("Error: %v\n", err) return allVis } for _, packageInfo := range packages { if packageInfo.IsDir() { packagePath := filepath.Join(integrationsPath, "packages", packageInfo.Name(), "kibana") manifestPath := filepath.Join(integrationsPath, "packages", packageInfo.Name(), "manifest.yml") manifest := collectManifest(manifestPath) visualizations, dashboards := collectDashboardFolder(packageInfo.Name(), packagePath, "integration") for _, folderName := range []string{"visualization", "lens", "map", "search"} { visualizations = append(visualizations, collectVisualizationFolder(packageInfo.Name(), packagePath, "integration", dashboards, folderName)...) 
} fmt.Printf("Collected %d vis in %s\n", len(visualizations), packageInfo.Name()) for _, vis := range visualizations { vis.Manifest = manifest vis.GithubOwner = getGithubOwner(manifest) vis.OwningGroup = getOwningGroup(vis.GithubOwner) allVis = append(allVis, vis) } } } return allVis } func CollectIntegrationsDataStreams(integrationsPath string) []map[string]interface{} { var allDataStreams []map[string]interface{} packages, err := os.ReadDir(filepath.Join(integrationsPath, "packages")) fmt.Printf("Collecting integrations\n") if err != nil { fmt.Printf("Error: %v\n", err) return allDataStreams } for _, packageInfo := range packages { if packageInfo.IsDir() { packagePath := filepath.Join(integrationsPath, "packages", packageInfo.Name()) buildYmlPath := filepath.Join(packagePath, "_dev", "build", "build.yml") dataStreamPackagePath := filepath.Join(integrationsPath, "packages", packageInfo.Name(), "data_stream") dataStreams, err := os.ReadDir(dataStreamPackagePath) if err != nil { fmt.Printf("Error: %v\n", err) } integrationManifestPath := filepath.Join(packagePath, "manifest.yml") integrationManifest := collectManifest(integrationManifestPath) buildYml := collectManifest(buildYmlPath) // check whther the integration has a _dev/benchmark folder benchmarkPath := filepath.Join(packagePath, "_dev", "benchmark") if _, err := os.Stat(benchmarkPath); err == nil { // add flag to the integration manifest integrationManifest["has_benchmark"] = true } else { integrationManifest["has_benchmark"] = false } for _, dataStream := range dataStreams { manifestPath := filepath.Join(dataStreamPackagePath, dataStream.Name(), "manifest.yml") dataStreamManifest := collectManifest(manifestPath) // enrich data stream manifest with integration manifest dataStreamManifest["integration"] = integrationManifest dataStreamManifest["buildYml"] = buildYml // check whether the data streams has a _dev/test/pipeline folder pipelinePath := filepath.Join(dataStreamPackagePath, dataStream.Name(), "_dev", 
"test", "pipeline") if _, err := os.Stat(pipelinePath); err == nil { // add flag to the data stream manifest dataStreamManifest["has_pipeline_test"] = true } else { dataStreamManifest["has_pipeline_test"] = false } // same for system test systemTestPath := filepath.Join(dataStreamPackagePath, dataStream.Name(), "_dev", "test", "system") if _, err := os.Stat(systemTestPath); err == nil { dataStreamManifest["has_system_test"] = true } else { dataStreamManifest["has_system_test"] = false } allDataStreams = append(allDataStreams, dataStreamManifest) } } } return allDataStreams } // func collectBeats() []map[string]interface{} { // allVis := []map[string]interface{}{} // recurse := func(root string) { // files, err := ioutil.ReadDir(root) // if err != nil { // return // } // for _, file := range files { // if file.IsDir() && file.Name() == "7" { // path := filepath.Join(root, "7") // visualizations, dashboards := collectDashboardFolder(root, path, "beat") // visualizations = append(visualizations, collectVisualizationFolder(root, path, "beat", dashboards, "visualization")...) // visualizations = append(visualizations, collectVisualizationFolder(root, path, "beat", dashboards, "lens")...) // visualizations = append(visualizations, collectVisualizationFolder(root, path, "beat", dashboards, "map")...) // fmt.Printf("Collected %d vis in %s\n", len(visualizations), root) // allVis = append(allVis, visualizations...) 
// } else if file.IsDir() { // recurse(filepath.Join(root, file.Name())) // } // } // } // recurse("./beats") // return allVis // } func uploadVisualizations(visualizations []Visualization) { indexName := "legacy_vis" es, err := elasticsearch.NewClient(elasticsearch.Config{}) if err != nil { fmt.Printf("problem generating elasticsearch client") } log.Println(es.Info()) _, err = es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true)) if err != nil { fmt.Printf("Problem deleting old index %v", err) } mapping := `{ "mappings": { "properties": { "doc": { "type": "flattened", "depth_limit": 50 }, "manifest": { "type": "flattened" }, "soType": { "type": "keyword" }, "app": { "type": "keyword" }, "source": { "type": "keyword" }, "link": { "type": "keyword" }, "dashboard": { "type": "keyword" }, "path": { "type": "keyword" }, "vis_type": { "type": "keyword" }, "vis_tsvb_type": { "type": "keyword" }, "vis_title": { "type": "keyword" }, "gh_owner": { "type": "keyword" }, "owning_group": { "type": "keyword" }, "is_legacy": { "type": "boolean" }, "commit": { "properties": { "hash": { "type": "keyword" }, "author": { "type": "keyword" }, "date": { "type": "date" } } } } } }` _, err = es.Indices.Create(indexName, es.Indices.Create.WithBody(strings.NewReader(mapping))) if err != nil { fmt.Printf("Problem creating index: %v\n", err) } bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Index: indexName, Client: es, NumWorkers: runtime.NumCPU(), FlushBytes: 5e+6, FlushInterval: 30 * time.Second, // The periodic flush interval }) if err != nil { log.Fatalf("Error creating the indexer: %s", err) } var countSuccessful uint64 for i, vis := range visualizations { // Prepare the data payload: encode vis to JSON data, err := json.Marshal(vis) if err != nil { log.Fatalf("Cannot encode visualization %d: %s", i, err) } // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // // Add an item to the BulkIndexer // err = bi.Add( 
context.Background(), esutil.BulkIndexerItem{ // Action field configures the operation to perform (index, create, delete, update) Action: "index", // DocumentID is the (optional) document ID // DocumentID: strconv.Itoa(a.ID), // Body is an `io.Reader` with the payload Body: bytes.NewReader(data), // OnSuccess is called for each successful operation OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { atomic.AddUint64(&countSuccessful, 1) }, // OnFailure is called for each failed operation OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { log.Printf("ERROR: %s", err) } else { log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) } }, }, ) if err != nil { log.Fatalf("Unexpected error: %s", err) } } // Close the indexer if err := bi.Close(context.Background()); err != nil { log.Fatalf("Unexpected error: %s", err) } biStats := bi.Stats() log.Printf("%v", biStats) // beatsData := collectBeats() // for _, vis := range beatsData { // fmt.Printf("%v\n", vis) // } } func saveVisualizationsToFile(visualizations []Visualization) { // Marshal the data into a JSON string jsonData, err := json.Marshal(visualizations) if err != nil { fmt.Println("Error marshaling JSON:", err) return } // Define the file path filePath := "result.json" // Create or open the file for writing file, err := os.Create(filePath) if err != nil { fmt.Println("Error creating file:", err) return } defer file.Close() // Write the JSON data to the file _, err = file.Write(jsonData) if err != nil { fmt.Println("Error writing JSON to file:", err) return } fmt.Printf("JSON data saved to %s\n", filePath) } func saveDataStreamsToFile(datastreams []map[string]interface{}) { // Marshal the data into a JSON string jsonData, err := json.Marshal(datastreams) if err != nil { fmt.Println("Error marshaling JSON:", err) return } // Define the file path filePath := 
"result_data_stream.json" // Create or open the file for writing file, err := os.Create(filePath) if err != nil { fmt.Println("Error creating file:", err) return } defer file.Close() // Write the JSON data to the file _, err = file.Write(jsonData) if err != nil { fmt.Println("Error writing JSON to file:", err) return } fmt.Printf("JSON data saved to %s\n", filePath) } func uploadDatastreams(datastreams []map[string]interface{}) { indexName := "integration_data_stream" es, err := elasticsearch.NewClient(elasticsearch.Config{}) if err != nil { fmt.Printf("problem generating elasticsearch client") } log.Println(es.Info()) _, err = es.Indices.Delete([]string{indexName}, es.Indices.Delete.WithIgnoreUnavailable(true)) if err != nil { fmt.Printf("Problem deleting old index %v", err) } mapping := `{ "mappings": { "dynamic_templates": [ { "default_as_keyword": { "match_mapping_type": "*", "path_match": "*default", "runtime": { "type": "keyword" } } }, { "dynamic_as_keyword": { "match_mapping_type": "*", "path_match": "*mappings.dynamic", "runtime": { "type": "keyword" } } } ] } }` _, err = es.Indices.Create(indexName, es.Indices.Create.WithBody(strings.NewReader(mapping))) if err != nil { fmt.Printf("Problem creating index: %v\n", err) } bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ Index: indexName, Client: es, NumWorkers: runtime.NumCPU(), FlushBytes: 5e+6, FlushInterval: 30 * time.Second, // The periodic flush interval }) if err != nil { log.Fatalf("Error creating the indexer: %s", err) } var countSuccessful uint64 for i, vis := range datastreams { // Prepare the data payload: encode vis to JSON data, err := json.Marshal(vis) if err != nil { log.Fatalf("Cannot encode datastream %d: %s", i, err) } // >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // // Add an item to the BulkIndexer // err = bi.Add( context.Background(), esutil.BulkIndexerItem{ // Action field configures the operation to perform (index, create, delete, update) Action: 
"index", // DocumentID is the (optional) document ID // DocumentID: strconv.Itoa(a.ID), // Body is an `io.Reader` with the payload Body: bytes.NewReader(data), // OnSuccess is called for each successful operation OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { atomic.AddUint64(&countSuccessful, 1) }, // OnFailure is called for each failed operation OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { if err != nil { log.Printf("ERROR: %s", err) } else { log.Printf("ERROR: %s: %s", res.Error.Type, res.Error.Reason) } }, }, ) if err != nil { log.Fatalf("Unexpected error: %s", err) } } // Close the indexer if err := bi.Close(context.Background()); err != nil { log.Fatalf("Unexpected error: %s", err) } biStats := bi.Stats() log.Printf("%v", biStats) // beatsData := collectBeats() // for _, vis := range beatsData { // fmt.Printf("%v\n", vis) // } } func main() { visualizations := CollectIntegrationsVisualizations("./integrations") saveVisualizationsToFile(visualizations) uploadVisualizations(visualizations) dataStreams := CollectIntegrationsDataStreams("./integrations") saveDataStreamsToFile(dataStreams) uploadDatastreams(dataStreams) }