plugins/storage/rdbms/events.go (332 lines of code) (raw):
// Copyright (c) Facebook, Inc. and its affiliates.
//
// This source code is licensed under the MIT license found in the
// LICENSE file in the root directory of this source tree.
package rdbms
import (
"bytes"
"database/sql"
"encoding/json"
"fmt"
"github.com/facebookincubator/contest/pkg/event"
"github.com/facebookincubator/contest/pkg/event/frameworkevent"
"github.com/facebookincubator/contest/pkg/event/testevent"
"github.com/facebookincubator/contest/pkg/job"
"github.com/facebookincubator/contest/pkg/target"
"github.com/facebookincubator/contest/pkg/types"
"github.com/facebookincubator/contest/pkg/xcontext"
)
func assembleQuery(baseQuery bytes.Buffer, selectClauses []string) (string, error) {
if len(selectClauses) == 0 {
return "", fmt.Errorf("no select clauses available, the query should specify at least one clause")
}
initialClause := true
for _, clause := range selectClauses {
if initialClause {
baseQuery.WriteString(fmt.Sprintf(" where %s", clause))
initialClause = false
} else {
baseQuery.WriteString(fmt.Sprintf(" and %s", clause))
}
}
baseQuery.WriteString(" order by event_id")
return baseQuery.String(), nil
}
func buildEventQuery(baseQuery bytes.Buffer, eventQuery *event.Query) ([]string, []interface{}) {
selectClauses := []string{}
fields := []interface{}{}
if eventQuery != nil && eventQuery.JobID != 0 {
selectClauses = append(selectClauses, "job_id=?")
fields = append(fields, eventQuery.JobID)
}
if eventQuery != nil && len(eventQuery.EventNames) != 0 {
if len(eventQuery.EventNames) == 1 {
selectClauses = append(selectClauses, "event_name=?")
} else {
queryStr := "event_name in"
for i := 0; i < len(eventQuery.EventNames); i++ {
if i == 0 {
queryStr = fmt.Sprintf("%s (?", queryStr)
} else if i < len(eventQuery.EventNames)-1 {
queryStr = fmt.Sprintf("%s, ?", queryStr)
} else {
queryStr = fmt.Sprintf("%s, ?)", queryStr)
}
}
selectClauses = append(selectClauses, queryStr)
}
for i := 0; i < len(eventQuery.EventNames); i++ {
fields = append(fields, eventQuery.EventNames[i])
}
}
if eventQuery != nil && !eventQuery.EmittedStartTime.IsZero() {
selectClauses = append(selectClauses, "emit_time>=?")
fields = append(fields, eventQuery.EmittedStartTime)
}
if eventQuery != nil && !eventQuery.EmittedEndTime.IsZero() {
selectClauses = append(selectClauses, "emit_time<=?")
fields = append(fields, eventQuery.EmittedStartTime)
}
return selectClauses, fields
}
func buildFrameworkEventQuery(baseQuery bytes.Buffer, frameworkEventQuery *frameworkevent.Query) (string, []interface{}, error) {
selectClauses, fields := buildEventQuery(baseQuery, &frameworkEventQuery.Query)
query, err := assembleQuery(baseQuery, selectClauses)
if err != nil {
return "", nil, fmt.Errorf("could not assemble query for framework events: %v", err)
}
return query, fields, nil
}
func buildTestEventQuery(baseQuery bytes.Buffer, testEventQuery *testevent.Query) (string, []interface{}, error) {
if testEventQuery == nil {
return "", nil, fmt.Errorf("cannot build empty testevent query")
}
selectClauses, fields := buildEventQuery(baseQuery, &testEventQuery.Query)
if testEventQuery.RunID != types.RunID(0) {
selectClauses = append(selectClauses, "run_id=?")
fields = append(fields, testEventQuery.RunID)
}
if testEventQuery.TestName != "" {
selectClauses = append(selectClauses, "test_name=?")
fields = append(fields, testEventQuery.TestName)
}
if testEventQuery.TestStepLabel != "" {
selectClauses = append(selectClauses, "test_step_label=?")
fields = append(fields, testEventQuery.TestStepLabel)
}
query, err := assembleQuery(baseQuery, selectClauses)
if err != nil {
return "", nil, fmt.Errorf("could not assemble query for framework events: %v", err)
}
return query, fields, nil
}
// TestEventField is a function type which retrieves information from a TestEvent object.
type TestEventField func(ev testevent.Event) interface{}
// TestEventJobID returns the JobID from an events.TestEvent object
func TestEventJobID(ev testevent.Event) interface{} {
if ev.Header == nil {
return nil
}
return ev.Header.JobID
}
// TestEventRunID returns the RunID from a
func TestEventRunID(ev testevent.Event) interface{} {
if ev.Header == nil {
return nil
}
return ev.Header.RunID
}
// TestEventTestName returns the test name from an events.TestEvent object
func TestEventTestName(ev testevent.Event) interface{} {
if ev.Header == nil {
return nil
}
return ev.Header.TestName
}
// TestEventTestStepLabel returns the test step label from an events.TestEvent object
func TestEventTestStepLabel(ev testevent.Event) interface{} {
if ev.Header == nil {
return nil
}
return ev.Header.TestStepLabel
}
// TestEventName returns the event name from an events.TestEvent object
func TestEventName(ev testevent.Event) interface{} {
if ev.Data == nil {
return nil
}
return ev.Data.EventName
}
// TestEventTargetID returns the target id from an events.TestEvent object
func TestEventTargetID(ev testevent.Event) interface{} {
if ev.Data == nil || ev.Data.Target == nil {
return nil
}
return ev.Data.Target.ID
}
// TestEventPayload returns the payload from an events.TestEvent object
func TestEventPayload(ev testevent.Event) interface{} {
if ev.Data == nil {
return nil
}
return ev.Data.Payload
}
// TestEventEmitTime returns the emission timestamp from an events.TestEvent object
func TestEventEmitTime(ev testevent.Event) interface{} {
return ev.EmitTime
}
// StoreTestEvent appends an event to the internal buffer and triggers a flush
// when the internal storage utilization goes beyond `testEventsFlushSize`
func (r *RDBMS) StoreTestEvent(_ xcontext.Context, event testevent.Event) error {
defer r.testEventsLock.Unlock()
r.testEventsLock.Lock()
r.buffTestEvents = append(r.buffTestEvents, event)
if len(r.buffTestEvents) >= r.testEventsFlushSize {
return r.flushTestEventsLocked()
}
return nil
}
// flushTestEventsLocked forces a flush of the pending test events to the database.
// Requires that the caller has already locked the corresponding buffer.
func (r *RDBMS) flushTestEventsLocked() error {
r.lockTx()
defer r.unlockTx()
insertStatement := "insert into test_events (job_id, run_id, test_name, test_step_label, event_name, target_id, payload, emit_time) values (?, ?, ?, ?, ?, ?, ?, ?)"
for _, event := range r.buffTestEvents {
_, err := r.db.Exec(
insertStatement,
TestEventJobID(event),
TestEventRunID(event),
TestEventTestName(event),
TestEventTestStepLabel(event),
TestEventName(event),
TestEventTargetID(event),
TestEventPayload(event),
TestEventEmitTime(event))
if err != nil {
return fmt.Errorf("could not store event in database: %v", err)
}
}
r.buffTestEvents = nil
return nil
}
// flushTestEvents forces a flush of the pending test events to the database.
func (r *RDBMS) flushTestEvents() error {
r.testEventsLock.Lock()
defer r.testEventsLock.Unlock()
return r.flushTestEventsLocked()
}
// GetTestEvents retrieves test events matching the query fields provided
func (r *RDBMS) GetTestEvents(ctx xcontext.Context, eventQuery *testevent.Query) ([]testevent.Event, error) {
// Flush pending events before Get operations
err := r.flushTestEvents()
if err != nil {
return nil, fmt.Errorf("could not flush events before reading events: %v", err)
}
r.lockTx()
defer r.unlockTx()
baseQuery := bytes.Buffer{}
baseQuery.WriteString("select event_id, job_id, run_id, test_name, test_step_label, event_name, target_id, payload, emit_time from test_events")
query, fields, err := buildTestEventQuery(baseQuery, eventQuery)
if err != nil {
return nil, fmt.Errorf("could not execute select query for test events: %v", err)
}
results := []testevent.Event{}
ctx.Debugf("Executing query: %s, fields: %v", query, fields)
rows, err := r.db.Query(query, fields...)
if err != nil {
return nil, err
}
// TargetID might be null, so a type which supports null should be used with Scan
var (
targetID sql.NullString
payload sql.NullString
)
defer func() {
err := rows.Close()
if err != nil {
ctx.Warnf("could not close rows for test events: %v", err)
}
}()
for rows.Next() {
data := testevent.Data{}
header := testevent.Header{}
event := testevent.New(&header, &data)
var eventID int
err := rows.Scan(
&eventID,
&header.JobID,
&header.RunID,
&header.TestName,
&header.TestStepLabel,
&data.EventName,
&targetID,
&payload,
&event.EmitTime,
)
if err != nil {
return nil, fmt.Errorf("could not read results from db: %v", err)
}
if targetID.Valid {
t := target.Target{ID: targetID.String}
data.Target = &t
}
if payload.Valid {
rawPayload := json.RawMessage(payload.String)
data.Payload = &rawPayload
}
results = append(results, event)
}
return results, nil
}
// FrameworkEventField is a function type which retrieves information from a FrameworkEvent object
type FrameworkEventField func(ev frameworkevent.Event) interface{}
// FrameworkEventJobID returns the JobID from a events.TestEvent object
func FrameworkEventJobID(ev frameworkevent.Event) interface{} {
return ev.JobID
}
// FrameworkEventName returns the name for the FrameworkEvent object
func FrameworkEventName(ev frameworkevent.Event) interface{} {
return ev.EventName
}
// FrameworkEventPayload returns the payload from a events.FrameworkEvent object
func FrameworkEventPayload(ev frameworkevent.Event) interface{} {
return ev.Payload
}
// FrameworkEventEmitTime returns the emission timestamp from a events.FrameworkEvent object
func FrameworkEventEmitTime(ev frameworkevent.Event) interface{} {
return ev.EmitTime
}
// StoreFrameworkEvent appends an event to the internal buffer and triggers a flush
// when the internal storage utilization goes beyond `frameworkEventsFlushSize`
func (r *RDBMS) StoreFrameworkEvent(ctx xcontext.Context, event frameworkevent.Event) error {
defer r.frameworkEventsLock.Unlock()
r.frameworkEventsLock.Lock()
r.buffFrameworkEvents = append(r.buffFrameworkEvents, event)
if len(r.buffFrameworkEvents) >= r.frameworkEventsFlushSize {
return r.flushFrameworkEventsLocked()
}
return nil
}
const (
insertFEStmt = "INSERT INTO framework_events (job_id, event_name, payload, emit_time) VALUES (?, ?, ?, ?)"
updateJobStateStmt = "UPDATE jobs SET state = ? WHERE job_id = ?"
)
// flushFrameworkEventsLocked forces a flush of the pending frameworks events to the database
// Requires that the caller has already locked the corresponding buffer.
func (r *RDBMS) flushFrameworkEventsLocked() error {
r.lockTx()
defer r.unlockTx()
// TODO: put this into a transaction.
jobStateUpdates := map[types.JobID]job.State{}
for _, event := range r.buffFrameworkEvents {
_, err := r.db.Exec(
insertFEStmt,
FrameworkEventJobID(event),
FrameworkEventName(event),
FrameworkEventPayload(event),
FrameworkEventEmitTime(event))
if err != nil {
return fmt.Errorf("could not store event in database: %v", err)
}
if sn, err := job.EventNameToJobState(event.EventName); err == nil {
jobStateUpdates[event.JobID] = sn
}
}
for jobID, state := range jobStateUpdates {
if _, err := r.db.Exec(updateJobStateStmt, state, jobID); err != nil {
return fmt.Errorf("could not update state of job %d: %w", jobID, err)
}
}
r.buffFrameworkEvents = nil
return nil
}
// flushFrameworkEvents forces a flush of the pending frameworks events to the database
func (r *RDBMS) flushFrameworkEvents() error {
r.frameworkEventsLock.Lock()
defer r.frameworkEventsLock.Unlock()
return r.flushFrameworkEventsLocked()
}
// GetFrameworkEvent retrieves framework events matching the query fields provided
func (r *RDBMS) GetFrameworkEvent(ctx xcontext.Context, eventQuery *frameworkevent.Query) ([]frameworkevent.Event, error) {
// Flush pending events before Get operations
err := r.flushFrameworkEvents()
if err != nil {
return nil, fmt.Errorf("could not flush events before reading events: %v", err)
}
r.lockTx()
defer r.unlockTx()
baseQuery := bytes.Buffer{}
baseQuery.WriteString(`select event_id, job_id, event_name, payload, emit_time from framework_events`)
query, fields, err := buildFrameworkEventQuery(baseQuery, eventQuery)
if err != nil {
return nil, fmt.Errorf("could not execute select query for test events: %v", err)
}
results := []frameworkevent.Event{}
ctx.Debugf("Executing query: %s, fields: %v", query, fields)
rows, err := r.db.Query(query, fields...)
if err != nil {
return nil, err
}
defer func() {
if err := rows.Close(); err != nil {
ctx.Warnf("could not close rows for framework events: %v", err)
}
}()
for rows.Next() {
event := frameworkevent.New()
var eventID int
err := rows.Scan(&eventID, &event.JobID, &event.EventName, &event.Payload, &event.EmitTime)
if err != nil {
return nil, fmt.Errorf("could not read results from db: %v", err)
}
results = append(results, event)
}
return results, nil
}