lib/persistedretry/writeback/store.go (133 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package writeback import ( "errors" "fmt" "time" "github.com/uber/kraken/lib/persistedretry" "github.com/jmoiron/sqlx" "github.com/mattn/go-sqlite3" ) // Store stores writeback tasks. type Store struct { db *sqlx.DB } // NewStore creates a new Store. func NewStore(db *sqlx.DB) *Store { return &Store{db} } // GetPending returns all pending tasks. func (s *Store) GetPending() ([]persistedretry.Task, error) { return s.selectStatus("pending") } // GetFailed returns all failed tasks. func (s *Store) GetFailed() ([]persistedretry.Task, error) { return s.selectStatus("failed") } // AddPending adds r as pending. func (s *Store) AddPending(r persistedretry.Task) error { return s.addWithStatus(r, "pending") } // AddFailed adds r as failed. func (s *Store) AddFailed(r persistedretry.Task) error { return s.addWithStatus(r, "failed") } // MarkPending marks r as pending. func (s *Store) MarkPending(r persistedretry.Task) error { res, err := s.db.NamedExec(` UPDATE writeback_task SET status = "pending" WHERE namespace=:namespace AND name=:name `, r.(*Task)) if err != nil { return err } if n, err := res.RowsAffected(); err != nil { panic("driver does not support RowsAffected") } else if n == 0 { return persistedretry.ErrTaskNotFound } return nil } // MarkFailed marks r as failed. func (s *Store) MarkFailed(r persistedretry.Task) error { t := r.(*Task) res, err := s.db.NamedExec(` UPDATE writeback_task SET last_attempt = CURRENT_TIMESTAMP, failures = failures + 1, status = "failed" WHERE namespace=:namespace AND name=:name `, t) if err != nil { return err } if n, err := res.RowsAffected(); err != nil { panic("driver does not support RowsAffected") } else if n == 0 { return persistedretry.ErrTaskNotFound } t.Failures++ t.LastAttempt = time.Now() return nil } // Remove removes r. func (s *Store) Remove(r persistedretry.Task) error { _, err := s.db.NamedExec(` DELETE FROM writeback_task WHERE namespace=:namespace AND name=:name `, r.(*Task)) return err } // Find finds tasks matching query. func (s *Store) Find(query interface{}) ([]persistedretry.Task, error) { var tasks []*Task var err error switch q := query.(type) { case *NameQuery: err = s.db.Select(&tasks, ` SELECT namespace, name, created_at, last_attempt, failures, delay FROM writeback_task WHERE name=? `, q.name) default: return nil, errors.New("unknown query type") } if err != nil { return nil, err } return convert(tasks), nil } func (s *Store) addWithStatus(r persistedretry.Task, status string) error { query := fmt.Sprintf(` INSERT INTO writeback_task ( namespace, name, last_attempt, failures, delay, status ) VALUES ( :namespace, :name, :last_attempt, :failures, :delay, %q ) `, status) _, err := s.db.NamedExec(query, r.(*Task)) if se, ok := err.(sqlite3.Error); ok { if se.ExtendedCode == sqlite3.ErrConstraintPrimaryKey { return persistedretry.ErrTaskExists } } return err } func (s *Store) selectStatus(status string) ([]persistedretry.Task, error) { var tasks []*Task err := s.db.Select(&tasks, ` SELECT namespace, name, created_at, last_attempt, failures, delay FROM writeback_task WHERE status=? `, status) if err != nil { return nil, err } return convert(tasks), nil } func convert(tasks []*Task) (result []persistedretry.Task) { for _, t := range tasks { result = append(result, t) } return result }