pkg/jobmgr/updatesvc/handler.go (379 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package updatesvc
import (
"context"
"time"
"github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
"github.com/uber/peloton/.gen/peloton/api/v0/update"
"github.com/uber/peloton/.gen/peloton/api/v0/update/svc"
"github.com/uber/peloton/.gen/peloton/private/models"
"github.com/uber/peloton/pkg/common"
versionutil "github.com/uber/peloton/pkg/common/util/entityversion"
"github.com/uber/peloton/pkg/jobmgr/cached"
"github.com/uber/peloton/pkg/jobmgr/goalstate"
jobutil "github.com/uber/peloton/pkg/jobmgr/util/job"
"github.com/uber/peloton/pkg/storage"
ormobjects "github.com/uber/peloton/pkg/storage/objects"
"github.com/pborman/uuid"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
"go.uber.org/yarpc/yarpcerrors"
)
// InitServiceHandler initalizes the update service.
func InitServiceHandler(
d *yarpc.Dispatcher,
parent tally.Scope,
ormStore *ormobjects.Store,
updateStore storage.UpdateStore,
goalStateDriver goalstate.Driver,
jobFactory cached.JobFactory,
) {
handler := &serviceHandler{
jobConfigOps: ormobjects.NewJobConfigOps(ormStore),
jobRuntimeOps: ormobjects.NewJobRuntimeOps(ormStore),
updateStore: updateStore,
goalStateDriver: goalStateDriver,
jobFactory: jobFactory,
metrics: NewMetrics(parent.SubScope("jobmgr").SubScope("update")),
}
d.Register(svc.BuildUpdateServiceYARPCProcedures(handler))
}
// serviceHandler implements peloton.api.update.svc
type serviceHandler struct {
jobConfigOps ormobjects.JobConfigOps
jobRuntimeOps ormobjects.JobRuntimeOps
updateStore storage.UpdateStore
goalStateDriver goalstate.Driver
jobFactory cached.JobFactory
metrics *Metrics
}
// validateJobConfigUpdate validates that the job configuration
// update is valid.
func (h *serviceHandler) validateJobConfigUpdate(
ctx context.Context,
jobID *peloton.JobID,
prevJobConfig *job.JobConfig,
newJobConfig *job.JobConfig) error {
// Ensure that the changelog is present
if newJobConfig.GetChangeLog() == nil {
return yarpcerrors.InvalidArgumentErrorf(
"missing changelog in job configuration")
}
// job type is immutable
if newJobConfig.GetType() != prevJobConfig.GetType() {
return yarpcerrors.InvalidArgumentErrorf("job type is immutable")
}
// resource pool identifier is immutable
if newJobConfig.GetRespoolID().GetValue() !=
prevJobConfig.GetRespoolID().GetValue() {
return yarpcerrors.InvalidArgumentErrorf(
"resource pool identifier is immutable")
}
return nil
}
// validateJobRuntime validates that the job state allows updating it
func (h *serviceHandler) validateJobRuntime(jobRuntime *job.RuntimeInfo) error {
// cannot update a job which is still being created
if jobRuntime.GetState() == job.JobState_INITIALIZED {
return yarpcerrors.UnavailableErrorf(
"cannot update partially created job")
}
return nil
}
// Create creates an update for a given job ID.
func (h *serviceHandler) CreateUpdate(
ctx context.Context,
req *svc.CreateUpdateRequest) (*svc.CreateUpdateResponse, error) {
h.metrics.UpdateAPICreate.Inc(1)
jobID := req.GetJobId()
pelotonJobID := &peloton.JobID{Value: jobID.GetValue()}
jobUUID := uuid.Parse(jobID.GetValue())
if jobUUID == nil {
// job uuid is not a uuid
h.metrics.UpdateCreateFail.Inc(1)
return nil, yarpcerrors.InvalidArgumentErrorf(
"JobID must be of UUID format")
}
if req.GetUpdateConfig().GetInPlace() {
return nil, yarpcerrors.UnimplementedErrorf("in-place update is not supported yet")
}
// Validate that the job does exist
jobRuntime, err := h.jobRuntimeOps.Get(ctx, pelotonJobID)
if err != nil {
h.metrics.UpdateCreateFail.Inc(1)
return nil, yarpcerrors.NotFoundErrorf("job not found")
}
// validate the job is in a state where it can be updated
if err := h.validateJobRuntime(jobRuntime); err != nil {
h.metrics.UpdateCreateFail.Inc(1)
return nil, err
}
jobConfig := req.GetJobConfig()
// Get previous job configuration
prevJobConfig, prevConfigAddOn, err := h.jobConfigOps.Get(
ctx,
pelotonJobID,
jobRuntime.GetConfigurationVersion(),
)
if err != nil {
h.metrics.UpdateCreateFail.Inc(1)
return nil, err
}
// check that job type is service
if prevJobConfig.GetType() != job.JobType_SERVICE {
h.metrics.UpdateCreateFail.Inc(1)
return nil, yarpcerrors.InvalidArgumentErrorf(
"job must be of type service")
}
// validate the new configuration
if err = h.validateJobConfigUpdate(
ctx, jobID, prevJobConfig, jobConfig); err != nil {
h.metrics.UpdateCreateFail.Inc(1)
return nil, err
}
var respoolPath string
for _, label := range prevConfigAddOn.GetSystemLabels() {
if label.GetKey() == common.SystemLabelResourcePool {
respoolPath = label.GetValue()
}
}
configAddOn := &models.ConfigAddOn{
SystemLabels: jobutil.ConstructSystemLabels(jobConfig, respoolPath),
}
// add this new update to cache and DB
cachedJob := h.jobFactory.AddJob(jobID)
updateID, _, err := cachedJob.CreateWorkflow(
ctx,
models.WorkflowType_UPDATE,
req.GetUpdateConfig(),
versionutil.GetJobEntityVersion(
jobRuntime.GetConfigurationVersion(),
jobRuntime.GetDesiredStateVersion(),
jobRuntime.GetWorkflowVersion()),
cached.WithConfig(jobConfig, prevJobConfig, configAddOn, nil),
cached.WithOpaqueData(req.GetOpaqueData()),
)
if err != nil {
// In case of error, since it is not clear if job runtime was
// persisted with the update ID or not, enqueue the update to
// the goal state. If the update ID got persisted, update should
// start running, else, it should be aborted. Enqueueing it into
// the goal state will ensure both. In case the update was not
// persisted, clear the cache as well so that it is reloaded
// from DB and cleaned up.
h.metrics.UpdateCreateFail.Inc(1)
}
// Add update to goal state engine to start it
if len(updateID.GetValue()) > 0 {
h.goalStateDriver.EnqueueUpdate(jobID, updateID, time.Now())
}
return &svc.CreateUpdateResponse{
UpdateID: updateID,
}, err
}
func (h *serviceHandler) GetUpdate(ctx context.Context, req *svc.GetUpdateRequest) (*svc.GetUpdateResponse, error) {
h.metrics.UpdateAPIGet.Inc(1)
updateID := req.GetUpdateId()
if updateID == nil {
h.metrics.UpdateGetFail.Inc(1)
return nil, yarpcerrors.InvalidArgumentErrorf("no update ID provided")
}
if req.GetStatusOnly() {
// get only the status
updateModel, err := h.updateStore.GetUpdateProgress(ctx, updateID)
if err != nil {
h.metrics.UpdateGetFail.Inc(1)
return nil, err
}
updateInfo := &update.UpdateInfo{
UpdateId: updateID,
Status: &update.UpdateStatus{
NumTasksDone: updateModel.GetInstancesDone(),
NumTasksRemaining: updateModel.GetInstancesTotal() -
updateModel.GetInstancesDone() -
updateModel.GetInstancesFailed(),
NumTasksFailed: updateModel.GetInstancesFailed(),
State: updateModel.GetState(),
},
}
h.metrics.UpdateGet.Inc(1)
return &svc.GetUpdateResponse{
UpdateInfo: updateInfo,
}, nil
}
updateModel, err := h.updateStore.GetUpdate(ctx, updateID)
if err != nil {
h.metrics.UpdateGetFail.Inc(1)
return nil, err
}
updateInfo := &update.UpdateInfo{
UpdateId: updateID,
Config: updateModel.GetUpdateConfig(),
JobId: updateModel.GetJobID(),
ConfigVersion: updateModel.GetJobConfigVersion(),
PrevConfigVersion: updateModel.GetPrevJobConfigVersion(),
OpaqueData: updateModel.GetOpaqueData(),
Status: &update.UpdateStatus{
NumTasksDone: updateModel.GetInstancesDone(),
NumTasksRemaining: updateModel.GetInstancesTotal() -
updateModel.GetInstancesDone() -
updateModel.GetInstancesFailed(),
NumTasksFailed: updateModel.GetInstancesFailed(),
State: updateModel.GetState(),
},
}
h.metrics.UpdateGet.Inc(1)
return &svc.GetUpdateResponse{
UpdateInfo: updateInfo,
}, nil
}
func (h *serviceHandler) GetUpdateCache(ctx context.Context,
req *svc.GetUpdateCacheRequest) (*svc.GetUpdateCacheResponse, error) {
h.metrics.UpdateAPIGetCache.Inc(1)
cachedJob, err := h.getCachedJobWithUpdateID(ctx, req.GetUpdateId())
if err != nil {
return nil, err
}
workflow := cachedJob.GetWorkflow(req.GetUpdateId())
if workflow == nil {
return nil, yarpcerrors.NotFoundErrorf("update not found")
}
h.metrics.UpdateGetCache.Inc(1)
return &svc.GetUpdateCacheResponse{
JobId: workflow.JobID(),
State: workflow.GetState().State,
InstancesTotal: workflow.GetGoalState().Instances,
InstancesDone: workflow.GetInstancesDone(),
InstancesCurrent: workflow.GetInstancesCurrent(),
InstancesAdded: workflow.GetInstancesAdded(),
InstancesUpdated: workflow.GetInstancesUpdated(),
InstancesFailed: workflow.GetInstancesFailed(),
}, nil
}
func (h *serviceHandler) PauseUpdate(
ctx context.Context,
req *svc.PauseUpdateRequest,
) (*svc.PauseUpdateResponse, error) {
var err error
h.metrics.UpdateAPIPause.Inc(1)
cachedJob, err := h.getCachedJobWithUpdateID(ctx, req.GetUpdateId())
if err != nil {
h.metrics.UpdatePauseFail.Inc(1)
return nil, err
}
runtime, err := cachedJob.GetRuntime(ctx)
if err != nil {
h.metrics.UpdatePauseFail.Inc(1)
return nil, err
}
if _, _, err = cachedJob.PauseWorkflow(
ctx,
versionutil.GetJobEntityVersion(
runtime.GetConfigurationVersion(),
runtime.GetDesiredStateVersion(),
runtime.GetWorkflowVersion()),
cached.WithOpaqueData(req.GetOpaqueData()),
); err != nil {
// In case of error, since it is not clear if job runtime was
// updated or not, enqueue the update to the goal state.
h.metrics.UpdatePauseFail.Inc(1)
} else {
h.metrics.UpdatePause.Inc(1)
}
h.goalStateDriver.EnqueueUpdate(cachedJob.ID(), req.GetUpdateId(), time.Now())
return &svc.PauseUpdateResponse{}, err
}
func (h *serviceHandler) ResumeUpdate(
ctx context.Context,
req *svc.ResumeUpdateRequest,
) (*svc.ResumeUpdateResponse, error) {
var err error
h.metrics.UpdateAPIResume.Inc(1)
cachedJob, err := h.getCachedJobWithUpdateID(ctx, req.GetUpdateId())
if err != nil {
h.metrics.UpdateResumeFail.Inc(1)
return nil, err
}
runtime, err := cachedJob.GetRuntime(ctx)
if err != nil {
h.metrics.UpdateResumeFail.Inc(1)
return nil, err
}
if _, _, err = cachedJob.ResumeWorkflow(
ctx,
versionutil.GetJobEntityVersion(
runtime.GetConfigurationVersion(),
runtime.GetDesiredStateVersion(),
runtime.GetWorkflowVersion()),
cached.WithOpaqueData(req.GetOpaqueData()),
); err != nil {
// In case of error, since it is not clear if job runtime was
// updated or not, enqueue the update to the goal state.
h.metrics.UpdateResumeFail.Inc(1)
} else {
h.metrics.UpdateResume.Inc(1)
}
h.goalStateDriver.EnqueueUpdate(cachedJob.ID(), req.GetUpdateId(), time.Now())
return &svc.ResumeUpdateResponse{}, err
}
func (h *serviceHandler) ListUpdates(ctx context.Context,
req *svc.ListUpdatesRequest) (*svc.ListUpdatesResponse, error) {
var updates []*update.UpdateInfo
h.metrics.UpdateAPIList.Inc(1)
jobID := req.GetJobID()
if jobID == nil {
h.metrics.UpdateListFail.Inc(1)
return nil, yarpcerrors.InvalidArgumentErrorf("no job ID provided")
}
updateIDs, err := h.updateStore.GetUpdatesForJob(ctx, jobID.GetValue())
if err != nil {
h.metrics.UpdateListFail.Inc(1)
return nil, err
}
for _, updateID := range updateIDs {
updateModel, err := h.updateStore.GetUpdate(ctx, updateID)
if err != nil {
h.metrics.UpdateListFail.Inc(1)
return nil, err
}
updateInfo := &update.UpdateInfo{
UpdateId: updateID,
Config: updateModel.GetUpdateConfig(),
JobId: updateModel.GetJobID(),
ConfigVersion: updateModel.GetJobConfigVersion(),
PrevConfigVersion: updateModel.GetPrevJobConfigVersion(),
Status: &update.UpdateStatus{
NumTasksDone: updateModel.GetInstancesDone(),
NumTasksRemaining: updateModel.GetInstancesTotal() -
updateModel.GetInstancesDone() -
updateModel.GetInstancesFailed(),
NumTasksFailed: updateModel.GetInstancesFailed(),
State: updateModel.GetState(),
},
}
updates = append(updates, updateInfo)
}
h.metrics.UpdateList.Inc(1)
return &svc.ListUpdatesResponse{
UpdateInfo: updates,
}, nil
}
func (h *serviceHandler) AbortUpdate(ctx context.Context,
req *svc.AbortUpdateRequest) (*svc.AbortUpdateResponse, error) {
h.metrics.UpdateAPIAbort.Inc(1)
cachedJob, err := h.getCachedJobWithUpdateID(ctx, req.GetUpdateId())
// TODO: what if the workflow in job is not what is intended to be aborted
if err != nil {
h.metrics.UpdateAbortFail.Inc(1)
return nil, err
}
runtime, err := cachedJob.GetRuntime(ctx)
if err != nil {
h.metrics.UpdatePauseFail.Inc(1)
return nil, err
}
if _, _, err = cachedJob.AbortWorkflow(
ctx,
versionutil.GetJobEntityVersion(
runtime.GetConfigurationVersion(),
runtime.GetDesiredStateVersion(),
runtime.GetWorkflowVersion()),
cached.WithOpaqueData(req.GetOpaqueData()),
); err != nil {
h.metrics.UpdateAbortFail.Inc(1)
} else {
h.metrics.UpdateAbort.Inc(1)
}
h.goalStateDriver.EnqueueUpdate(cachedJob.ID(), req.GetUpdateId(), time.Now())
return &svc.AbortUpdateResponse{}, err
}
func (h *serviceHandler) RollbackUpdate(ctx context.Context,
req *svc.RollbackUpdateRequest) (*svc.RollbackUpdateResponse, error) {
return nil, yarpcerrors.UnimplementedErrorf(
"UpdateService.RollbackUpdate is not implemented")
}
func (h *serviceHandler) getCachedJobWithUpdateID(
ctx context.Context,
updateID *peloton.UpdateID,
) (cached.Job, error) {
if len(updateID.GetValue()) == 0 {
return nil, yarpcerrors.InvalidArgumentErrorf("no update ID provided")
}
updateModel, err := h.updateStore.GetUpdate(ctx, updateID)
if err != nil {
return nil, err
}
return h.jobFactory.AddJob(updateModel.GetJobID()), nil
}
// NewTestServiceHandler returns an empty new ServiceHandler ptr for testing.
func NewTestServiceHandler() *serviceHandler {
return &serviceHandler{}
}