main.go (439 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package main import ( "bufio" "bytes" "encoding/json" "flag" "fmt" "go/build" "io" "log" "net/http" "net/url" "os" "os/exec" "runtime" "strconv" "strings" "time" "github.com/blang/semver" "github.com/kr/pretty" "github.com/pkg/errors" "golang.org/x/tools/benchmark/parse" "golang.org/x/tools/go/vcs" ) var ( tagsFlag = flag.String( "tag", "", "comma-separated list of key=value pairs to add to each document", ) verboseFlag = flag.Bool("v", false, "Be verbose") ) type esError struct { Type string `json:"type"` Reason string `json:"reason"` } func (e *esError) Error() string { return e.Reason } const ( exceptionResourceAlreadyExists = "resource_already_exists_exception" ) type elasticsearchConfig struct { host string user string pass string index string } type benchmark struct { parse.Benchmark extra map[string]float64 } type fieldProperties map[string]interface{} const ( fieldExecutedAt = "executed_at" fieldName = "name" fieldIterations = "iterations" fieldPkg = "pkg" fieldHostname = "hostname" fieldGoVersion = "go_version" fieldOSVersion = "os_version" fieldGOOS = "goos" fieldGOARCH = "goarch" fieldNSPerOp = "ns_per_op" fieldMBPerS = "mb_per_s" fieldAllocedBytesPerOp = "alloced_bytes_per_op" fieldAllocsPerOp = "allocs_per_op" fieldGit = "git" fieldGitCommit = "commit" fieldGitSubject = "subject" fieldGitCommitter = "committer" fieldGitCommitterDate = "date" fieldExtraMetrics = "extra_metrics" ) var ( esFieldProperties = map[string]fieldProperties{ fieldExecutedAt: {"type": "date"}, fieldName: {"type": "keyword"}, fieldIterations: {"type": "long"}, fieldPkg: {"type": "keyword"}, fieldHostname: {"type": "keyword"}, fieldGoVersion: {"type": "keyword"}, fieldOSVersion: {"type": "keyword"}, fieldGOOS: {"type": "keyword"}, fieldGOARCH: {"type": "keyword"}, fieldNSPerOp: {"type": "double"}, fieldMBPerS: {"type": "double"}, fieldAllocedBytesPerOp: {"type": "long"}, fieldAllocsPerOp: {"type": "long"}, fieldGit: { "properties": map[string]fieldProperties{ fieldGitCommit: {"type": "text"}, fieldGitSubject: {"type": "text"}, fieldGitCommitter: { "properties": map[string]fieldProperties{ fieldGitCommitterDate: {"type": "date"}, }, }, }, }, } esExtraMetricsDynamicTemplate = map[string]interface{}{ fieldExtraMetrics: map[string]interface{}{ "path_match": "extra_metrics.*", "mapping": map[string]string{ "type": "float", }, }, } ) func main() { var esConfig elasticsearchConfig flag.StringVar(&esConfig.host, "es", "", `Elasticsearch URL into which the benchmark data should be indexed, e.g. http://localhost:9200`, ) flag.StringVar(&esConfig.index, "index", "gobench", "Elasticsearch index into which the benchmarks should be stored.", ) flag.StringVar(&esConfig.user, "es-username", "", "Elasticsearch username used for authentication.", ) flag.StringVar(&esConfig.pass, "es-password", "", "Elasticsearch password used for authentication.", ) flag.Parse() tags := make(map[string]string) for _, field := range strings.Split(*tagsFlag, ",") { field = strings.TrimSpace(field) if field == "" { continue } i := strings.IndexRune(field, '=') if i == -1 { fmt.Fprintf( os.Stderr, "invalid key-value pair %q in -tags: missing '='\n", field, ) os.Exit(2) } key, value := field[:i], field[i+1:] tags[strings.TrimSpace(key)] = strings.TrimSpace(value) } var output io.Writer var buf bytes.Buffer var esURL *url.URL if esConfig.host != "" { url, err := url.Parse(esConfig.host) if err != nil { fmt.Fprintf(os.Stderr, "invalid Elasticsearch URL %q: %s\n", esConfig.host, err) os.Exit(2) } esURL = url output = &buf if *verboseFlag { output = io.MultiWriter(output, os.Stdout) } } else { output = os.Stdout } encoder := json.NewEncoder(output) if esURL != nil { if err := createMapping(esConfig); err != nil { log.Fatalf("error creating/updating mapping: %s", err) } } var pkg, goos, goarch string timestamp := time.Now().UTC() scanner := bufio.NewScanner(os.Stdin) for scanner.Scan() { line := scanner.Text() switch { case strings.HasPrefix(line, "pkg:"): pkg = strings.TrimSpace(line[len("pkg:"):]) case strings.HasPrefix(line, "goos:"): goos = strings.TrimSpace(line[len("goos:"):]) case strings.HasPrefix(line, "goarch:"): goarch = strings.TrimSpace(line[len("goarch:"):]) default: if b, err := parse.ParseLine(line); err == nil { result := benchmark{Benchmark: *b} result.extra = parseExtraMetrics(line) encodeIndexOp( encoder, result, pkg, goos, goarch, tags, timestamp, esConfig, ) } } } if err := scanner.Err(); err != nil { log.Fatal(err) } if esURL == nil { // Encoded to stdout. return } bulkURL := *esURL bulkURL.Path += "/_bulk" req, err := http.NewRequest(http.MethodPost, bulkURL.String(), &buf) if esConfig.user != "" && esConfig.pass != "" { req.SetBasicAuth(esConfig.user, esConfig.pass) } req.Header.Set("Content-Type", "application/x-ndjson") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatalf("error executing bulk updates: %s", err) } if err := handleResponse(resp); err != nil { log.Fatalf("error executing bulk updates: %s", err) } } func createMapping(cfg elasticsearchConfig) error { // Versions of Elasticsearch prior to 7.0.0 require type names. esVersion, err := getEsVersion(cfg.host, cfg.user, cfg.pass) if err != nil { return err } includeTypeName := esVersion.LT(semver.MustParse("7.0.0")) var body bytes.Buffer properties := map[string]interface{}{ "properties": esFieldProperties, "dynamic_templates": []interface{}{esExtraMetricsDynamicTemplate}, } if includeTypeName { properties = map[string]interface{}{"_doc": properties} } if err := json.NewEncoder(&body).Encode(map[string]interface{}{"mappings": properties}); err != nil { return err } mappingURL := cfg.host + "/" + cfg.index req, err := http.NewRequest(http.MethodPut, mappingURL, &body) if err != nil { return err } if cfg.user != "" && cfg.pass != "" { req.SetBasicAuth(cfg.user, cfg.pass) } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } if err := handleResponse(resp); err != nil { esErr, ok := err.(*esError) if ok && esErr.Type == exceptionResourceAlreadyExists { if *verboseFlag { log.Printf("index %q already exists", cfg.index) } return nil } return err } return nil } func getEsVersion(host, user, pass string) (*semver.Version, error) { req, err := http.NewRequest("GET", host, nil) if err != nil { return nil, err } if user != "" || pass != "" { req.SetBasicAuth(user, pass) } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } var esVersion struct { Version struct { Number string } `json:"version"` } defer resp.Body.Close() if resp.StatusCode != 200 { return nil, fmt.Errorf("received unexpected %d status code", resp.StatusCode) } if err := json.NewDecoder(resp.Body).Decode(&esVersion); err != nil { return nil, err } return semver.New(esVersion.Version.Number) } func encodeIndexOp( encoder *json.Encoder, b benchmark, pkg, goos, goarch string, tags map[string]string, timestamp time.Time, cfg elasticsearchConfig, ) { doc := map[string]interface{}{ fieldExecutedAt: timestamp, fieldName: b.Name, fieldIterations: b.N, fieldPkg: pkg, fieldGoVersion: runtime.Version(), fieldGOOS: goos, fieldGOARCH: goarch, } if b.Measured&parse.NsPerOp != 0 { doc[fieldNSPerOp] = b.NsPerOp } if b.Measured&parse.MBPerS != 0 { doc[fieldMBPerS] = b.MBPerS } if b.Measured&parse.AllocedBytesPerOp != 0 { doc[fieldAllocedBytesPerOp] = b.AllocedBytesPerOp } if b.Measured&parse.AllocsPerOp != 0 { doc[fieldAllocsPerOp] = b.AllocsPerOp } if len(b.extra) > 0 { apmbench := b.extra doc[fieldExtraMetrics] = apmbench } addHost(doc) addVCS(pkg, doc) for key, value := range tags { doc[key] = value } // Versions of Elasticsearch >= 8.0.0 require no _type field esVersion, err := getEsVersion(cfg.host, cfg.user, cfg.pass) if err != nil { log.Fatal(err) } includeTypDoc := esVersion.LT(semver.MustParse("8.0.0")) type Index struct { Index string `json:"_index"` Type string `json:"_type,omitempty"` } indexAction := struct { Index Index `json:"index"` }{Index: Index{ Index: cfg.index, }} if includeTypDoc { indexAction.Index.Type = "_doc" } if err := encoder.Encode(indexAction); err != nil { log.Fatal(err) } if err := encoder.Encode(doc); err != nil { log.Fatal(err) } } func handleResponse(resp *http.Response) error { defer resp.Body.Close() if !*verboseFlag && resp.StatusCode == http.StatusOK { return nil } result := make(map[string]interface{}) if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { log.Fatal(err) } if resp.StatusCode == http.StatusOK { pretty.Println(result) return nil } errorObj, ok := result["error"].(map[string]interface{}) if !ok { return errors.Errorf("%s", resp.Status) } return &esError{ Type: errorObj["type"].(string), Reason: errorObj["reason"].(string), } } func addHost(doc map[string]interface{}) { if hostname, err := os.Hostname(); err == nil { doc[fieldHostname] = hostname } switch runtime.GOOS { case "linux": if output, err := exec.Command("uname", "-r").Output(); err == nil { doc[fieldOSVersion] = strings.TrimSpace(string(output)) } } } func addVCS(pkgpath string, doc map[string]interface{}) { pkg, err := build.Import(pkgpath, "", build.FindOnly) if err != nil { return } vcsCmd, _, err := vcs.FromDir(pkg.Dir, pkg.SrcRoot) if err != nil { return } switch vcsCmd.Cmd { case "git": cmd := exec.Command("git", "log", "-1", "--format=%H %ct %s") cmd.Dir = pkg.Dir output, err := cmd.CombinedOutput() if err != nil { return } fields := strings.SplitN(strings.TrimSpace(string(output)), " ", 3) if len(fields) == 3 { gitFields := map[string]interface{}{ fieldGitCommit: fields[0], fieldGitSubject: fields[2], } unixSec, err := strconv.ParseInt(fields[1], 10, 64) if err == nil { committerDate := time.Unix(unixSec, 0).UTC() gitFields[fieldGitCommitter] = map[string]interface{}{ fieldGitCommitterDate: committerDate, } } doc[fieldGit] = gitFields } } } func parseExtraMetrics(line string) map[string]float64 { entries := strings.Split(line, "\t") // If the result has less than 3 columns, it doesn't contain // extra metrics to be reported. if len(entries) < 3 { return nil } result := make(map[string]float64) // Ignore the first three entries since they're fixed to be the benchmark, // name, iterations and ns/op. for _, entry := range entries[3:] { parts := strings.Split(strings.TrimSpace(entry), " ") if len(parts) < 2 { continue } key := strings.TrimSpace(parts[1]) value, err := strconv.ParseFloat(strings.TrimSpace(parts[0]), 64) if err != nil { continue } switch key { case "ns/op", "MB/s", "B/op", "allocs/op": // Ignore the native benchmark fields continue default: escapedKey := strings.ReplaceAll(key, "/", "_") result[escapedKey] = value } } if len(result) > 0 { return result } return nil }