pkg/storage/cassandra/store.go (2,402 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package cassandra import ( "bytes" "compress/gzip" "context" "encoding/json" "fmt" "io/ioutil" "os" "reflect" "sort" "strconv" "strings" "time" "github.com/uber/peloton/.gen/peloton/api/v0/job" "github.com/uber/peloton/.gen/peloton/api/v0/peloton" "github.com/uber/peloton/.gen/peloton/api/v0/query" "github.com/uber/peloton/.gen/peloton/api/v0/task" "github.com/uber/peloton/.gen/peloton/api/v0/update" pb_volume "github.com/uber/peloton/.gen/peloton/api/v0/volume" "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless" v1alphapeloton "github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton" "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod" "github.com/uber/peloton/.gen/peloton/private/models" versionutil "github.com/uber/peloton/pkg/common/util/entityversion" "github.com/uber/peloton/pkg/common" apiconvertor "github.com/uber/peloton/pkg/common/api" "github.com/uber/peloton/pkg/common/backoff" "github.com/uber/peloton/pkg/common/util" "github.com/uber/peloton/pkg/storage" "github.com/uber/peloton/pkg/storage/cassandra/api" "github.com/uber/peloton/pkg/storage/cassandra/impl" ormcassandra "github.com/uber/peloton/pkg/storage/connectors/cassandra" ormobjects "github.com/uber/peloton/pkg/storage/objects" qb "github.com/uber/peloton/pkg/storage/querybuilder" _ "github.com/gemnasium/migrate/driver/cassandra" // Pull in C* driver for migrate "github.com/gemnasium/migrate/migrate" "github.com/gocql/gocql" "github.com/gogo/protobuf/proto" "github.com/golang/protobuf/ptypes" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/uber-go/tally" "go.uber.org/yarpc/yarpcerrors" ) const ( taskIDFmt = "%s-%d" // DB table names jobConfigTable = "job_config" jobRuntimeTable = "job_runtime" jobIndexTable = "job_index" taskConfigV2Table = "task_config_v2" taskConfigTable = "task_config" taskRuntimeTable = "task_runtime" podEventsTable = "pod_events" updatesTable = "update_info" podWorkflowEventsTable = "pod_workflow_events" frameworksTable = "frameworks" updatesByJobView = "mv_updates_by_job" volumeTable = "persistent_volumes" // DB field names creationTimeField = "creation_time" completionTimeField = "completion_time" stateField = "state" // Task query sort by field hostField = "host" instanceIDField = "instanceId" messageField = "message" nameField = "name" reasonField = "reason" _defaultQueryLimit uint32 = 10 _defaultQueryMaxLimit uint32 = 100 _defaultWorkflowEventsDedupeWarnLimit = 1000 jobIndexTimeFormat = "20060102150405" jobQueryDefaultSpanInDays = 7 jobQueryJitter = time.Second * 30 // _defaultPodEventsLimit is default number of pod events // to read if not provided for jobID + instanceID _defaultPodEventsLimit = 100 // Default context timeout for the method to cleanup old // job updates from the storage _jobUpdatesCleanupTimeout = 120 * time.Second ) // GenerateTestCassandraConfig generates a test config for local C* client // This is meant for sharing testing code only, not for production func GenerateTestCassandraConfig() *Config { return &Config{ CassandraConn: &impl.CassandraConn{ ContactPoints: []string{"127.0.0.1"}, Port: 9043, CQLVersion: "3.4.2", MaxGoRoutines: 1000, }, StoreName: "peloton_test", Migrations: "migrations", Replication: &Replication{ Strategy: "SimpleStrategy", Replicas: []*Replica{ { Name: "replication_factor", Value: 1, }, }, }, } } // ToOrmConfig is needed to generate ORM config from legacy config so that the // ORM code doesn't depend on legacy storage code and can be imported into the // legacy code func ToOrmConfig(c *Config) *ormcassandra.Config { return &ormcassandra.Config{ CassandraConn: &ormcassandra.CassandraConn{ ContactPoints: c.CassandraConn.ContactPoints, Port: c.CassandraConn.Port, Username: c.CassandraConn.Username, Password: c.CassandraConn.Password, Consistency: c.CassandraConn.Consistency, ConnectionsPerHost: c.CassandraConn.ConnectionsPerHost, Timeout: c.CassandraConn.Timeout, SocketKeepalive: c.CassandraConn.SocketKeepalive, ProtoVersion: c.CassandraConn.ProtoVersion, TTL: c.CassandraConn.TTL, LocalDCOnly: c.CassandraConn.LocalDCOnly, DataCenter: c.CassandraConn.DataCenter, PageSize: c.CassandraConn.PageSize, RetryCount: c.CassandraConn.RetryCount, HostPolicy: c.CassandraConn.HostPolicy, TimeoutLimit: c.CassandraConn.TimeoutLimit, CQLVersion: c.CassandraConn.CQLVersion, MaxGoRoutines: c.CassandraConn.MaxGoRoutines, }, StoreName: c.StoreName, } } type luceneClauses []string // AutoMigrate migrates the db schemas for cassandra func (c *Config) AutoMigrate() []error { connString := c.MigrateString() errs, ok := migrate.UpSync(connString, c.Migrations) log.Infof("UpSync complete") if !ok { log.Errorf("UpSync failed with errors: %v", errs) return errs } return nil } // MigrateString returns the db string required for database migration // The code assumes that the keyspace (indicated by StoreName) is already created func (c *Config) MigrateString() string { // see https://github.com/gemnasium/migrate/pull/17 on why disable_init_host_lookup is needed // This is for making local testing faster with docker running on mac connStr := fmt.Sprintf("cassandra://%v:%v/%v?protocol=4&disable_init_host_lookup", c.CassandraConn.ContactPoints[0], c.CassandraConn.Port, c.StoreName) if len(c.CassandraConn.Username) != 0 { connStr = fmt.Sprintf("cassandra://%v:%v@%v:%v/%v", c.CassandraConn.Username, c.CassandraConn.Password, c.CassandraConn.ContactPoints[0], c.CassandraConn.Port, c.StoreName) } connStr = strings.Replace(connStr, " ", "", -1) log.Infof("Cassandra migration string %v", connStr) return connStr } // Store implements JobStore, TaskStore, UpdateStore, FrameworkInfoStore, // and PersistentVolumeStore using a cassandra backend // TODO: Break this up into different files (and or structs) that implement // each of these interfaces to keep code modular. type Store struct { DataStore api.DataStore jobConfigOps ormobjects.JobConfigOps jobRuntimeOps ormobjects.JobRuntimeOps jobUpdateEventsOps ormobjects.JobUpdateEventsOps taskConfigV2Ops ormobjects.TaskConfigV2Ops metrics *storage.Metrics Conf *Config retryPolicy backoff.RetryPolicy } // NewStore creates a Store func NewStore(config *Config, scope tally.Scope) (*Store, error) { dataStore, err := impl.CreateStore(config.CassandraConn, config.StoreName, scope) if err != nil { log.Errorf("Failed to NewStore, err=%v", err) return nil, err } ormStore, ormErr := ormobjects.NewCassandraStore( ToOrmConfig(config), scope) if ormErr != nil { log.WithError(ormErr).Fatal("Failed to create ORM store for Cassandra") } return &Store{ DataStore: dataStore, // DO NOT ADD MORE ORM Objects here. These are added here for // supporting Job.Query() which cannot be fully moved to ORM jobConfigOps: ormobjects.NewJobConfigOps(ormStore), jobRuntimeOps: ormobjects.NewJobRuntimeOps(ormStore), jobUpdateEventsOps: ormobjects.NewJobUpdateEventsOps(ormStore), taskConfigV2Ops: ormobjects.NewTaskConfigV2Ops(ormStore), metrics: storage.NewMetrics(scope.SubScope("storage")), Conf: config, retryPolicy: backoff.NewRetryPolicy(5, 50*time.Millisecond), }, nil } func (s *Store) handleDataStoreError(err error, p backoff.Retrier) error { retry := false newErr := err switch err.(type) { // TBD handle errOverloaded and errBootstrapping after error types added in gocql case *gocql.RequestErrReadFailure: s.metrics.ErrorMetrics.ReadFailure.Inc(1) return yarpcerrors.AbortedErrorf("read failure during statement execution %v", err.Error()) case *gocql.RequestErrWriteFailure: s.metrics.ErrorMetrics.WriteFailure.Inc(1) return yarpcerrors.AbortedErrorf("write failure during statement execution %v", err.Error()) case *gocql.RequestErrAlreadyExists: s.metrics.ErrorMetrics.AlreadyExists.Inc(1) return yarpcerrors.AlreadyExistsErrorf("already exists error during statement execution %v", err.Error()) case *gocql.RequestErrReadTimeout: s.metrics.ErrorMetrics.ReadTimeout.Inc(1) return yarpcerrors.DeadlineExceededErrorf("read timeout during statement execution: %v", err.Error()) case *gocql.RequestErrWriteTimeout: s.metrics.ErrorMetrics.WriteTimeout.Inc(1) return yarpcerrors.DeadlineExceededErrorf("write timeout during statement execution: %v", err.Error()) case *gocql.RequestErrUnavailable: s.metrics.ErrorMetrics.RequestUnavailable.Inc(1) retry = true newErr = yarpcerrors.UnavailableErrorf("request unavailable during statement execution: %v", err.Error()) } switch err { case gocql.ErrTooManyTimeouts: s.metrics.ErrorMetrics.TooManyTimeouts.Inc(1) return yarpcerrors.DeadlineExceededErrorf("too many timeouts during statement execution: %v", err.Error()) case gocql.ErrUnavailable: s.metrics.ErrorMetrics.ConnUnavailable.Inc(1) retry = true newErr = yarpcerrors.UnavailableErrorf("unavailable error during statement execution: %v", err.Error()) case gocql.ErrSessionClosed: s.metrics.ErrorMetrics.SessionClosed.Inc(1) retry = true newErr = yarpcerrors.UnavailableErrorf("session closed during statement execution: %v", err.Error()) case gocql.ErrNoConnections: s.metrics.ErrorMetrics.NoConnections.Inc(1) retry = true newErr = yarpcerrors.UnavailableErrorf("no connections during statement execution: %v", err.Error()) case gocql.ErrConnectionClosed: s.metrics.ErrorMetrics.ConnectionClosed.Inc(1) retry = true newErr = yarpcerrors.UnavailableErrorf("connections closed during statement execution: %v", err.Error()) case gocql.ErrNoStreams: s.metrics.ErrorMetrics.NoStreams.Inc(1) retry = true newErr = yarpcerrors.UnavailableErrorf("no streams during statement execution: %v", err.Error()) } if retry { if backoff.CheckRetry(p) { return nil } return newErr } return newErr } func (s *Store) executeWrite(ctx context.Context, stmt api.Statement) (api.ResultSet, error) { p := backoff.NewRetrier(s.retryPolicy) for { result, err := s.DataStore.Execute(ctx, stmt) if err == nil { return result, err } err = s.handleDataStoreError(err, p) if err != nil { if !common.IsTransientError(err) { s.metrics.ErrorMetrics.NotTransient.Inc(1) } return result, err } } } func (s *Store) executeRead( ctx context.Context, stmt api.Statement) ([]map[string]interface{}, error) { p := backoff.NewRetrier(s.retryPolicy) for { result, err := s.DataStore.Execute(ctx, stmt) if err == nil { if result != nil { defer result.Close() } allResults, nErr := result.All(ctx) if nErr == nil { return allResults, nErr } result.Close() err = nErr } err = s.handleDataStoreError(err, p) if err != nil { if !common.IsTransientError(err) { s.metrics.ErrorMetrics.NotTransient.Inc(1) } return nil, err } } } // Compress a blob using gzip func compress(buffer []byte) ([]byte, error) { var b bytes.Buffer w := gzip.NewWriter(&b) if _, err := w.Write(buffer); err != nil { return nil, err } if err := w.Close(); err != nil { return nil, err } return b.Bytes(), nil } // Uncompress a blob using gzip, return original blob if it was not compressed func uncompress(buffer []byte) ([]byte, error) { b := bytes.NewBuffer(buffer) r, err := gzip.NewReader(b) if err != nil { if err == gzip.ErrHeader { // blob was not compressed, so we can ignore this error. We can // look for only checksum errors which will mean data corruption return buffer, nil } return nil, err } defer r.Close() uncompressed, err := ioutil.ReadAll(r) if err != nil { return nil, err } return uncompressed, nil } // GetMaxJobConfigVersion returns the maximum version of configs of a given job func (s *Store) GetMaxJobConfigVersion( ctx context.Context, jobID string) (uint64, error) { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("MAX(version)").From(jobConfigTable). Where(qb.Eq{"job_id": jobID}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.Errorf("Fail to get max version of job %v: %v", jobID, err) return 0, err } log.Debugf("max version: %v", allResults) for _, value := range allResults { for _, max := range value { // version is store as big int in Cassandra // gocql would cast big int to int64 return uint64(max.(int64)), nil } } return 0, nil } // WithTimeRangeFilter will take timerange and time_field (creation_time|completion_time) as // input and create a range filter on those fields and append to the clauses list func (c *luceneClauses) WithTimeRangeFilter(timeRange *peloton.TimeRange, timeField string) error { if timeRange == nil || c == nil { return nil } if timeField != creationTimeField && timeField != completionTimeField { return fmt.Errorf("Invalid time field %s", timeField) } // Create filter if time range is not nil min, err := ptypes.Timestamp(timeRange.GetMin()) if err != nil { log.WithField("timeRange", timeRange). WithField("timeField", timeField). WithError(err). Error("fail to get min time range") return err } max, err := ptypes.Timestamp(timeRange.GetMax()) if err != nil { log.WithField("timeRange", timeRange). WithField("timeField", timeField). WithError(err). Error("fail to get max time range") return err } // validate min and max limits are legit (i.e. max > min) if max.Before(min) { return fmt.Errorf("Incorrect timerange") } timeRangeMinStr := fmt.Sprintf(min.Format(jobIndexTimeFormat)) timeRangeMaxStr := fmt.Sprintf(max.Format(jobIndexTimeFormat)) *c = append(*c, fmt.Sprintf(`{type: "range", field:"%s", lower: "%s", upper: "%s", include_lower: true}`, timeField, timeRangeMinStr, timeRangeMaxStr)) return nil } // QueryJobs returns all jobs in the resource pool that matches the spec. func (s *Store) QueryJobs(ctx context.Context, respoolID *peloton.ResourcePoolID, spec *job.QuerySpec, summaryOnly bool) ([]*job.JobInfo, []*job.JobSummary, uint32, error) { // Query is based on stratio lucene index on jobs. // See https://github.com/Stratio/cassandra-lucene-index // We are using "must" for the labels and only return the jobs that contains all // label values // TODO: investigate if there are any golang library that can build lucene query var clauses luceneClauses if spec == nil { return nil, nil, 0, nil } // Labels field must contain value of the specified labels for _, label := range spec.GetLabels() { clauses = append(clauses, fmt.Sprintf(`{type: "contains", field:"labels", values:%s}`, strconv.Quote(label.Value))) } // jobconfig field must contain all specified keywords for _, word := range spec.GetKeywords() { // Lucene for some reason does wildcard search as case insensitive // However, to match individual words we still need to match // by exact keyword. Using boolean filter to do this. // using the "should" syntax will enable us to match on either // wildcard search or exact match wildcardWord := fmt.Sprintf("*%s*", strings.ToLower(word)) clauses = append(clauses, fmt.Sprintf( `{type: "boolean",`+ `should: [`+ `{type: "wildcard", field:"config", value:%s},`+ `{type: "match", field:"config", value:%s}`+ `]`+ `}`, strconv.Quote(wildcardWord), strconv.Quote(word))) } // Add support on query by job state // queryTerminalStates will be set if the spec contains any // terminal job state. In this case we will restrict the // job query to query for jobs over the last 7 days. // This is a temporary fix so that lucene index query doesn't // time out when searching for ALL jobs with terminal states // which is a huge number. // TODO (adityacb): change this once we have query spec support // a custom time range queryTerminalStates := false if len(spec.GetJobStates()) > 0 { values := "" for i, s := range spec.GetJobStates() { if util.IsPelotonJobStateTerminal(s) { queryTerminalStates = true } values = values + strconv.Quote(s.String()) if i < len(spec.JobStates)-1 { values = values + "," } } clauses = append(clauses, fmt.Sprintf(`{type: "contains", field:"state", values:[%s]}`, values)) } if respoolID != nil { clauses = append(clauses, fmt.Sprintf(`{type: "contains", field:"respool_id", values:%s}`, strconv.Quote(respoolID.GetValue()))) } owner := spec.GetOwner() if owner != "" { clauses = append(clauses, fmt.Sprintf(`{type: "match", field:"owner", value:%s}`, strconv.Quote(owner))) } name := spec.GetName() if name != "" { wildcardName := fmt.Sprintf("*%s*", name) clauses = append(clauses, fmt.Sprintf(`{type: "wildcard", field:"name", value:%s}`, strconv.Quote(wildcardName))) } creationTimeRange := spec.GetCreationTimeRange() completionTimeRange := spec.GetCompletionTimeRange() err := clauses.WithTimeRangeFilter(creationTimeRange, creationTimeField) if err != nil { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } err = clauses.WithTimeRangeFilter(completionTimeRange, completionTimeField) if err != nil { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } // If no time range is specified in query spec, but the query is for terminal state, // use default time range if creationTimeRange == nil && completionTimeRange == nil && queryTerminalStates { // Add jobQueryJitter to max bound to account for jobs // that have just been created. // if time range is not specified and the job is in terminal state, // apply a default range of last 7 days // TODO (adityacb): remove artificially enforcing default time range for // completed jobs once UI supports query by time range. now := time.Now().Add(jobQueryJitter).UTC() max, err := ptypes.TimestampProto(now) if err != nil { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } min, err := ptypes.TimestampProto(now.AddDate(0, 0, -jobQueryDefaultSpanInDays)) if err != nil { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } defaultCreationTimeRange := &peloton.TimeRange{Min: min, Max: max} err = clauses.WithTimeRangeFilter(defaultCreationTimeRange, "creation_time") if err != nil { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } } where := `expr(job_index_lucene_v2, '{filter: [` for i, c := range clauses { if i > 0 { where += ", " } where += c } where += "]" // add default sorting by creation time in descending order in case orderby // is not specificed in the query spec var orderBy = spec.GetPagination().GetOrderBy() if orderBy == nil || len(orderBy) == 0 { orderBy = []*query.OrderBy{ { Order: query.OrderBy_DESC, Property: &query.PropertyPath{ Value: "creation_time", }, }, } } // add sorter into the query where += ", sort:[" count := 0 for _, order := range orderBy { where += fmt.Sprintf("{field: \"%s\"", order.Property.GetValue()) if order.Order == query.OrderBy_DESC { where += ", reverse: true" } where += "}" if count < len(orderBy)-1 { where += "," } count++ } where += "]" where += "}')" maxLimit := _defaultQueryMaxLimit if spec.GetPagination().GetMaxLimit() != 0 { maxLimit = spec.GetPagination().GetMaxLimit() } where += fmt.Sprintf(" Limit %d", maxLimit) log.WithField("where", where).Debug("query string") queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("job_id", "name", "owner", "job_type", "respool_id", "instance_count", "labels", "runtime_info"). From(jobIndexTable) stmt = stmt.Where(where) allResults, err := s.executeRead(ctx, stmt) if err != nil { uql, args, _, _ := stmt.ToUql() log.WithField("labels", spec.GetLabels()). WithField("uql", uql). WithField("args", args). WithError(err). Error("fail to query jobs") s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } total := uint32(len(allResults)) // Apply offset and limit. begin := spec.GetPagination().GetOffset() if begin > total { begin = total } allResults = allResults[begin:] end := _defaultQueryLimit if spec.GetPagination() != nil { limit := spec.GetPagination().GetLimit() if limit > 0 { // end should not be 0, it will yield in empty result end = limit } } if end > uint32(len(allResults)) { end = uint32(len(allResults)) } allResults = allResults[:end] summaryResults, err := s.getJobSummaryFromResultMap(ctx, allResults) if summaryOnly { if err != nil { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } // Lucene index entry for some batch jobs may be out of sync with the // base job_index table. Scrub such jobs from the summary list. summaryResults, err := s.reconcileStaleBatchJobsFromJobSummaryList( ctx, summaryResults, queryTerminalStates) if err != nil { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, err } s.metrics.JobMetrics.JobQuery.Inc(1) return nil, summaryResults, total, nil } var results []*job.JobInfo for _, value := range allResults { id, ok := value["job_id"].(qb.UUID) if !ok { s.metrics.JobMetrics.JobQueryFail.Inc(1) return nil, nil, 0, fmt.Errorf("got invalid response from cassandra") } jobID := &peloton.JobID{ Value: id.String(), } jobRuntime, err := s.jobRuntimeOps.Get(ctx, jobID) if err != nil { log.WithError(err). WithField("job_id", id.String()). Warn("no job runtime found when executing jobs query") continue } // TODO (chunyang.shen): use job/task cache to get JobConfig T1760469 jobConfig, _, err := s.jobConfigOps.GetCurrentVersion(ctx, jobID) if err != nil { log.WithField("labels", spec.GetLabels()). WithField("job_id", id.String()). WithError(err). Error("fail to query jobs as not able to get job config") continue } // Unset instance config as its size can be huge as a workaround for UI query. // We should figure out long term support for grpc size limit. jobConfig.InstanceConfig = nil results = append(results, &job.JobInfo{ Id: jobID, Config: jobConfig, Runtime: jobRuntime, }) } s.metrics.JobMetrics.JobQuery.Inc(1) return results, summaryResults, total, nil } // CreateTaskRuntime creates a task runtime for a peloton job func (s *Store) CreateTaskRuntime( ctx context.Context, jobID *peloton.JobID, instanceID uint32, runtime *task.RuntimeInfo, owner string, jobType job.JobType) error { runtimeBuffer, err := proto.Marshal(runtime) if err != nil { log.WithField("job_id", jobID.GetValue()). WithField("instance_id", instanceID). WithError(err). Error("Failed to create task runtime") s.metrics.TaskMetrics.TaskCreateFail.Inc(1) return err } queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Insert(taskRuntimeTable). Columns( "job_id", "instance_id", "version", "update_time", "state", "runtime_info"). Values( jobID.GetValue(), instanceID, runtime.GetRevision().GetVersion(), time.Now().UTC(), runtime.GetState().String(), runtimeBuffer) // IfNotExist() will cause Writing task runtimes to Cassandra concurrently // failed with Operation timed out issue when batch size is small, e.g. 1. // For now, we have to drop the IfNotExist() taskID := fmt.Sprintf(taskIDFmt, jobID, instanceID) if err := s.applyStatement(ctx, stmt, taskID); err != nil { s.metrics.TaskMetrics.TaskCreateFail.Inc(1) return err } s.metrics.TaskMetrics.TaskCreate.Inc(1) err = s.addPodEvent(ctx, jobID, instanceID, runtime) if err != nil { log.Errorf("Unable to log task state changes for job ID %v instance %v, error = %v", jobID.GetValue(), instanceID, err) return err } return nil } // addPodEvent upserts single pod state change for a Job -> Instance -> Run. // Task state events are sorted by reverse chronological run_id and time of event. func (s *Store) addPodEvent( ctx context.Context, jobID *peloton.JobID, instanceID uint32, runtime *task.RuntimeInfo) error { var runID, prevRunID, desiredRunID uint64 var err, errMessage error errLog := false if runID, err = util.ParseRunID( runtime.GetMesosTaskId().GetValue()); err != nil { errLog = true errMessage = err } // when creating a task, GetPrevMesosTaskId is empty, // set prevRunID to 0 if len(runtime.GetPrevMesosTaskId().GetValue()) == 0 { prevRunID = 0 } else if prevRunID, err = util.ParseRunID( runtime.GetPrevMesosTaskId().GetValue()); err != nil { errLog = true errMessage = err } // old job does not have desired mesos task id, make it the same as runID // TODO: remove the line after all tasks have desired mesos task id if len(runtime.GetDesiredMesosTaskId().GetValue()) == 0 { desiredRunID = runID } else if desiredRunID, err = util.ParseRunID( runtime.GetDesiredMesosTaskId().GetValue()); err != nil { errLog = true errMessage = err } if errLog { s.metrics.TaskMetrics.PodEventsAddFail.Inc(1) return errMessage } queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Insert(podEventsTable). Columns( "job_id", "instance_id", "run_id", "desired_run_id", "previous_run_id", "update_time", "actual_state", "goal_state", "healthy", "hostname", "agent_id", "config_version", "desired_config_version", "volumeID", "message", "reason", "update_timestamp"). Values( jobID.GetValue(), instanceID, runID, desiredRunID, prevRunID, qb.UUID{UUID: gocql.UUIDFromTime(time.Now())}, runtime.GetState().String(), runtime.GetGoalState().String(), runtime.GetHealthy().String(), runtime.GetHost(), runtime.GetAgentID().GetValue(), runtime.GetConfigVersion(), runtime.GetDesiredConfigVersion(), runtime.GetVolumeID().GetValue(), runtime.GetMessage(), runtime.GetReason(), time.Now()).Into(podEventsTable) err = s.applyStatement(ctx, stmt, runtime.GetMesosTaskId().GetValue()) if err != nil { s.metrics.TaskMetrics.PodEventsAddFail.Inc(1) return err } s.metrics.TaskMetrics.PodEventsAddSuccess.Inc(1) return nil } // GetPodEvents returns pod events for a Job + Instance + PodID (optional) // Pod events are sorted by PodID + Timestamp // only is called from this file func (s *Store) GetPodEvents( ctx context.Context, jobID string, instanceID uint32, podID ...string) ([]*pod.PodEvent, error) { var stmt qb.SelectBuilder queryBuilder := s.DataStore.NewQuery() // Events are sorted in descinding order by PodID and then update time. stmt = queryBuilder.Select("*").From(podEventsTable). Where(qb.Eq{ "job_id": jobID, "instance_id": instanceID}) if len(podID) > 0 && len(podID[0]) > 0 { runID, err := util.ParseRunID(podID[0]) if err != nil { return nil, err } stmt = stmt.Where(qb.Eq{"run_id": runID}) } else { statement := queryBuilder.Select("run_id").From(podEventsTable). Where(qb.Eq{ "job_id": jobID, "instance_id": instanceID}). Limit(1) res, err := s.executeRead(ctx, statement) if err != nil { s.metrics.TaskMetrics.PodEventsGetFail.Inc(1) return nil, err } for _, value := range res { stmt = stmt.Where(qb.Eq{"run_id": value["run_id"].(int64)}) } } allResults, err := s.executeRead(ctx, stmt) if err != nil { s.metrics.TaskMetrics.PodEventsGetFail.Inc(1) return nil, err } var podEvents []*pod.PodEvent b := bytes.Buffer{} b.WriteString(jobID) b.WriteString("-") b.WriteString(strconv.FormatUint(uint64(instanceID), 10)) podName := b.String() for _, value := range allResults { podEvent := &pod.PodEvent{} b.Reset() b.WriteString(podName) b.WriteString("-") b.WriteString(strconv.FormatInt(value["run_id"].(int64), 10)) mesosTaskID := b.String() b.Reset() b.WriteString(podName) b.WriteString("-") b.WriteString(strconv.FormatInt(value["previous_run_id"].(int64), 10)) prevMesosTaskID := b.String() b.Reset() b.WriteString(podName) b.WriteString("-") b.WriteString(strconv.FormatInt(value["desired_run_id"].(int64), 10)) desiredMesosTaskID := b.String() // Set podEvent fields podEvent.PodId = &v1alphapeloton.PodID{ Value: mesosTaskID, } podEvent.PrevPodId = &v1alphapeloton.PodID{ Value: prevMesosTaskID, } podEvent.DesiredPodId = &v1alphapeloton.PodID{ Value: desiredMesosTaskID, } podEvent.Timestamp = value["update_time"].(qb.UUID).Time().Format(time.RFC3339) podEvent.Version = versionutil.GetPodEntityVersion( uint64(value["config_version"].(int64))) podEvent.DesiredVersion = versionutil.GetPodEntityVersion( uint64(value["desired_config_version"].(int64))) podEvent.ActualState = apiconvertor.ConvertTaskStateToPodState( task.TaskState(task.TaskState_value[value["actual_state"].(string)])).String() podEvent.DesiredState = apiconvertor.ConvertTaskStateToPodState( task.TaskState(task.TaskState_value[value["goal_state"].(string)])).String() podEvent.Healthy = pod.HealthState( task.HealthState_value[value["healthy"].(string)]).String() podEvent.Message = value["message"].(string) podEvent.Reason = value["reason"].(string) podEvent.AgentId = value["agent_id"].(string) podEvent.Hostname = value["hostname"].(string) podEvents = append(podEvents, podEvent) } s.metrics.TaskMetrics.PodEventsGetSucess.Inc(1) return podEvents, nil } // DeletePodEvents deletes the pod events for provided JobID, // InstanceID and RunID in the range [fromRunID-toRunID) func (s *Store) DeletePodEvents( ctx context.Context, jobID string, instanceID uint32, fromRunID uint64, toRunID uint64, ) error { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder. Delete(podEventsTable). Where(qb.Eq{"job_id": jobID, "instance_id": instanceID}). Where("run_id >= ?", fromRunID). Where("run_id < ?", toRunID) if err := s.applyStatement(ctx, stmt, jobID); err != nil { s.metrics.TaskMetrics.PodEventsDeleteFail.Inc(1) return err } s.metrics.TaskMetrics.PodEventsDeleteSucess.Inc(1) return nil } // GetTasksForJobResultSet returns the result set that can be used to iterate each task in a job // Caller need to call result.Close() func (s *Store) GetTasksForJobResultSet(ctx context.Context, id *peloton.JobID) ([]map[string]interface{}, error) { jobID := id.GetValue() queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*").From(taskRuntimeTable). Where(qb.Eq{"job_id": jobID}) result, err := s.executeRead(ctx, stmt) if err != nil { log.Errorf("Fail to GetTasksForJobResultSet by jobId %v, err=%v", jobID, err) return nil, err } return result, nil } // GetTasksForJob returns all the task runtimes (no configuration) in a map of tasks.TaskInfo for a peloton job func (s *Store) GetTasksForJob(ctx context.Context, id *peloton.JobID) (map[uint32]*task.TaskInfo, error) { allResults, err := s.GetTasksForJobResultSet(ctx, id) if err != nil { log.WithField("job_id", id.GetValue()). WithError(err). Error("Fail to GetTasksForJob") s.metrics.TaskMetrics.TaskGetForJobFail.Inc(1) return nil, err } resultMap := make(map[uint32]*task.TaskInfo) for _, value := range allResults { var record TaskRuntimeRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.WithField("value", value). WithError(err). Error("Failed to Fill into TaskRuntimeRecord") s.metrics.TaskMetrics.TaskGetForJobFail.Inc(1) continue } runtime, err := record.GetTaskRuntime() if err != nil { log.WithField("record", record). WithError(err). Error("Failed to parse task runtime from record") s.metrics.TaskMetrics.TaskGetForJobFail.Inc(1) continue } taskInfo := &task.TaskInfo{ Runtime: runtime, InstanceId: uint32(record.InstanceID), JobId: id, } s.metrics.TaskMetrics.TaskGetForJob.Inc(1) resultMap[taskInfo.InstanceId] = taskInfo } return resultMap, nil } // GetTaskConfigs returns the task configs for a list of instance IDs, // job ID and config version. func (s *Store) GetTaskConfigs(ctx context.Context, id *peloton.JobID, instanceIDs []uint32, version uint64) (map[uint32]*task.TaskConfig, *models.ConfigAddOn, error) { taskConfigMap := make(map[uint32]*task.TaskConfig) var configAddOn *models.ConfigAddOn var backFill bool // add default instance ID to read the default config var dbInstanceIDs []int for _, instance := range instanceIDs { dbInstanceIDs = append(dbInstanceIDs, int(instance)) } dbInstanceIDs = append(dbInstanceIDs, common.DefaultTaskConfigID) stmt := s.DataStore.NewQuery().Select("*").From(taskConfigV2Table). Where( qb.Eq{ "job_id": id.GetValue(), "version": version, "instance_id": dbInstanceIDs, }) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithField("job_id", id.GetValue()). WithField("instance_ids", instanceIDs). WithField("version", version). WithError(err). Error("Failed to get task configs") s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1) return taskConfigMap, nil, err } if len(allResults) == 0 { // Try to get task configs from legacy task_config table stmt := s.DataStore.NewQuery().Select("*").From(taskConfigTable). Where( qb.Eq{ "job_id": id.GetValue(), "version": version, "instance_id": dbInstanceIDs, }) allResults, err = s.executeRead(ctx, stmt) if err != nil { s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1) return taskConfigMap, nil, errors.Wrap( err, fmt.Sprintf( "failed to get task configs for %v", id.GetValue()), ) } if len(allResults) == 0 { return taskConfigMap, nil, nil } s.metrics.TaskMetrics.TaskGetConfigLegacy.Inc(1) backFill = true } var defaultConfig *task.TaskConfig // Read all the overridden task configs and the default task config for _, value := range allResults { var record TaskConfigRecord if err := FillObject(value, &record, reflect.TypeOf(record)); err != nil { log.WithField("value", value). WithError(err). Error("Failed to Fill into TaskRecord") s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1) return nil, nil, err } taskConfig, err := record.GetTaskConfig() if err != nil { return nil, nil, err } if record.InstanceID == common.DefaultTaskConfigID { // get the default config defaultConfig = taskConfig continue } taskConfigMap[uint32(record.InstanceID)] = taskConfig // Read config addon from the first result entry. This is because config // add-on is same for all tasks of a job if configAddOn != nil { continue } if configAddOn, err = record.GetConfigAddOn(); err != nil { log.WithField("value", value). WithError(err). Error("Failed to Unmarshal system labels") s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1) return nil, nil, err } } // Fill the instances which don't have a overridden config with the default // config for _, instance := range instanceIDs { if _, ok := taskConfigMap[instance]; !ok { // use the default config for this instance if defaultConfig == nil { // we should never be here. // Either every instance has a override config or we have a // default config. s.metrics.TaskMetrics.TaskGetConfigFail.Inc(1) return nil, nil, yarpcerrors.NotFoundErrorf("unable to read default task config") } taskConfigMap[instance] = defaultConfig } } if backFill { // back fill entry from task_config to task_config_v2 worker := func(i uint32) error { var cfg *task.TaskConfig var ok bool if cfg, ok = taskConfigMap[i]; !ok { return yarpcerrors.NotFoundErrorf( "failed to get config for instance %v", id, ) } return s.taskConfigV2Ops.Create( ctx, id, int64(i), cfg, configAddOn, nil, version, ) } err := util.RunInParallel(id.GetValue(), instanceIDs, worker) if err != nil { log.WithError(err).Info("failed to backfill task_config_v2") s.metrics.TaskMetrics.TaskConfigBackFillFail.Inc(1) } else { s.metrics.TaskMetrics.TaskConfigBackFill.Inc(1) } } s.metrics.TaskMetrics.TaskGetConfigs.Inc(1) return taskConfigMap, configAddOn, nil } func (s *Store) getTaskInfoFromRuntimeRecord(ctx context.Context, id *peloton.JobID, record *TaskRuntimeRecord) (*task.TaskInfo, error) { runtime, err := record.GetTaskRuntime() if err != nil { log.Errorf("Failed to parse task runtime from record, val = %v err= %v", record, err) return nil, err } config, _, err := s.taskConfigV2Ops.GetTaskConfig(ctx, id, uint32(record.InstanceID), runtime.ConfigVersion) if err != nil { return nil, err } return &task.TaskInfo{ Runtime: runtime, Config: config, InstanceId: uint32(record.InstanceID), JobId: id, }, nil } // GetTasksForJobAndStates returns the tasks for a peloton job which are in // one of the specified states. // result map key is TaskID, value is TaskHost func (s *Store) GetTasksForJobAndStates( ctx context.Context, id *peloton.JobID, states []task.TaskState) (map[uint32]*task.TaskInfo, error) { jobID := id.GetValue() queryBuilder := s.DataStore.NewQuery() taskStates := make(map[string]bool) for _, state := range states { taskStates[state.String()] = true } stmt := queryBuilder.Select("instance_id", "state").From(taskRuntimeTable). Where(qb.Eq{"job_id": jobID}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("job_id", jobID). Error("Failed to GetTasksForJobAndStates") s.metrics.TaskMetrics.TaskGetForJobAndStatesFail.Inc(1) return nil, err } resultMap := make(map[uint32]*task.TaskInfo) for _, value := range allResults { var record TaskRuntimeRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.WithError(err). WithField("job_id", jobID). WithField("value", value). Error("GetTasksForJobAndStates failed to Fill into TaskRecord") s.metrics.TaskMetrics.TaskGetForJobAndStatesFail.Inc(1) return nil, err } for i := 0; i < len(taskStates); i++ { if _, ok := taskStates[record.State]; !ok { continue } resultMap[uint32(record.InstanceID)], err = s.getTask(ctx, id.GetValue(), uint32(record.InstanceID)) if err != nil { log.WithError(err). WithField("job_id", jobID). WithField("instance_id", record.InstanceID). WithField("value", value). Error("Failed to get taskInfo from task") s.metrics.TaskMetrics.TaskGetForJobAndStatesFail.Inc(1) return nil, err } s.metrics.TaskMetrics.TaskGetForJobAndStates.Inc(1) } } s.metrics.TaskMetrics.TaskGetForJobAndStates.Inc(1) return resultMap, nil } func specContains(specifier []string, item string) bool { if len(specifier) == 0 { return true } return util.Contains(specifier, item) } // GetTasksByQuerySpec returns the tasks for a peloton job which satisfy the QuerySpec // field 'state' is filtered by DB query, field 'name', 'host' is filter func (s *Store) GetTasksByQuerySpec( ctx context.Context, jobID *peloton.JobID, spec *task.QuerySpec) (map[uint32]*task.TaskInfo, error) { taskStates := spec.GetTaskStates() names := spec.GetNames() hosts := spec.GetHosts() var tasks map[uint32]*task.TaskInfo var err error if len(taskStates) == 0 { //Get all tasks for the job if query doesn't specify the task state(s) tasks, err = s.GetTasksForJobByRange(ctx, jobID, nil) } else { //Get tasks with specified states tasks, err = s.GetTasksForJobAndStates(ctx, jobID, taskStates) } if err != nil { log.WithError(err). WithField("job_id", jobID.GetValue()). WithField("states", taskStates). Error("QueryTasks failed to get tasks for the job") s.metrics.TaskMetrics.TaskQueryTasksFail.Inc(1) return nil, err } filteredTasks := make(map[uint32]*task.TaskInfo) // Filtering name and host start := time.Now() for _, task := range tasks { taskName := task.GetConfig().GetName() taskHost := task.GetRuntime().GetHost() if specContains(names, taskName) && specContains(hosts, taskHost) { filteredTasks[task.InstanceId] = task } // Deleting a task, to let it GC and not block memory till entire task list if iterated. delete(tasks, task.InstanceId) } log.WithFields(log.Fields{ "jobID": jobID, "query_type": "In memory filtering", "Names": names, "hosts": hosts, "task_size": len(tasks), "duration": time.Since(start).Seconds(), }).Debug("Query in memory filtering time") return filteredTasks, nil } // GetTaskRuntimesForJobByRange returns the Task RuntimeInfo for batch jobs by // instance ID range. func (s *Store) GetTaskRuntimesForJobByRange(ctx context.Context, id *peloton.JobID, instanceRange *task.InstanceRange) (map[uint32]*task.RuntimeInfo, error) { jobID := id.GetValue() result := make(map[uint32]*task.RuntimeInfo) queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*"). From(taskRuntimeTable). Where(qb.Eq{"job_id": jobID}) if instanceRange != nil { stmt = stmt.Where("instance_id >= ?", instanceRange.From). Where("instance_id < ?", instanceRange.To) } allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("job_id", jobID). WithField("range", instanceRange). Error("fail to get task rutimes for jobs by range") s.metrics.TaskMetrics.TaskGetRuntimesForJobRangeFail.Inc(1) return nil, err } if len(allResults) == 0 { return result, nil } for _, value := range allResults { var record TaskRuntimeRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.WithField("job_id", jobID). WithField("range", instanceRange). WithError(err). Error("failed to fill runtime into task record") s.metrics.TaskMetrics.TaskGetRuntimesForJobRangeFail.Inc(1) return nil, err } runtime, err := record.GetTaskRuntime() if err != nil { return result, err } result[uint32(record.InstanceID)] = runtime } s.metrics.TaskMetrics.TaskGetRuntimesForJobRange.Inc(1) return result, nil } // GetTasksForJobByRange returns the TaskInfo for batch jobs by // instance ID range. func (s *Store) GetTasksForJobByRange(ctx context.Context, id *peloton.JobID, instanceRange *task.InstanceRange) (map[uint32]*task.TaskInfo, error) { jobID := id.GetValue() result := make(map[uint32]*task.TaskInfo) queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*"). From(taskRuntimeTable). Where(qb.Eq{"job_id": jobID}) if instanceRange != nil { stmt = stmt.Where("instance_id >= ?", instanceRange.From). Where("instance_id < ?", instanceRange.To) } allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("job_id", jobID). WithField("range", instanceRange). Error("Fail to GetTasksForBatchJobsByRange") s.metrics.TaskMetrics.TaskGetForJobRangeFail.Inc(1) return nil, err } if len(allResults) == 0 { return result, nil } // create map of instanceID->runtime runtimeMap := make(map[uint32]*task.RuntimeInfo) for _, value := range allResults { var record TaskRuntimeRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.WithField("job_id", jobID). WithField("range", instanceRange). WithError(err). Error("Failed to Fill into TaskRecord") s.metrics.TaskMetrics.TaskGetForJobRangeFail.Inc(1) return nil, err } runtime, err := record.GetTaskRuntime() if err != nil { return result, err } runtimeMap[uint32(record.InstanceID)] = runtime } if len(runtimeMap) == 0 { return result, nil } // map of configVersion-> list of instance IDS with that version // // NB: For batch jobs the assumption is that most( // if not all) of the tasks will have the same task config version. // So we can use this optimization to get all the configs with just 1 DB // call. In the worst case if all tasks have a different config version // then it'll take 1 DB call for each task config. configVersions := make(map[uint64][]uint32) for instanceID, runtime := range runtimeMap { instances, ok := configVersions[runtime.GetConfigVersion()] if !ok { instances = []uint32{} } instances = append(instances, instanceID) configVersions[runtime.GetConfigVersion()] = instances } log.WithField("config_versions_map", configVersions).Debug("config versions to read") // map of instanceID -> task config configMap := make(map[uint32]*task.TaskConfig) for configVersion, instances := range configVersions { // Get the configs for a particular config version configs, _, err := s.GetTaskConfigs(ctx, id, instances, configVersion) if err != nil { return result, err } // appends the configs for instanceID, config := range configs { configMap[instanceID] = config } } // We have the task configs and the task runtimes, so we can // create task infos for instanceID, runtime := range runtimeMap { config := configMap[instanceID] result[instanceID] = &task.TaskInfo{ InstanceId: instanceID, JobId: id, Config: config, Runtime: runtime, } } // The count should be the same log.WithField("count_runtime", len(runtimeMap)). WithField("count_config", len(configMap)). Debug("runtime vs config") s.metrics.TaskMetrics.TaskGetForJobRange.Inc(1) return result, nil } // GetTaskRuntime for a job and instance id. func (s *Store) GetTaskRuntime(ctx context.Context, jobID *peloton.JobID, instanceID uint32) (*task.RuntimeInfo, error) { record, err := s.getTaskRuntimeRecord(ctx, jobID.GetValue(), instanceID) if err != nil { log.WithError(err). WithField("job_id", jobID.GetValue()). WithField("instance_id", instanceID). Errorf("failed to get task runtime record") return nil, err } runtime, err := record.GetTaskRuntime() if err != nil { log.WithError(err). WithField("record", record). Errorf("failed to parse task runtime from record") return nil, err } return runtime, err } // UpdateTaskRuntime updates a task for a peloton job func (s *Store) UpdateTaskRuntime( ctx context.Context, jobID *peloton.JobID, instanceID uint32, runtime *task.RuntimeInfo, jobType job.JobType) error { runtimeBuffer, err := proto.Marshal(runtime) if err != nil { s.metrics.TaskMetrics.TaskUpdateFail.Inc(1) return err } queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Update(taskRuntimeTable). Set("version", runtime.Revision.Version). Set("update_time", time.Now().UTC()). Set("state", runtime.GetState().String()). Set("runtime_info", runtimeBuffer). Where(qb.Eq{"job_id": jobID.GetValue(), "instance_id": instanceID}) if err := s.applyStatement(ctx, stmt, fmt.Sprintf(taskIDFmt, jobID.GetValue(), instanceID)); err != nil { s.metrics.TaskMetrics.TaskUpdateFail.Inc(1) return err } s.metrics.TaskMetrics.TaskUpdate.Inc(1) s.addPodEvent(ctx, jobID, instanceID, runtime) return nil } // GetTaskForJob returns a task by jobID and instanceID func (s *Store) GetTaskForJob(ctx context.Context, jobID string, instanceID uint32) (map[uint32]*task.TaskInfo, error) { taskID := fmt.Sprintf(taskIDFmt, jobID, int(instanceID)) taskInfo, err := s.GetTaskByID(ctx, taskID) if err != nil { return nil, err } result := make(map[uint32]*task.TaskInfo) result[instanceID] = taskInfo return result, nil } // DeleteTaskRuntime deletes runtime of a particular task . // It is used to delete a task when update workflow reduces the instance // count during an update. The pod events are retained in case the user // wants to fetch the events or the logs from a previous run of a deleted task. // The task configurations from previous versions are retained in case // auto-rollback gets triggered. func (s *Store) DeleteTaskRuntime( ctx context.Context, id *peloton.JobID, instanceID uint32) error { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Delete(taskRuntimeTable). Where(qb.Eq{"job_id": id.GetValue(), "instance_id": instanceID}) if err := s.applyStatement(ctx, stmt, id.GetValue()); err != nil { s.metrics.TaskMetrics.TaskDeleteFail.Inc(1) return err } s.metrics.TaskMetrics.TaskDelete.Inc(1) return nil } // 1) Pod Events table has partition key job_id + instance_id, // so pod events need to be deleted per instance. // 2) Fetch instance count from job config, and delete pod events // incrementally for each Instance. // 3) There maybe a scenario, were instance count is shrunk, in order to delete // pod events for shrunk instances, first read pod event for shrunk instances, // if exist then delete. If result is zero, that means we have reached // maximum instance count ever for that job. // 4) Performance optimization for deleting shrunk instances, // read pod events for every - instance_id % 100 = 0 // If pod event exist then continue to delete pod events for next 100 instances // If pod event not exist means pod events are deleted for all shrunk instances func (s *Store) deletePodEventsOnDeleteJob( ctx context.Context, jobID string) error { queryBuilder := s.DataStore.NewQuery() instanceCount := uint32(0) jobConfig, _, err := s.jobConfigOps.GetCurrentVersion( ctx, &peloton.JobID{Value: jobID}, ) if err != nil { // if the config is not found, then the job has already been deleted. if yarpcerrors.IsNotFound(errors.Cause(err)) { return nil } return err } for { // 1) read pod events to identify shrunk instances // 2) read pod events if instance_id (shrunk instances) % 100 = 0 if instanceCount > jobConfig.InstanceCount && instanceCount%_defaultPodEventsLimit == 0 { events, err := s.GetPodEvents( ctx, jobID, instanceCount) if err != nil { s.metrics.JobMetrics.JobDeleteFail.Inc(1) return err } if len(events) == 0 { break } } stmt := queryBuilder.Delete(podEventsTable). Where(qb.Eq{"job_id": jobID}). Where(qb.Eq{"instance_id": instanceCount}) if err := s.applyStatement(ctx, stmt, jobID); err != nil { s.metrics.JobMetrics.JobDeleteFail.Inc(1) return err } instanceCount++ } return nil } // DeleteJob deletes a job and associated tasks, by job id. // TODO: This implementation is not perfect, as if it's getting an transient // error, the job or some tasks may not be fully deleted. func (s *Store) DeleteJob( ctx context.Context, jobID string) error { if err := s.deletePodEventsOnDeleteJob(ctx, jobID); err != nil { return err } if err := s.deleteTaskConfigV2OnDeleteJob(ctx, jobID); err != nil { s.metrics.JobMetrics.JobDeleteFail.Inc(1) return err } queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Delete(taskRuntimeTable).Where(qb.Eq{"job_id": jobID}) if err := s.applyStatement(ctx, stmt, jobID); err != nil { s.metrics.JobMetrics.JobDeleteFail.Inc(1) return err } // Delete all updates for the job updateIDs, err := s.GetUpdatesForJob(ctx, jobID) if err != nil { s.metrics.JobMetrics.JobDeleteFail.Inc(1) return err } for _, id := range updateIDs { if err := s.deleteSingleUpdate(ctx, id); err != nil { return err } } stmt = queryBuilder.Delete(jobConfigTable).Where(qb.Eq{"job_id": jobID}) if err := s.applyStatement(ctx, stmt, jobID); err != nil { s.metrics.JobMetrics.JobDeleteFail.Inc(1) return err } stmt = queryBuilder.Delete(jobRuntimeTable).Where(qb.Eq{"job_id": jobID}) err = s.applyStatement(ctx, stmt, jobID) if err != nil { s.metrics.JobMetrics.JobDeleteFail.Inc(1) } else { s.metrics.JobMetrics.JobDelete.Inc(1) } return err } // task_config_v2 has partition key of jobID, version, instance_id // so we need to delete this table per job, per version, per instance func (s *Store) deleteTaskConfigV2OnDeleteJob( ctx context.Context, jobID string) error { queryBuilder := s.DataStore.NewQuery() jobConfig, _, err := s.jobConfigOps.GetCurrentVersion(ctx, &peloton.JobID{Value: jobID}) if err != nil { // if the config is not found, then the job has already been deleted. if yarpcerrors.IsNotFound(err) { return nil } return err } // loop through all the job config versions for i := uint64(1); i <= jobConfig.GetChangeLog().GetVersion(); i++ { // get the job config for this version jobConfigWithVersoin, _, err := s.jobConfigOps.Get(ctx, &peloton.JobID{Value: jobID}, i) if err != nil { return err } // get the instance count for this version instanceCountWithVersion := uint32(jobConfigWithVersoin. GetInstanceCount()) for j := uint32(0); j < instanceCountWithVersion; j++ { stmt := queryBuilder.Delete(taskConfigV2Table). Where(qb.Eq{"job_id": jobID}). Where(qb.Eq{"instance_id": j}). Where(qb.Eq{"version": i}) if err := s.applyStatement(ctx, stmt, jobID); err != nil { return err } } } return nil } // GetTaskByID returns the tasks (tasks.TaskInfo) for a peloton job func (s *Store) GetTaskByID(ctx context.Context, taskID string) (*task.TaskInfo, error) { jobID, instanceID, err := util.ParseTaskID(taskID) if err != nil { log.WithError(err). WithField("task_id", taskID). Error("Invalid task id") return nil, err } return s.getTask(ctx, jobID, uint32(instanceID)) } func (s *Store) getTask(ctx context.Context, jobID string, instanceID uint32) (*task.TaskInfo, error) { record, err := s.getTaskRuntimeRecord(ctx, jobID, instanceID) if err != nil { log.WithError(err). WithField("job_id", jobID). WithField("instance_id", instanceID). Error("failed to fetch task runtime record in get task") return nil, err } return s.getTaskInfoFromRuntimeRecord(ctx, &peloton.JobID{Value: jobID}, record) } // getTaskRuntimeRecord returns the runtime record for a peloton task func (s *Store) getTaskRuntimeRecord(ctx context.Context, jobID string, instanceID uint32) (*TaskRuntimeRecord, error) { taskID := fmt.Sprintf(taskIDFmt, jobID, int(instanceID)) queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*").From(taskRuntimeTable). Where(qb.Eq{"job_id": jobID, "instance_id": instanceID}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithField("task_id", taskID). WithError(err). Error("Fail to GetTask") s.metrics.TaskMetrics.TaskGetFail.Inc(1) return nil, err } for _, value := range allResults { var record TaskRuntimeRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.WithField("task_id", taskID). WithError(err). Error("Failed to Fill into TaskRecord") s.metrics.TaskMetrics.TaskGetFail.Inc(1) return nil, err } s.metrics.TaskMetrics.TaskGet.Inc(1) return &record, nil } s.metrics.TaskMetrics.TaskNotFound.Inc(1) return nil, yarpcerrors.NotFoundErrorf("task:%s not found", taskID) } //SetMesosStreamID stores the mesos framework id for a framework name func (s *Store) SetMesosStreamID(ctx context.Context, frameworkName string, mesosStreamID string) error { return s.updateFrameworkTable(ctx, map[string]interface{}{"framework_name": frameworkName, "mesos_stream_id": mesosStreamID}) } //SetMesosFrameworkID stores the mesos framework id for a framework name func (s *Store) SetMesosFrameworkID(ctx context.Context, frameworkName string, frameworkID string) error { return s.updateFrameworkTable(ctx, map[string]interface{}{"framework_name": frameworkName, "framework_id": frameworkID}) } func (s *Store) updateFrameworkTable(ctx context.Context, content map[string]interface{}) error { hostName, err := os.Hostname() if err != nil { return err } queryBuilder := s.DataStore.NewQuery() content["update_host"] = hostName content["update_time"] = time.Now().UTC() var columns []string var values []interface{} for col, val := range content { columns = append(columns, col) values = append(values, val) } stmt := queryBuilder.Insert(frameworksTable). Columns(columns...). Values(values...) err = s.applyStatement(ctx, stmt, frameworksTable) if err != nil { s.metrics.FrameworkStoreMetrics.FrameworkUpdateFail.Inc(1) return err } s.metrics.FrameworkStoreMetrics.FrameworkUpdate.Inc(1) return nil } //GetMesosStreamID reads the mesos stream id for a framework name func (s *Store) GetMesosStreamID(ctx context.Context, frameworkName string) (string, error) { frameworkInfoRecord, err := s.getFrameworkInfo(ctx, frameworkName) if err != nil { s.metrics.FrameworkStoreMetrics.StreamIDGetFail.Inc(1) return "", err } s.metrics.FrameworkStoreMetrics.StreamIDGet.Inc(1) return frameworkInfoRecord.MesosStreamID, nil } //GetFrameworkID reads the framework id for a framework name func (s *Store) GetFrameworkID(ctx context.Context, frameworkName string) (string, error) { frameworkInfoRecord, err := s.getFrameworkInfo(ctx, frameworkName) if err != nil { s.metrics.FrameworkStoreMetrics.FrameworkIDGetFail.Inc(1) return "", err } s.metrics.FrameworkStoreMetrics.FrameworkIDGet.Inc(1) return frameworkInfoRecord.FrameworkID, nil } func (s *Store) getFrameworkInfo(ctx context.Context, frameworkName string) (*FrameworkInfoRecord, error) { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*").From(frameworksTable). Where(qb.Eq{"framework_name": frameworkName}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.Errorf("Fail to getFrameworkInfo by frameworkName %v, err=%v", frameworkName, err) return nil, err } for _, value := range allResults { var record FrameworkInfoRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.Errorf("Failed to Fill into FrameworkInfoRecord, err= %v", err) return nil, err } return &record, nil } return nil, fmt.Errorf("FrameworkInfo not found for framework %v", frameworkName) } func (s *Store) applyStatement(ctx context.Context, stmt api.Statement, itemName string) error { stmtString, _, _ := stmt.ToSQL() // Use common.DBStmtLogField to log CQL queries here. Log formatter will use // this string to redact secret_info table queries log.WithField(common.DBStmtLogField, stmtString).Debug("DB stmt string") result, err := s.executeWrite(ctx, stmt) if err != nil { log.WithError(err).WithFields( log.Fields{common.DBStmtLogField: stmtString, "itemName": itemName}). Debug("Fail to execute stmt") return err } if result != nil { defer result.Close() } // In case the insert stmt has IfNotExist set (create case), it would fail to apply if // the underlying job/task already exists if result != nil && !result.Applied() { errMsg := fmt.Sprintf("%v is not applied, item could exist already", itemName) s.metrics.ErrorMetrics.CASNotApplied.Inc(1) log.Error(errMsg) return yarpcerrors.AlreadyExistsErrorf(errMsg) } return nil } // getJobSummaryFromIndex gets the job summary from job index table. // This is a helper function used by QueryJobs(). Do not use it for // anything other than QueryJobs; consider using ORM directly. // TODO Remove this when QueryJobs() uses ORM. func (s *Store) getJobSummaryFromIndex( ctx context.Context, id *peloton.JobID) (*job.JobSummary, error) { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select( "job_id", "name", "owner", "job_type", "respool_id", "instance_count", "labels", "runtime_info"). From(jobIndexTable). Where(qb.Eq{"job_id": id.GetValue()}) allResults, err := s.executeRead(ctx, stmt) if err != nil { return nil, err } summary, err := s.getJobSummaryFromResultMap(ctx, allResults) if err != nil { return nil, err } if len(summary) != 1 { return nil, yarpcerrors.FailedPreconditionErrorf( "found %d jobs %v for job id %v", len(allResults), allResults, id) } return summary[0], nil } // Less function holds the task sorting logic func Less(orderByList []*query.OrderBy, t1 *task.TaskInfo, t2 *task.TaskInfo) bool { // Keep comparing the two tasks by the field related with Order from the OrderbyList // until they are not equal on one fields for _, orderBy := range orderByList { desc := orderBy.GetOrder() == query.OrderBy_DESC property := orderBy.GetProperty().GetValue() if property == creationTimeField { time1, err1 := time.Parse(time.RFC3339, t1.GetRuntime().GetStartTime()) time2, err2 := time.Parse(time.RFC3339, t2.GetRuntime().GetStartTime()) if err1 != nil || err2 != nil { // if any StartTime of two tasks can't get parsed (or not exist) // task with a valid StartTime is less if err1 == nil { return !desc } else if err2 == nil { return desc } // both tasks have invalid StartTime, goto next loop continue } // return result if not equal, otherwise goto next loop if time1.Before(time2) { return !desc } else if time1.After(time2) { return desc } } else if property == hostField { if t1.GetRuntime().GetHost() < t2.GetRuntime().GetHost() { return !desc } else if t1.GetRuntime().GetHost() > t2.GetRuntime().GetHost() { return desc } } else if property == instanceIDField { if t1.GetInstanceId() < t2.GetInstanceId() { return !desc } else if t1.GetInstanceId() > t2.GetInstanceId() { return desc } } else if property == messageField { if t1.GetRuntime().GetMessage() < t2.GetRuntime().GetMessage() { return !desc } else if t1.GetRuntime().GetMessage() > t2.GetRuntime().GetMessage() { return desc } } else if property == nameField { if t1.GetConfig().GetName() < t2.GetConfig().GetName() { return !desc } else if t1.GetConfig().GetName() > t2.GetConfig().GetName() { return desc } } else if property == reasonField { if t1.GetRuntime().GetReason() < t2.GetRuntime().GetReason() { return !desc } else if t1.GetRuntime().GetReason() > t2.GetRuntime().GetReason() { return desc } } else if property == stateField { if t1.GetRuntime().GetState() < t2.GetRuntime().GetState() { return !desc } else if t1.GetRuntime().GetState() > t2.GetRuntime().GetState() { return desc } } } // Default order by InstanceId with increase order return t1.GetInstanceId() < t2.GetInstanceId() } // QueryTasks returns the tasks filtered on states(spec.TaskStates) in the given offset..offset+limit range. func (s *Store) QueryTasks( ctx context.Context, jobID *peloton.JobID, spec *task.QuerySpec) ([]*task.TaskInfo, uint32, error) { tasks, err := s.GetTasksByQuerySpec(ctx, jobID, spec) if err != nil { s.metrics.TaskMetrics.TaskQueryTasksFail.Inc(1) return nil, 0, err } //sortedTasksResult is sorted (by instanceID) list of tasksResult var sortedTasksResult SortedTaskInfoList for _, taskInfo := range tasks { sortedTasksResult = append(sortedTasksResult, taskInfo) } //sorting fields validation check var orderByList = spec.GetPagination().GetOrderBy() for _, orderBy := range orderByList { property := orderBy.GetProperty().GetValue() switch property { case creationTimeField, hostField, instanceIDField, messageField, nameField, reasonField, stateField: continue } return nil, 0, errors.New("Sort only supports fields: creation_time, host, instanceId, message, name, reason, state") } sort.Slice(sortedTasksResult, func(i, j int) bool { return Less(orderByList, sortedTasksResult[i], sortedTasksResult[j]) }) offset := spec.GetPagination().GetOffset() limit := _defaultQueryLimit if spec.GetPagination().GetLimit() != 0 { limit = spec.GetPagination().GetLimit() } end := offset + limit if end > uint32(len(sortedTasksResult)) { end = uint32(len(sortedTasksResult)) } var result []*task.TaskInfo if offset < end { result = sortedTasksResult[offset:end] } s.metrics.TaskMetrics.TaskQueryTasks.Inc(1) return result, uint32(len(sortedTasksResult)), nil } // CreatePersistentVolume creates a persistent volume entry. func (s *Store) CreatePersistentVolume(ctx context.Context, volume *pb_volume.PersistentVolumeInfo) error { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Insert(volumeTable). Columns("volume_id", "state", "goal_state", "job_id", "instance_id", "hostname", "size_mb", "container_path", "creation_time", "update_time"). Values( volume.GetId().GetValue(), volume.State.String(), volume.GoalState.String(), volume.GetJobId().GetValue(), volume.InstanceId, volume.Hostname, volume.SizeMB, volume.ContainerPath, time.Now().UTC(), time.Now().UTC()). IfNotExist() err := s.applyStatement(ctx, stmt, volume.GetId().GetValue()) if err != nil { s.metrics.VolumeMetrics.VolumeCreateFail.Inc(1) return err } s.metrics.VolumeMetrics.VolumeCreate.Inc(1) return nil } // UpdatePersistentVolume updates persistent volume info. func (s *Store) UpdatePersistentVolume(ctx context.Context, volumeInfo *pb_volume.PersistentVolumeInfo) error { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder. Update(volumeTable). Set("state", volumeInfo.GetState().String()). Set("goal_state", volumeInfo.GetGoalState().String()). Set("update_time", time.Now().UTC()). Where(qb.Eq{"volume_id": volumeInfo.GetId().GetValue()}) err := s.applyStatement(ctx, stmt, volumeInfo.GetId().GetValue()) if err != nil { s.metrics.VolumeMetrics.VolumeUpdateFail.Inc(1) return err } s.metrics.VolumeMetrics.VolumeUpdate.Inc(1) return nil } // GetPersistentVolume gets the persistent volume object. func (s *Store) GetPersistentVolume(ctx context.Context, volumeID *peloton.VolumeID) (*pb_volume.PersistentVolumeInfo, error) { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder. Select("*"). From(volumeTable). Where(qb.Eq{"volume_id": volumeID.GetValue()}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("volume_id", volumeID). Error("Fail to GetPersistentVolume by volumeID.") s.metrics.VolumeMetrics.VolumeGetFail.Inc(1) return nil, err } for _, value := range allResults { var record PersistentVolumeRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.WithError(err). WithField("raw_volume_value", value). Error("Failed to Fill into PersistentVolumeRecord.") s.metrics.VolumeMetrics.VolumeGetFail.Inc(1) return nil, err } s.metrics.VolumeMetrics.VolumeGet.Inc(1) return &pb_volume.PersistentVolumeInfo{ Id: &peloton.VolumeID{ Value: record.VolumeID, }, State: pb_volume.VolumeState( pb_volume.VolumeState_value[record.State]), GoalState: pb_volume.VolumeState( pb_volume.VolumeState_value[record.GoalState]), JobId: &peloton.JobID{ Value: record.JobID, }, InstanceId: uint32(record.InstanceID), Hostname: record.Hostname, SizeMB: uint32(record.SizeMB), ContainerPath: record.ContainerPath, CreateTime: record.CreateTime.String(), UpdateTime: record.UpdateTime.String(), }, nil } s.metrics.VolumeMetrics.VolumeGetFail.Inc(1) return nil, &storage.VolumeNotFoundError{VolumeID: volumeID} } // CreateUpdate creates a new update entry in DB. // If it already exists, the create will return an error. func (s *Store) CreateUpdate( ctx context.Context, updateInfo *models.UpdateModel, ) error { creationTime, err := time.Parse(time.RFC3339Nano, updateInfo.GetCreationTime()) if err != nil { return errors.Wrap(yarpcerrors.InvalidArgumentErrorf(err.Error()), "fail to parse creationTime") } updateTime, err := time.Parse(time.RFC3339Nano, updateInfo.GetUpdateTime()) if err != nil { return errors.Wrap(yarpcerrors.InvalidArgumentErrorf(err.Error()), "fail to parse updateTime") } updateConfigBuffer, err := proto.Marshal(updateInfo.GetUpdateConfig()) if err != nil { log.WithError(err). WithField("update_id", updateInfo.GetUpdateID().GetValue()). WithField("job_id", updateInfo.GetJobID().GetValue()). Error("failed to marshal update config") s.metrics.UpdateMetrics.UpdateCreateFail.Inc(1) return err } // Insert the update into the DB. Use CAS to ensure // that it does not exist already. queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Insert(updatesTable). Columns( "update_id", "update_type", "update_options", "update_state", "update_prev_state", "instances_total", "instances_added", "instances_updated", "instances_removed", "instances_done", "instances_current", "instances_failed", "job_id", "job_config_version", "job_config_prev_version", "opaque_data", "creation_time", "update_time"). Values( updateInfo.GetUpdateID().GetValue(), updateInfo.GetType().String(), updateConfigBuffer, updateInfo.GetState().String(), updateInfo.GetPrevState().String(), updateInfo.GetInstancesTotal(), updateInfo.GetInstancesAdded(), updateInfo.GetInstancesUpdated(), updateInfo.GetInstancesRemoved(), 0, []int{}, 0, updateInfo.GetJobID().GetValue(), updateInfo.GetJobConfigVersion(), updateInfo.GetPrevJobConfigVersion(), updateInfo.GetOpaqueData().GetData(), creationTime.UTC(), updateTime.UTC()). IfNotExist() if err := s.applyStatement( ctx, stmt, updateInfo.GetUpdateID().GetValue()); err != nil { log.WithError(err). WithField("update_id", updateInfo.GetUpdateID().GetValue()). WithField("job_id", updateInfo.GetJobID().GetValue()). Info("create update in DB failed") s.metrics.UpdateMetrics.UpdateCreateFail.Inc(1) return err } // best effort to clean up previous updates for the job go func() { cleanupCtx, cleanupCancel := context.WithTimeout( ctx, _jobUpdatesCleanupTimeout, ) defer cleanupCancel() if err := s.cleanupPreviousUpdatesForJob( cleanupCtx, updateInfo.GetJobID()); err != nil { log.WithError(err). WithField("job_id", updateInfo.GetJobID().GetValue()). Info("failed to clean up previous updates") } }() s.metrics.UpdateMetrics.UpdateCreate.Inc(1) return nil } // convertWorkflowEvents is a helper method to return workflow events slice // from Cassandra read result of workflow events. func (s *Store) convertToWorkflowEvents( ctx context.Context, updateID *peloton.UpdateID, result []map[string]interface{}, ) []*stateless.WorkflowEvent { var count int var isLogged bool var prevWorkflowState stateless.WorkflowState var workflowEvents []*stateless.WorkflowEvent for _, value := range result { workflowEvent := &stateless.WorkflowEvent{ Type: stateless.WorkflowType( models.WorkflowType_value[value["type"].(string)]), State: stateless.WorkflowState( update.State_value[value["state"].(string)]), Timestamp: value["create_time"].(qb.UUID).Time().Format(time.RFC3339), } if prevWorkflowState != workflowEvent.GetState() { workflowEvents = append(workflowEvents, workflowEvent) count = 0 isLogged = false prevWorkflowState = workflowEvent.GetState() } else { count++ if count > _defaultWorkflowEventsDedupeWarnLimit && !isLogged { log.WithFields(log.Fields{ "workflow_state": workflowEvent.GetState().String(), "workflow_type": workflowEvent.GetType().String(), "update_id": updateID.GetValue(), }).Warn("too many job workflow events in the same state") isLogged = true } } } return workflowEvents } // deleteJobUpdateEvents deletes job update events for an update of a job func (s *Store) deleteJobUpdateEvents( ctx context.Context, updateID *peloton.UpdateID, ) error { if err := s.jobUpdateEventsOps.Delete(ctx, updateID); err != nil { s.metrics.UpdateMetrics.JobUpdateEventDeleteFail.Inc(1) return err } s.metrics.UpdateMetrics.JobUpdateEventDelete.Inc(1) return nil } // AddWorkflowEvent adds workflow events for an update and instance // to track the progress func (s *Store) AddWorkflowEvent( ctx context.Context, updateID *peloton.UpdateID, instanceID uint32, workflowType models.WorkflowType, workflowState update.State) error { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Insert(podWorkflowEventsTable). Columns( "update_id", "instance_id", "type", "state", "create_time", "update_timestamp"). Values( updateID.GetValue(), int(instanceID), workflowType.String(), workflowState.String(), qb.UUID{UUID: gocql.UUIDFromTime(time.Now())}, time.Now()) err := s.applyStatement(ctx, stmt, updateID.GetValue()) if err != nil { s.metrics.WorkflowMetrics.WorkflowEventsAddFail.Inc(1) return err } s.metrics.WorkflowMetrics.WorkflowEventsAdd.Inc(1) return nil } // GetWorkflowEvents gets workflow events for an update and instance, // events are sorted in descending create timestamp func (s *Store) GetWorkflowEvents( ctx context.Context, updateID *peloton.UpdateID, instanceID uint32, limit uint32, ) ([]*stateless.WorkflowEvent, error) { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*").From(podWorkflowEventsTable). Where(qb.Eq{"update_id": updateID.GetValue()}). Where(qb.Eq{"instance_id": int(instanceID)}) if limit > 0 { stmt = stmt.Limit(uint64(limit)) } result, err := s.executeRead(ctx, stmt) if err != nil { s.metrics.WorkflowMetrics.WorkflowEventsGetFail.Inc(1) return nil, err } workflowEvents := s.convertToWorkflowEvents(ctx, updateID, result) s.metrics.WorkflowMetrics.WorkflowEventsGet.Inc(1) return workflowEvents, nil } // deleteWorkflowEvents deletes the workflow events for an update and instance func (s *Store) deleteWorkflowEvents( ctx context.Context, id *peloton.UpdateID, instanceID uint32) error { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Delete(podWorkflowEventsTable). Where(qb.Eq{"update_id": id.GetValue()}). Where(qb.Eq{"instance_id": int(instanceID)}) if err := s.applyStatement(ctx, stmt, id.GetValue()); err != nil { s.metrics.WorkflowMetrics.WorkflowEventsDeleteFail.Inc(1) return err } s.metrics.WorkflowMetrics.WorkflowEventsDelete.Inc(1) return nil } // TODO determine if this function should be part of storage or api handler. // cleanupPreviousUpdatesForJob cleans up the old job configurations // and updates. This is called when a new update is created, and ensures // that the number of configurations and updates in the DB do not keep // increasing continuously. func (s *Store) cleanupPreviousUpdatesForJob( ctx context.Context, jobID *peloton.JobID) error { var updateList []*SortUpdateInfo var nonUpdateList []*SortUpdateInfo // first fetch the updates for the job updates, err := s.GetUpdatesForJob(ctx, jobID.GetValue()) if err != nil { return err } for _, updateID := range updates { var allResults []map[string]interface{} // get the job configuration version queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("job_config_version").From(updatesTable). Where(qb.Eq{"update_id": updateID.GetValue()}) allResults, err = s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("update_id", updateID.GetValue()). Info("failed to get job config version") continue } for _, value := range allResults { var record UpdateRecord if err := FillObject(value, &record, reflect.TypeOf(record)); err != nil { log.WithError(err). WithField("update_id", updateID.GetValue()). Info("failed to fill the update record") continue } // sort as per the job configuration version updateInfo := &SortUpdateInfo{ updateID: updateID, jobConfigVersion: uint64(record.JobConfigVersion), } if record.Type == models.WorkflowType_UPDATE.String() { updateList = append(updateList, updateInfo) } else { nonUpdateList = append(nonUpdateList, updateInfo) } } } // updates and non-updates are handled separately. Each category would keep // up to Conf.MaxUpdatesPerJob if len(updateList) > s.Conf.MaxUpdatesPerJob { sort.Sort(sort.Reverse(SortedUpdateList(updateList))) for _, u := range updateList[s.Conf.MaxUpdatesPerJob:] { // delete the old job and task configurations, and then the update s.DeleteUpdate(ctx, u.updateID, jobID, u.jobConfigVersion) } } if len(nonUpdateList) > s.Conf.MaxUpdatesPerJob { sort.Sort(sort.Reverse(SortedUpdateList(nonUpdateList))) for _, u := range nonUpdateList[s.Conf.MaxUpdatesPerJob:] { // delete the old job and task configurations, and then the update s.DeleteUpdate(ctx, u.updateID, jobID, u.jobConfigVersion) } } return nil } // DeleteUpdate deletes the update from the update_info table and deletes all // job and task configurations created for the update. func (s *Store) DeleteUpdate( ctx context.Context, updateID *peloton.UpdateID, jobID *peloton.JobID, jobConfigVersion uint64) error { // first delete the task and job configurations created for this update if err := s.deleteJobConfigVersion(ctx, jobID, jobConfigVersion); err != nil { return err } // next clean up the update from the update_info table return s.deleteSingleUpdate(ctx, updateID) } // GetUpdate fetches the job update stored in the DB. func (s *Store) GetUpdate(ctx context.Context, id *peloton.UpdateID) ( *models.UpdateModel, error) { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*").From(updatesTable). Where(qb.Eq{"update_id": id.GetValue()}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("update_id", id.GetValue()). Info("failed to get job update") s.metrics.UpdateMetrics.UpdateGetFail.Inc(1) return nil, err } for _, value := range allResults { var record UpdateRecord if err = FillObject(value, &record, reflect.TypeOf(record)); err != nil { s.metrics.UpdateMetrics.UpdateGetFail.Inc(1) return nil, err } updateConfig, err := record.GetUpdateConfig() if err != nil { s.metrics.UpdateMetrics.UpdateGetFail.Inc(1) return nil, err } updateInfo := &models.UpdateModel{ UpdateID: id, UpdateConfig: updateConfig, JobID: &peloton.JobID{Value: record.JobID.String()}, JobConfigVersion: uint64(record.JobConfigVersion), PrevJobConfigVersion: uint64(record.PrevJobConfigVersion), State: update.State(update.State_value[record.State]), PrevState: update.State(update.State_value[record.PrevState]), Type: models.WorkflowType(models.WorkflowType_value[record.Type]), InstancesTotal: uint32(record.InstancesTotal), InstancesAdded: record.GetInstancesAdded(), InstancesUpdated: record.GetInstancesUpdated(), InstancesRemoved: record.GetInstancesRemoved(), InstancesFailed: uint32(record.InstancesFailed), InstancesDone: uint32(record.InstancesDone), InstancesCurrent: record.GetProcessingInstances(), CreationTime: record.CreationTime.Format(time.RFC3339Nano), UpdateTime: record.UpdateTime.Format(time.RFC3339Nano), OpaqueData: &peloton.OpaqueData{Data: record.OpaqueData}, CompletionTime: record.CompletionTime, } s.metrics.UpdateMetrics.UpdateGet.Inc(1) return updateInfo, nil } s.metrics.UpdateMetrics.UpdateGetFail.Inc(1) return nil, yarpcerrors.NotFoundErrorf("update not found") } // deleteSingleUpdate deletes a given update from following tables // - pod_workflow_events table for all instances included in the update // - job_update_events table for update state change events // - update_info table func (s *Store) deleteSingleUpdate(ctx context.Context, id *peloton.UpdateID) error { update, err := s.GetUpdate(ctx, id) if err != nil { s.metrics.UpdateMetrics.UpdateDeleteFail.Inc(1) log.WithFields(log.Fields{ "update_id": id.GetValue(), }).WithError(err).Info("failed to get update for deleting workflow events") return err } instances := append(update.GetInstancesUpdated(), update.GetInstancesAdded()...) instances = append(instances, update.GetInstancesRemoved()...) for _, instance := range instances { if err := s.deleteWorkflowEvents(ctx, id, instance); err != nil { log.WithFields(log.Fields{ "update_id": id.GetValue(), "instance_id": instance, }).WithError(err).Info("failed to delete workflow events") s.metrics.UpdateMetrics.UpdateDeleteFail.Inc(1) return err } } if err := s.deleteJobUpdateEvents(ctx, id); err != nil { log.WithFields(log.Fields{ "update_id": id.GetValue(), }).WithError(err).Info("failed to delete job update events") s.metrics.UpdateMetrics.UpdateDeleteFail.Inc(1) return err } queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Delete(updatesTable).Where(qb.Eq{ "update_id": id.GetValue()}) if err := s.applyStatement(ctx, stmt, id.GetValue()); err != nil { log.WithError(err). WithField("update_id", id.GetValue()). Info("failed to delete the update") s.metrics.UpdateMetrics.UpdateDeleteFail.Inc(1) return err } s.metrics.UpdateMetrics.UpdateDelete.Inc(1) return nil } // deleteJobConfigVersion deletes the job and task configurations for a given // job identifier and a configuration version. func (s *Store) deleteJobConfigVersion( ctx context.Context, jobID *peloton.JobID, version uint64) error { queryBuilder := s.DataStore.NewQuery() // next delete the job configuration stmt := queryBuilder.Delete(jobConfigTable).Where(qb.Eq{ "job_id": jobID.GetValue(), "version": version}) err := s.applyStatement(ctx, stmt, jobID.GetValue()) if err != nil { log.WithError(err). WithField("job_id", jobID.GetValue()). WithField("version", version). Info("failed to delete the job configuration") } return err } // WriteUpdateProgress writes the progress of the job update to the DB. // The inputs to this function are the only mutable fields in update. func (s *Store) WriteUpdateProgress( ctx context.Context, updateInfo *models.UpdateModel) error { updateTime, err := time.Parse(time.RFC3339Nano, updateInfo.GetUpdateTime()) if err != nil { return errors.Wrap(yarpcerrors.InvalidArgumentErrorf(err.Error()), "fail to parse updateTime") } queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Update(updatesTable). Set("update_time", updateTime.UTC()) // updateInfo can either have updateTime set only, or // set State, PreState and other fields altogether. // For now, there is no better way to differentiate which fields // are set especially for slice fields. if updateInfo.GetState() != update.State_INVALID { stmt = stmt.Set("update_state", updateInfo.GetState().String()). Set("update_prev_state", updateInfo.GetPrevState().String()). Set("instances_done", updateInfo.GetInstancesDone()). Set("instances_failed", updateInfo.GetInstancesFailed()). Set("instances_current", updateInfo.GetInstancesCurrent()) } if updateInfo.GetOpaqueData() != nil { stmt = stmt.Set("opaque_data", updateInfo.GetOpaqueData().GetData()) } if len(updateInfo.GetCompletionTime()) != 0 { stmt = stmt.Set("completion_time", updateInfo.GetCompletionTime()) } stmt = stmt.Where(qb.Eq{"update_id": updateInfo.GetUpdateID().GetValue()}) if err := s.applyStatement( ctx, stmt, updateInfo.GetUpdateID().GetValue()); err != nil { log.WithError(err). WithFields(log.Fields{ "update_id": updateInfo.GetUpdateID().GetValue(), "update_state": updateInfo.GetState().String(), "update_prev_state": updateInfo.GetPrevState().String(), "update_instances_done": updateInfo.GetInstancesDone(), "update_instances_failed": updateInfo.GetInstancesFailed(), }).Info("modify update in DB failed") s.metrics.UpdateMetrics.UpdateWriteProgressFail.Inc(1) return err } s.metrics.UpdateMetrics.UpdateWriteProgress.Inc(1) return nil } // ModifyUpdate modify the progress of an update, // instances to update/remove/add and the job config version func (s *Store) ModifyUpdate( ctx context.Context, updateInfo *models.UpdateModel) error { updateTime, err := time.Parse(time.RFC3339Nano, updateInfo.GetUpdateTime()) if err != nil { return errors.Wrap(yarpcerrors.InvalidArgumentErrorf(err.Error()), "fail to parse updateTime") } queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Update(updatesTable). Set("update_state", updateInfo.GetState().String()). Set("update_prev_state", updateInfo.GetPrevState().String()). Set("instances_done", updateInfo.GetInstancesDone()). Set("instances_failed", updateInfo.GetInstancesFailed()). Set("instances_current", updateInfo.GetInstancesCurrent()). Set("instances_added", updateInfo.GetInstancesAdded()). Set("instances_updated", updateInfo.GetInstancesUpdated()). Set("instances_removed", updateInfo.GetInstancesRemoved()). Set("instances_total", updateInfo.GetInstancesTotal()). Set("job_config_version", updateInfo.GetJobConfigVersion()). Set("job_config_prev_version", updateInfo.GetPrevJobConfigVersion()). Set("update_time", updateTime.UTC()) if updateInfo.GetOpaqueData() != nil { stmt = stmt.Set("opaque_data", updateInfo.GetOpaqueData().GetData()) } stmt = stmt.Where(qb.Eq{"update_id": updateInfo.GetUpdateID().GetValue()}) if err := s.applyStatement( ctx, stmt, updateInfo.GetUpdateID().GetValue()); err != nil { log.WithError(err). WithFields(log.Fields{ "update_id": updateInfo.GetUpdateID().GetValue(), "update_state": updateInfo.GetState().String(), "update_prev_state": updateInfo.GetPrevState().String(), "update_instances_done": updateInfo.GetInstancesDone(), "update_instances_failed": updateInfo.GetInstancesFailed(), }).Info("write update progress in DB failed") s.metrics.UpdateMetrics.UpdateWriteProgressFail.Inc(1) return err } s.metrics.UpdateMetrics.UpdateWriteProgress.Inc(1) return nil } // GetUpdateProgress fetches the job update progress, which includes the // instances already updated, instances being updated and the current // state of the update. func (s *Store) GetUpdateProgress(ctx context.Context, id *peloton.UpdateID) ( *models.UpdateModel, error) { queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("*").From(updatesTable). Where(qb.Eq{"update_id": id.GetValue()}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("update_id", id.GetValue()). Info("failed to get job update") s.metrics.UpdateMetrics.UpdateGetProgessFail.Inc(1) return nil, err } for _, value := range allResults { var record UpdateRecord if err = FillObject(value, &record, reflect.TypeOf(record)); err != nil { s.metrics.UpdateMetrics.UpdateGetProgessFail.Inc(1) return nil, err } updateInfo := &models.UpdateModel{ UpdateID: id, State: update.State(update.State_value[record.State]), PrevState: update.State(update.State_value[record.PrevState]), InstancesTotal: uint32(record.InstancesTotal), InstancesDone: uint32(record.InstancesDone), InstancesFailed: uint32(record.InstancesFailed), InstancesCurrent: record.GetProcessingInstances(), UpdateTime: record.UpdateTime.Format(time.RFC3339Nano), CompletionTime: record.CompletionTime, } s.metrics.UpdateMetrics.UpdateGetProgess.Inc(1) return updateInfo, nil } s.metrics.UpdateMetrics.UpdateGetProgessFail.Inc(1) return nil, yarpcerrors.NotFoundErrorf("update not found") } // GetUpdatesForJob returns the list of job updates created for a given job. func (s *Store) GetUpdatesForJob( ctx context.Context, jobID string, ) ([]*peloton.UpdateID, error) { var updateIDs []*peloton.UpdateID var updateList []*SortUpdateInfoTS queryBuilder := s.DataStore.NewQuery() stmt := queryBuilder.Select("update_id", "job_id", "creation_time"). From(updatesByJobView). Where(qb.Eq{"job_id": jobID}) allResults, err := s.executeRead(ctx, stmt) if err != nil { log.WithError(err). WithField("job_id", jobID). Info("failed to fetch updates for a given job") s.metrics.UpdateMetrics.UpdateGetForJobFail.Inc(1) return nil, err } for _, value := range allResults { var record UpdateViewRecord err := FillObject(value, &record, reflect.TypeOf(record)) if err != nil { log.WithError(err). WithField("job_id", jobID). Info("failed to fill update record for the job") s.metrics.UpdateMetrics.UpdateGetForJobFail.Inc(1) return nil, err } // sort as per the job configuration version updateInfo := &SortUpdateInfoTS{ updateID: &peloton.UpdateID{Value: record.UpdateID.String()}, createTime: record.CreationTime, } updateList = append(updateList, updateInfo) } sort.Sort(sort.Reverse(SortedUpdateListTS(updateList))) for _, update := range updateList { updateIDs = append(updateIDs, update.updateID) } s.metrics.UpdateMetrics.UpdateGetForJob.Inc(1) return updateIDs, nil } func parseTime(v string) time.Time { r, err := time.Parse(time.RFC3339Nano, v) if err != nil { return time.Time{} } return r } // If a BATCH job is in active state for more than a threshold of time, it is // possible that the lucene index is out of sync with the job_index table so we // can read job summary from job_index table for such jobs. This function goes // through a list of job summary and looks for such stale jobs. If the query is // for only active jobs, the stale jobs are skipped from the job summary list // and a new list is returned. func (s *Store) reconcileStaleBatchJobsFromJobSummaryList( ctx context.Context, summaryList []*job.JobSummary, queryTerminalStates bool) ([]*job.JobSummary, error) { newSummaryList := []*job.JobSummary{} var err error for _, summary := range summaryList { if summary.GetType() == job.JobType_BATCH && !util.IsPelotonJobStateTerminal(summary.GetRuntime().GetState()) && time.Since( parseTime(summary.GetRuntime().GetCreationTime()), ) > common.StaleJobStateDurationThreshold { // get job summary from DB table instead of index summary, err = s.getJobSummaryFromIndex(ctx, summary.Id) if err != nil { return nil, err } if util.IsPelotonJobStateTerminal( summary.GetRuntime().GetState()) && !queryTerminalStates { // Since now the job shows up as terminal, we can conclude // that lucene index entry for this job is stale. Because // the query is for getting active jobs only, we can skip // this job entry. continue } } newSummaryList = append(newSummaryList, summary) } return newSummaryList, nil } func (s *Store) getJobSummaryFromResultMap( ctx context.Context, allResults []map[string]interface{}, ) ([]*job.JobSummary, error) { var summaryResults []*job.JobSummary for _, value := range allResults { summary := &job.JobSummary{} id, ok := value["job_id"].(qb.UUID) if !ok { return nil, yarpcerrors.InternalErrorf( "invalid job_id %v", value["job_id"]) } summary.Id = &peloton.JobID{Value: id.String()} if name, ok := value["name"].(string); ok { summary.Name = name } if runtimeInfo, ok := value["runtime_info"].(string); ok { err := json.Unmarshal([]byte(runtimeInfo), &summary.Runtime) if err != nil { log.WithError(err). WithField("runtime_info", runtimeInfo). Info("failed to unmarshal runtime info") } } if owningTeam, ok := value["owner"].(string); ok { summary.Owner = owningTeam summary.OwningTeam = owningTeam } if instcnt, ok := value["instance_count"].(int); ok { summary.InstanceCount = uint32(instcnt) } if jobType, ok := value["job_type"].(int); ok { summary.Type = job.JobType(jobType) } if respoolIDStr, ok := value["respool_id"].(string); ok { summary.RespoolID = &peloton.ResourcePoolID{Value: respoolIDStr} } if labelBuffer, ok := value["labels"].(string); ok { err := json.Unmarshal([]byte(labelBuffer), &summary.Labels) if err != nil { log.WithError(err). WithField("labels", labelBuffer). Info("failed to unmarshal labels") } } summaryResults = append(summaryResults, summary) } return summaryResults, nil } // SortedTaskInfoList makes TaskInfo implement sortable interface type SortedTaskInfoList []*task.TaskInfo func (a SortedTaskInfoList) Len() int { return len(a) } func (a SortedTaskInfoList) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a SortedTaskInfoList) Less(i, j int) bool { return a[i].InstanceId < a[j].InstanceId } // SortUpdateInfo is the structure used by the sortable interface for // updates, where the sorting will be done according to the job configuration // version for a given job. type SortUpdateInfo struct { updateID *peloton.UpdateID jobConfigVersion uint64 } // SortedUpdateList implements a sortable interface for updates according // to the job configuration versions for a given job. type SortedUpdateList []*SortUpdateInfo func (u SortedUpdateList) Len() int { return len(u) } func (u SortedUpdateList) Swap(i, j int) { u[i], u[j] = u[j], u[i] } func (u SortedUpdateList) Less(i, j int) bool { return u[i].jobConfigVersion < u[j].jobConfigVersion } // SortUpdateInfoTS is the structure used by the sortable interface for // updates, where the sorting will be done according to the update create // timestamp for a given job. type SortUpdateInfoTS struct { updateID *peloton.UpdateID createTime time.Time } // SortedUpdateListTS implements a sortable interface for updates according // to the create time for a given job. type SortedUpdateListTS []*SortUpdateInfoTS func (u SortedUpdateListTS) Len() int { return len(u) } func (u SortedUpdateListTS) Swap(i, j int) { u[i], u[j] = u[j], u[i] } func (u SortedUpdateListTS) Less(i, j int) bool { return u[i].createTime.UnixNano() < u[j].createTime.UnixNano() }