oracle/pkg/database/lib/lro/job.go (178 lines of code) (raw):
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package lro
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
log "k8s.io/klog/v2"
"github.com/GoogleCloudPlatform/elcarro-oracle-operator/oracle/pkg/database/lib/detach"
)
const (
// TaskIDMetadataTag is the tag for the task ID in gRPC metadata.
// CreateAndRunLROJobWithContext reads it to pick the job ID.
TaskIDMetadataTag = "taskID"
// taskTimeOutMetadataTag is the gRPC metadata key that carries the task
// timeout. taskTimeout parses its first value as a number of seconds.
taskTimeOutMetadataTag = "taskTimeoutSec"
// JobStartIndicator is a log line we use to identify when the job has been started.
// It is a format string whose single argument is the job ID; addAndStartJob
// logs it right before starting the job.
JobStartIndicator = "CreateAndRunLROJobWithID: Create and run LRO job with id %v"
)
// Completion statuses passed to Server.EndOperation when a job ends.
const (
// completionStatusOK is reported when the job finished without an error.
completionStatusOK = "OK"
// completionStatusError is reported when the job returned an error, its
// response could not be marshalled, or it panicked.
completionStatusError = "Error"
)
// Job represents a long running operation and its metadata.
// Jobs are built by CreateAndRunLROJobWithID and executed asynchronously by
// start; the outcome is stored in resp and err once the execution finishes.
type Job struct {
// id is the job identifier returned by ID.
id string
// name is the job name returned by Name.
name string
// resp is the response produced by call, packed as an Any. It is written by
// start after call returns.
resp *anypb.Any
// err is the failure of the job: the error returned by call, a response
// marshalling error, or a recovered panic.
err error
// lro is the server that is notified through EndOperation when the job ends.
lro *Server
// call is the operation body that start executes.
call func(ctx context.Context) (proto.Message, error)
// task is the handle of the detached execution. It is nil until start has run.
task *detach.Task
}
// Cancel cancels the job by cancelling its underlying detached task.
// It logs the cancellation first and always returns nil.
// The job must have been started: task is only set by start.
func (j *Job) Cancel() error {
log.Infof("Cancel: job [%s] is cancelled", j.id)
j.task.Cancel()
return nil
}
// Delete reports whether the job may be deleted.
// It does not remove any state itself: a finished job yields nil, while a job
// that is still running yields a gRPC Aborted error.
func (j *Job) Delete() error {
	if !j.IsDone() {
		return status.Errorf(codes.Aborted, "Can't delete job with ID %q while it's still running", j.id)
	}
	return nil
}
// Status reports the current state of the job.
// The first result tells whether the job is done. For a finished job either
// the error (second result nil) or the response (third result nil) is
// returned; for a running job both are nil.
func (j *Job) Status() (bool, *anypb.Any, error) {
	switch {
	case !j.IsDone():
		// Still running: nothing to report yet.
		return false, nil, nil
	case j.err != nil:
		log.Errorf("Job %v failed with error=%v", j.id, j.err)
		return true, nil, j.err
	default:
		return true, j.resp, nil
	}
}
// Wait blocks until the job finishes or the given timeout elapses.
// It returns nil when the job finished in time and a gRPC DeadlineExceeded
// error otherwise. A timeout does not cancel the job; it only stops waiting.
func (j *Job) Wait(timeout time.Duration) error {
	// Use an explicit timer instead of time.After so the timer is released as
	// soon as the job finishes rather than lingering until the timeout fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-j.task.Finished():
		log.Infof("Job with ID %q has finished", j.id)
		return nil
	case <-timer.C:
		log.Infof("Job with ID %q has timed out", j.id)
		return status.Errorf(codes.DeadlineExceeded, "LRO job with ID %q didn't complete in time", j.id)
	}
}
// catchPanic wraps f so that a panic raised while f runs does not bring the
// program down. A recovered panic is converted into an error, logged, stored
// on the job, and the operation is reported to the LRO server as failed.
func catchPanic(j *Job, f func(context.Context)) func(context.Context) {
	return func(ctx context.Context) {
		defer func() {
			recovered := recover()
			if recovered == nil {
				// f returned normally; it has already reported its own outcome.
				return
			}
			panicErr := fmt.Errorf("caught panic in agent execution. Panic Message: %v", recovered)
			log.Error(panicErr)
			j.err = panicErr
			j.lro.EndOperation(j.id, completionStatusError)
		}()
		f(ctx)
	}
}
// taskTimeout extracts the task timeout from the incoming gRPC metadata of ctx.
// The value stored under taskTimeOutMetadataTag is interpreted as a number of
// seconds. An error is returned when ctx carries no incoming metadata, when
// the tag is absent or empty, or when the value is not a valid duration.
// If several values are present, the first one is used and a warning is logged.
func taskTimeout(ctx context.Context) (time.Duration, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return 0, fmt.Errorf("context has no timeout info")
	}
	data := md.Get(taskTimeOutMetadataTag)
	if len(data) == 0 || data[0] == "" {
		return 0, fmt.Errorf("fails to parse out the timeout info")
	}
	if len(data) > 1 {
		log.Warningf("taskTimeout: More than one task timeout in the metadata %v", data)
	}
	// The metadata value is a bare number of seconds; append the unit so that
	// time.ParseDuration accepts it.
	return time.ParseDuration(data[0] + "s")
}
// start launches the job asynchronously through detach.Go and stores the
// resulting task handle on the job.
//
// The detached function runs call (bounded by the timeout found in ctx's
// metadata, if any), records its response and error on the job, and reports
// the outcome to the LRO server. Panics are handled by catchPanic.
func (j *Job) start(ctx context.Context) {
	log.Infof("Start job with ID %s", j.id)

	// The error is deliberately ignored: without a usable timeout the duration
	// is zero and the job simply runs without a deadline.
	limit, _ := taskTimeout(ctx)

	run := func(jobCtx context.Context) {
		if limit > 0 {
			var cancel context.CancelFunc
			jobCtx, cancel = context.WithTimeout(jobCtx, limit)
			defer cancel()
		}

		var out proto.Message
		out, j.err = j.call(jobCtx)

		// Store the response as an Any, packing it first when necessary.
		switch msg := out.(type) {
		case nil:
			j.resp = nil
		case *anypb.Any:
			j.resp = msg
		default:
			packed := &anypb.Any{}
			if err := packed.MarshalFrom(msg); err != nil {
				j.err = status.Errorf(codes.Internal, "Failed to marshal response to any: %v", err)
			}
			j.resp = packed
		}

		if j.err != nil {
			j.lro.EndOperation(j.id, completionStatusError)
		} else {
			j.lro.EndOperation(j.id, completionStatusOK)
		}
	}

	task := detach.Go(catchPanic(j, run))
	j.task = &task
}
// IsDone reports, without blocking, whether the job's task has finished.
func (j *Job) IsDone() bool {
	select {
	case <-j.task.Finished():
		return true
	default:
		// Not finished yet; fall through without waiting.
	}
	return false
}
// ID returns the ID of the job, as set when the job was created.
func (j *Job) ID() string {
return j.id
}
// Name returns the name of the job, as set when the job was created.
func (j *Job) Name() string {
return j.name
}
// CreateJobID generates a fresh job ID of the form "Job_<uuid>".
func CreateJobID() string {
	const prefix = "Job_"
	return prefix + uuid.New().String()
}
// addAndStartJob registers job with the LRO server and, when the registration
// succeeds, starts it. If a job with the same ID is already registered, a
// warning is logged and the given job is returned without being started. Any
// other registration failure is returned as an error.
func addAndStartJob(ctx context.Context, lro *Server, job *Job) (*Job, error) {
	err := lro.AddJob(job.id, job)
	switch {
	case err == nil:
		log.Infof(JobStartIndicator, job.id)
		job.start(ctx)
		return job, nil
	case status.Code(err) == codes.AlreadyExists:
		// NOTE(review): the job returned here is the caller-supplied one, which
		// was never started, not the one already registered — confirm callers
		// do not call Wait/IsDone/Cancel on it.
		log.Warningf("LRO with job id %q already exists", job.id)
		return job, nil
	default:
		return nil, fmt.Errorf("failed to add job for id=%v: %w", job.id, err)
	}
}
// CreateAndRunLROJobWithID builds a cancellable LRO job around call, registers
// it with lro and starts it.
// The given id is used as the job ID; when it is empty a new ID is generated
// with CreateJobID.
// It is declared as a variable rather than a function, presumably so it can
// be substituted in tests (confirm against callers).
var CreateAndRunLROJobWithID = func(ctx context.Context, id, name string, lro *Server, call func(ctx context.Context) (proto.Message, error)) (*Job, error) {
	jobID := id
	if jobID == "" {
		jobID = CreateJobID()
	}
	return addAndStartJob(ctx, lro, &Job{
		id:   jobID,
		name: name,
		lro:  lro,
		call: call,
	})
}
// CreateAndRunLROJobWithContext builds a cancellable LRO job around call,
// registers it with lro and starts it.
// The job ID is taken from the TaskIDMetadataTag entry of ctx's incoming gRPC
// metadata. When the metadata or the entry is missing or empty, an empty ID is
// passed on so that CreateAndRunLROJobWithID generates one.
func CreateAndRunLROJobWithContext(ctx context.Context, name string, lro *Server, call func(ctx context.Context) (proto.Message, error)) (*Job, error) {
	id := ""
	if md, ok := metadata.FromIncomingContext(ctx); ok {
		if ids := md.Get(TaskIDMetadataTag); len(ids) > 0 && ids[0] != "" {
			if len(ids) > 1 {
				log.Warningf("More than one task id in the metadata %v", ids)
			}
			// Only the first value is honoured.
			id = ids[0]
		}
	}
	return CreateAndRunLROJobWithID(ctx, id, name, lro, call)
}