plugins/storage/memory/memory.go (282 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package memory import ( "encoding/json" "fmt" "sort" "strings" "sync" "time" "github.com/facebookincubator/contest/pkg/event" "github.com/facebookincubator/contest/pkg/event/frameworkevent" "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/job" "github.com/facebookincubator/contest/pkg/storage" "github.com/facebookincubator/contest/pkg/types" "github.com/facebookincubator/contest/pkg/xcontext" ) // Memory implements a storage engine which stores everything in memory. This // storage engine is very inefficient and should be used only for testing // purposes. type Memory struct { lock sync.Mutex testEvents []testevent.Event frameworkEvents []frameworkevent.Event jobIDCounter types.JobID jobInfo map[types.JobID]*jobInfo } type jobInfo struct { request *job.Request desc *job.Descriptor state job.State reports []*job.Report } func emptyEventQuery(eventQuery *event.Query) bool { return eventQuery.JobID == 0 && len(eventQuery.EventNames) == 0 && eventQuery.EmittedStartTime.IsZero() && eventQuery.EmittedEndTime.IsZero() } // emptyFrameworkEventQuery returns whether the Query contains only default values // If so, the Query is considered "empty" and doesn't result in any lookup in the // database func emptyFrameworkEventQuery(eventQuery *frameworkevent.Query) bool { return emptyEventQuery(&eventQuery.Query) } // emptyTestEventQuery returns whether the Query contains only default // values. If so, the Query is considered "empty" and doesn't result in // any lookup in the database func emptyTestEventQuery(eventQuery *testevent.Query) bool { return emptyEventQuery(&eventQuery.Query) && eventQuery.TestName == "" && eventQuery.TestStepLabel == "" } // StoreTestEvent stores a test event into the database func (m *Memory) StoreTestEvent(_ xcontext.Context, event testevent.Event) error { m.lock.Lock() defer m.lock.Unlock() m.testEvents = append(m.testEvents, event) return nil } func eventJobMatch(queryJobID types.JobID, jobID types.JobID) bool { if queryJobID != 0 && jobID != queryJobID { return false } return true } func eventRunMatch(queryRunID, runID types.RunID) bool { if queryRunID != 0 && runID != queryRunID { return false } return true } func eventNameMatch(queryEventNames []event.Name, eventName event.Name) bool { if len(queryEventNames) == 0 { // If no criteria was specified for matching the name of the event, // do not filter it out return true } for _, candidateEventName := range queryEventNames { if eventName == candidateEventName { return true } } return false } func eventTimeMatch(queryStartTime, queryEndTime time.Time, emittedTime time.Time) bool { if !queryStartTime.IsZero() && queryStartTime.Sub(emittedTime) > 0 { return false } if !queryEndTime.IsZero() && emittedTime.Sub(queryEndTime) > 0 { return false } return true } func eventTestMatch(queryTestName, testName string) bool { if queryTestName != "" && testName != queryTestName { return false } return true } func eventTestStepMatch(queryTestStepLabel, testStepLabel string) bool { if queryTestStepLabel != "" && queryTestStepLabel != testStepLabel { return false } return true } // GetTestEvents returns all test events that match the given query. func (m *Memory) GetTestEvents(_ xcontext.Context, eventQuery *testevent.Query) ([]testevent.Event, error) { m.lock.Lock() defer m.lock.Unlock() var matchingTestEvents []testevent.Event if emptyTestEventQuery(eventQuery) { return matchingTestEvents, nil } for _, event := range m.testEvents { if eventJobMatch(eventQuery.JobID, event.Header.JobID) && eventRunMatch(eventQuery.RunID, event.Header.RunID) && eventNameMatch(eventQuery.EventNames, event.Data.EventName) && eventTimeMatch(eventQuery.EmittedStartTime, eventQuery.EmittedEndTime, event.EmitTime) && eventTestMatch(eventQuery.TestName, event.Header.TestName) && eventTestStepMatch(eventQuery.TestStepLabel, event.Header.TestStepLabel) { matchingTestEvents = append(matchingTestEvents, event) } } return matchingTestEvents, nil } // Reset restores the original state of the memory storage layer func (m *Memory) Reset() error { m.lock.Lock() defer m.lock.Unlock() m.testEvents = []testevent.Event{} m.frameworkEvents = []frameworkevent.Event{} m.jobInfo = make(map[types.JobID]*jobInfo) m.jobIDCounter = 1 return nil } // StoreJobRequest stores a new job request func (m *Memory) StoreJobRequest(_ xcontext.Context, request *job.Request) (types.JobID, error) { m.lock.Lock() defer m.lock.Unlock() jobID := m.jobIDCounter m.jobIDCounter++ request.JobID = jobID info := &jobInfo{ request: request, desc: &job.Descriptor{}, state: job.JobStateUnknown, } if err := json.Unmarshal([]byte(request.JobDescriptor), info.desc); err != nil { return 0, fmt.Errorf("invalid job descriptor: %w", err) } m.jobInfo[jobID] = info return jobID, nil } // GetJobRequest retrieves a job request from the in memory list func (m *Memory) GetJobRequest(_ xcontext.Context, jobID types.JobID) (*job.Request, error) { m.lock.Lock() defer m.lock.Unlock() v := m.jobInfo[jobID] if v == nil || v.request == nil { return nil, fmt.Errorf("could not find job request with id %v", jobID) } return v.request, nil } // StoreReport stores a report associated to a job. Returns an error if there is // already a report associated with this run. func (m *Memory) StoreReport(_ xcontext.Context, report *job.Report) error { m.lock.Lock() defer m.lock.Unlock() ji := m.jobInfo[report.JobID] if ji == nil { return fmt.Errorf("could not find job with id %v", report.JobID) } for _, r := range ji.reports { if r.JobID == report.JobID && r.RunID == report.RunID && r.ReporterName == report.ReporterName { return fmt.Errorf("duplicate report %d/%d/%s", r.JobID, r.RunID, r.ReporterName) } } ji.reports = append(ji.reports, report) return nil } // GetJobReport returns the report associated to a given job func (m *Memory) GetJobReport(ctx xcontext.Context, jobID types.JobID) (*job.JobReport, error) { m.lock.Lock() defer m.lock.Unlock() jr := &job.JobReport{JobID: jobID} ji := m.jobInfo[jobID] if ji == nil { // return a job report with no results return jr, nil } // Sort reports by run id and reporter name sort.Slice(ji.reports, func(i, j int) bool { if ji.reports[i].RunID < ji.reports[j].RunID { return true } return strings.Compare(ji.reports[i].ReporterName, ji.reports[i].ReporterName) < 0 }) for _, r := range ji.reports { if r.RunID == 0 { jr.FinalReports = append(jr.FinalReports, r) } else { i := int(r.RunID) - 1 if i > len(jr.RunReports) { ctx.Errorf("Incomplete set of run reports for job %d", jobID) break } if i == len(jr.RunReports) { jr.RunReports = append(jr.RunReports, nil) } jr.RunReports[i] = append(jr.RunReports[i], r) } } return jr, nil } func (m *Memory) ListJobs(_ xcontext.Context, query *storage.JobQuery) ([]types.JobID, error) { m.lock.Lock() defer m.lock.Unlock() res := []types.JobID{} if err := job.CheckTags(query.Tags, true /* allowInternal */); err != nil { return nil, err } jobLoop: for jobId, jobInfo := range m.jobInfo { if len(query.ServerID) > 0 { if jobInfo.request.ServerID != query.ServerID { continue } } if len(query.Tags) > 0 { for _, qTag := range query.Tags { found := false for _, jTag := range jobInfo.desc.Tags { if jTag == qTag { found = true } } if !found { continue jobLoop } } } if len(query.States) > 0 { var lastEventTime time.Time jobState := job.JobStateUnknown for _, event := range m.frameworkEvents { if eventJobMatch(jobId, event.JobID) && eventNameMatch(job.JobStateEvents, event.EventName) && event.EmitTime.After(lastEventTime) { jobState, _ = job.EventNameToJobState(event.EventName) lastEventTime = event.EmitTime } } found := false for _, queryState := range query.States { if jobState == queryState { found = true break } } if !found { continue jobLoop } } res = append(res, jobId) } sort.Slice(res, func(i, j int) bool { return res[i] < res[j] }) return res, nil } // StoreFrameworkEvent stores a framework event into the database func (m *Memory) StoreFrameworkEvent(_ xcontext.Context, event frameworkevent.Event) error { m.lock.Lock() defer m.lock.Unlock() m.frameworkEvents = append(m.frameworkEvents, event) return nil } // GetFrameworkEvent retrieves a framework event from storage func (m *Memory) GetFrameworkEvent(_ xcontext.Context, eventQuery *frameworkevent.Query) ([]frameworkevent.Event, error) { m.lock.Lock() defer m.lock.Unlock() var matchingFrameworkEvents []frameworkevent.Event if emptyFrameworkEventQuery(eventQuery) { return matchingFrameworkEvents, nil } for _, event := range m.frameworkEvents { if eventJobMatch(eventQuery.JobID, event.JobID) && eventNameMatch(eventQuery.EventNames, event.EventName) && eventTimeMatch(eventQuery.EmittedStartTime, eventQuery.EmittedEndTime, event.EmitTime) { matchingFrameworkEvents = append(matchingFrameworkEvents, event) } } return matchingFrameworkEvents, nil } // Close flushes pending events and closes the database connection. func (m *Memory) Close() error { m.lock.Lock() defer m.lock.Unlock() // Invalidate internal state. m.testEvents = nil m.frameworkEvents = nil m.jobInfo = nil return nil } // Version returns the version of the memory storage layer. func (m *Memory) Version() (uint64, error) { return 0, nil } // New create a new Memory events storage backend func New() (storage.ResettableStorage, error) { m := &Memory{ jobInfo: make(map[types.JobID]*jobInfo), jobIDCounter: 1, } return m, nil }