plugins/storage/rdbms/init.go (193 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under the MIT license found in the // LICENSE file in the root directory of this source tree. package rdbms import ( "database/sql" "fmt" "sync" "time" "github.com/facebookincubator/contest/pkg/event/frameworkevent" "github.com/facebookincubator/contest/pkg/event/testevent" "github.com/facebookincubator/contest/pkg/storage" log "github.com/sirupsen/logrus" "github.com/facebookincubator/contest/tools/migration/rdbms/migrationlib" // this blank import registers the mysql driver _ "github.com/go-sql-driver/mysql" ) // txbeginner defines an interface for a backend which supports beginning a transaction type txbeginner interface { Begin() (*sql.Tx, error) } // db defines an interface for a backend that supports Query and Exec Operations type db interface { Exec(query string, args ...interface{}) (sql.Result, error) Query(query string, args ...interface{}) (*sql.Rows, error) } // tx defines an interface for a backend that supports transaction like operations type tx interface { Commit() error Rollback() error } // RDBMS implements a storage engine which stores ConTest information in a relational // database via the database/sql package. With the current implementation, only MySQL // is officially supported. Within MySQL, the current limitations are the following: // // It's not possible to use prepared statements. Not all MySQL connectors // implementing database/sql support prepared statements, so the plugin cannot // depend on them. type RDBMS struct { dbURI, driverName string buffTestEvents []testevent.Event buffFrameworkEvents []frameworkevent.Event testEventsLock sync.Mutex frameworkEventsLock sync.Mutex closeCh chan struct{} closeWG *sync.WaitGroup // sql.Tx is not safe for concurrent use. This means that both Query, Exec operations // and rows scanning should be serialized. txLock is acquired and released by all // methods of the public interface exposed by RDBMS. txLock sync.Mutex db db sqlDB *sql.DB // Events are buffered internally before being flushed to the database. // Buffer size and flush interval are defined per-buffer, as there is // a separate buffer for TestEvent and FrameworkEvent testEventsFlushSize int testEventsFlushInterval time.Duration frameworkEventsFlushSize int frameworkEventsFlushInterval time.Duration } func (r *RDBMS) lockTx() { if _, ok := r.db.(tx); !ok { return } r.txLock.Lock() } func (r *RDBMS) unlockTx() { if _, ok := r.db.(tx); !ok { return } r.txLock.Unlock() } // BeginTx returns a storage.TransactionalStorage object backed by a transactional db object func (r *RDBMS) BeginTx() (storage.TransactionalStorage, error) { txdb, ok := r.db.(txbeginner) if !ok { return nil, fmt.Errorf("backend does not support initiating a transaction") } tx, err := txdb.Begin() if err != nil { return nil, err } return &RDBMS{db: tx, sqlDB: r.sqlDB, closeCh: r.closeCh, closeWG: r.closeWG}, nil } // Commit persists the current transaction, if there is one active func (r *RDBMS) Commit() error { tx, ok := r.db.(tx) if !ok { return fmt.Errorf("no active transaction") } return tx.Commit() } // Rollback rolls back the current transaction, if there is one active func (r *RDBMS) Rollback() error { tx, ok := r.db.(tx) if !ok { return fmt.Errorf("no active transaction") } return tx.Rollback() } // Close flushes pending events and closes the database connection. func (r *RDBMS) Close() error { close(r.closeCh) r.closeWG.Wait() r.sqlDB.Close() r.sqlDB = nil r.db = nil return nil } // Version returns the current version of the RDBMS schema func (r *RDBMS) Version() (uint64, error) { return migrationlib.DBVersion(r.db) } // Reset wipes entire database contents. Used in tests. func (r *RDBMS) Reset() error { for _, t := range []string{"jobs", "job_tags", "run_reports", "final_reports", "test_events", "framework_events"} { if _, err := r.db.Exec(fmt.Sprintf("TRUNCATE TABLE %s", t)); err != nil { return err } } return nil } func (r *RDBMS) init() error { driverName := "mysql" if r.driverName != "" { driverName = r.driverName } sqlDB, err := sql.Open(driverName, r.dbURI) if err != nil { return fmt.Errorf("could not initialize database: %w", err) } if err := sqlDB.Ping(); err != nil { sqlDB.Close() return fmt.Errorf("unable to contact database: %w", err) } r.db = sqlDB r.sqlDB = sqlDB if r.testEventsFlushInterval > 0 { r.closeWG.Add(1) go func() { done := false for !done { select { case <-time.After(r.testEventsFlushInterval): if err := r.flushTestEvents(); err != nil { log.Warningf("Failed to flush test events: %v", err) } case <-r.closeCh: // Flush one last time if err := r.flushTestEvents(); err != nil { log.Warningf("Failed to flush test events: %v", err) } done = true } } r.closeWG.Done() }() } if r.frameworkEventsFlushInterval > 0 { r.closeWG.Add(1) go func() { done := false for !done { select { case <-time.After(r.frameworkEventsFlushInterval): if err := r.flushFrameworkEvents(); err != nil { log.Warningf("Failed to flush test events: %v", err) } case <-r.closeCh: // Flush one last time if err := r.flushFrameworkEvents(); err != nil { log.Warningf("Failed to flush test events: %v", err) } done = true } } r.closeWG.Done() }() } return nil } // Opt is a function type that sets parameters on the RDBMS object type Opt func(rdbms *RDBMS) // TestEventsFlushSize defines maximum size of the test events buffer after which // events are flushed to the database. func TestEventsFlushSize(flushSize int) Opt { return func(rdbms *RDBMS) { rdbms.testEventsFlushSize = flushSize } } // TestEventsFlushInterval defines the interval at which buffered test events are // stored into the database func TestEventsFlushInterval(flushInterval time.Duration) Opt { return func(rdbms *RDBMS) { rdbms.testEventsFlushInterval = flushInterval } } // FrameworkEventsFlushSize defines maximum size of the framework events buffer // after which events are flushed to the database. func FrameworkEventsFlushSize(flushSize int) Opt { return func(rdbms *RDBMS) { rdbms.frameworkEventsFlushSize = flushSize } } // FrameworkEventsFlushInterval defines the interval at which buffered framework // events are stored into the database func FrameworkEventsFlushInterval(flushInterval time.Duration) Opt { return func(rdbms *RDBMS) { rdbms.frameworkEventsFlushInterval = flushInterval } } // DriverName allows using a mysql-compatible driver (e.g. a wrapper around mysql // or a syntax-compatible variant). func DriverName(name string) Opt { return func(rdbms *RDBMS) { rdbms.driverName = name } } // New creates a RDBMS events storage backend with default parameters func New(dbURI string, opts ...Opt) (storage.Storage, error) { // Default flushInterval and buffer sizes are zero (i.e., by default the backend is not buffered) rdbms := RDBMS{ dbURI: dbURI, closeCh: make(chan struct{}), closeWG: &sync.WaitGroup{}, } for _, Opt := range opts { Opt(&rdbms) } if err := rdbms.init(); err != nil { return nil, err } return &rdbms, nil }