pkg/degradation-detector/postDegradation.go (118 lines of code) (raw):

package degradation_detector import ( "bytes" "context" "encoding/json" "errors" "fmt" "log/slog" "net/http" "sync" "time" "github.com/JetBrains/ij-perf-report-aggregator/pkg/server/meta" ) type InsertionResults struct { Degradation DegradationWithSettings Error error } func FilterErrors(insertionResults <-chan InsertionResults) <-chan DegradationWithSettings { ch := make(chan DegradationWithSettings, 100) go func() { for result := range insertionResults { if result.Error != nil { slog.Error("error while inserting degradation", "error", result.Error, "degradation", result.Degradation) continue } ch <- result.Degradation } close(ch) }() return ch } type accidentWriter interface { DBTestName() string } func (s PerformanceSettings) DBTestName() string { return s.Project + "/" + s.Metric } func (s StartupSettings) DBTestName() string { return s.Product + "/" + s.Project + "/" + s.Metric } func (s FleetStartupSettings) DBTestName() string { return "fleet" + "/" + s.Metric } type accidentState struct { mu sync.Mutex posted bool // true if POST attempt was made successfullyInserted bool // true if got 200 OK (newly inserted) } func PostDegradations(client *http.Client, backendURL string, degradations <-chan DegradationWithSettings) chan InsertionResults { url := backendURL + "/api/meta/accidents" insertionResults := make(chan InsertionResults, 100) go func() { defer close(insertionResults) var wg sync.WaitGroup accidentStates := sync.Map{} // map[string]*accidentState for degradation := range degradations { wg.Go(func() { d := degradation.Details if d.timestamp < time.Now().Add(-672*time.Hour).UnixMilli() { // Do not post degradations older than 28 days return } accidentKey := fmt.Sprintf("%s:%s", degradation.Settings.DBTestName(), d.Build) stateInterface, _ := accidentStates.LoadOrStore(accidentKey, &accidentState{}) state, ok := stateInterface.(*accidentState) if !ok { insertionResults <- InsertionResults{Error: errors.New("unexpected type in accidentStates map")} return } state.mu.Lock() defer state.mu.Unlock() if state.posted { if state.successfullyInserted { insertionResults <- InsertionResults{DegradationWithSettings{d, degradation.Settings}, nil} } return } state.posted = true date := time.UnixMilli(d.timestamp).UTC().Format("2006-01-02") medianMessage := getMessageBasedOnMedianChange(d.medianValues) kind := "InferredRegression" if !d.IsDegradation { kind = "InferredImprovement" } insertParams := meta.AccidentInsertParams{Date: date, Test: degradation.Settings.DBTestName(), Kind: kind, Reason: medianMessage, BuildNumber: d.Build, UserName: "R2D2"} params, err := json.Marshal(insertParams) if err != nil { insertionResults <- InsertionResults{Error: fmt.Errorf("failed to marshal query: %w", err)} return } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(params)) if err != nil { insertionResults <- InsertionResults{Error: fmt.Errorf("failed to create request: %w", err)} return } req.Header.Set("Content-Type", "application/json") resp, err := client.Do(req) if err != nil { insertionResults <- InsertionResults{Error: fmt.Errorf("failed to send POST request: %w", err)} return } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { state.successfullyInserted = true insertionResults <- InsertionResults{DegradationWithSettings{d, degradation.Settings}, nil} return } // the accident already exists if resp.StatusCode == http.StatusConflict { return } insertionResults <- InsertionResults{Error: fmt.Errorf("failed to post Details: %v", resp.Status)} }) } wg.Wait() }() return insertionResults }