server/internal/async/operations/longRunningOperation/longRunning.go (49 lines of code) (raw):
package longRunningOperation
import (
"context"
"errors"
"time"
opbus "github.com/Azure/aks-async/operationsbus"
"github.com/Azure/aks-middleware/grpc/server/ctxlogger"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/grpc/codes"
)
// Compile-time assertion that *LongRunningOperation implements every method
// of the opbus.ApiOperation interface; the build fails here if one is missing.
var _ opbus.ApiOperation = (*LongRunningOperation)(nil)
// LongRunningOperation is a sample operation that is asserted in this file to
// implement opbus.ApiOperation. Its Run method simulates slow work by waiting
// for a fixed duration.
type LongRunningOperation struct {
// Name is set by InitOperation from the request's OperationName.
Name string
// Operation is the full request stored by InitOperation; GetOperationRequest
// returns a pointer to this field.
Operation opbus.OperationRequest
// LroEntity is not assigned anywhere in the visible code.
// NOTE(review): presumably populated by the caller or another file — confirm.
LroEntity *LongRunningEntity
// OperationId is set by InitOperation from the request's OperationId and is
// compared against the entity's latest operation ID in GuardConcurrency.
OperationId string
// ExpirationTimestamp is not assigned anywhere in the visible code.
// NOTE(review): presumably the deadline after which the operation is stale — confirm.
ExpirationTimestamp *timestamppb.Timestamp
}
// CreateLroEntityFunc is the opbus.EntityFactoryFunc for this operation: given
// an entity ID it builds the entity via NewLongRunningEntity.
var CreateLroEntityFunc opbus.EntityFactoryFunc = func(entityID string) opbus.Entity {
	entity := NewLongRunningEntity(entityID)
	return entity
}
// InitOperation populates the receiver from opRequest and returns it.
//
// The full request is stored in lro.Operation, and the request's OperationName
// and OperationId are copied into lro.Name and lro.OperationId. The returned
// error is always nil in this implementation.
func (lro *LongRunningOperation) InitOperation(ctx context.Context, opRequest opbus.OperationRequest) (opbus.ApiOperation, error) {
	logger := ctxlogger.GetLogger(ctx)
	logger.Info("Initializing LongRunningOperation")

	lro.Operation = opRequest
	lro.Name = opRequest.OperationName
	lro.OperationId = opRequest.OperationId

	// Return the initialized operation instead of a nil interface so that a
	// caller using the result on a nil error never dereferences nil.
	return lro, nil
}
// Run simulates a long-running operation by waiting 20 seconds.
//
// It returns nil once the wait completes. If ctx is cancelled or its deadline
// passes first, Run stops waiting and returns ctx.Err() so the caller is not
// blocked for the remainder of the 20 seconds.
func (lro *LongRunningOperation) Run(ctx context.Context) error {
	logger := ctxlogger.GetLogger(ctx)
	logger.Info("Running the long running operation!")

	// Logic for running the operation. A timer plus select is used instead of
	// time.Sleep so the wait can be interrupted by context cancellation.
	timer := time.NewTimer(20 * time.Second)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
	}

	logger.Info("Finished running the long running operation.")
	return nil
}
// GuardConcurrency checks that this operation is the latest one recorded on
// entity.
//
// It compares lro.OperationId with entity.GetLatestOperationID(). When they
// differ it returns a CategorizedError carrying codes.Canceled; when they
// match it returns nil.
func (lro *LongRunningOperation) GuardConcurrency(ctx context.Context, entity opbus.Entity) *opbus.CategorizedError {
	logger := ctxlogger.GetLogger(ctx)
	logger.Info("Guarding concurrency for operation.")

	if latestOperationId := entity.GetLatestOperationID(); lro.OperationId != latestOperationId {
		// The same error supplies both the message and the wrapped original error.
		err := errors.New("OperationId and LastOperationId don't match!")
		return opbus.NewCategorizedError(err.Error(), "", int(codes.Canceled), err)
	}
	return nil
}
// GetOperationRequest returns a pointer to the request stored on the
// operation. The pointer aliases lro.Operation, so writes through it are
// visible on the operation itself.
func (lro *LongRunningOperation) GetOperationRequest() *opbus.OperationRequest {
	req := &lro.Operation
	return req
}