func NewOperationContainerHandler()

in operationsbus/handlers.go [179:264]


// NewOperationContainerHandler wraps errHandler so that the lifecycle of the
// operation carried by each message is mirrored into the operation container.
//
// For every message the returned handler:
//  1. Decodes the body into an OperationRequest to obtain the operation id.
//  2. Marks the operation IN_PROGRESS, retrying a bounded number of times.
//  3. Invokes errHandler.
//  4. Marks the operation COMPLETED when errHandler succeeds, CANCELLED when it
//     returns a *NonRetryError, and PENDING when it returns a *RetryError. Any
//     other error type leaves the status untouched.
//
// The error returned by errHandler is always propagated to the caller once the
// status has been recorded, so outer handlers can still react to it. If a
// status update itself fails, that update error is returned instead.
func NewOperationContainerHandler(errHandler ErrorHandlerFunc, operationContainer oc.OperationContainerClient) ErrorHandlerFunc {
	// maxInProgressAttempts bounds how often the IN_PROGRESS update is tried
	// before the message is given up on.
	const maxInProgressAttempts = 5

	return func(ctx context.Context, settler shuttle.MessageSettler, message *azservicebus.ReceivedMessage) error {
		logger := ctxlogger.GetLogger(ctx)

		var body OperationRequest
		if err := json.Unmarshal(message.Body, &body); err != nil {
			logger.Error("OperationContainerHandler: Error unmarshalling message: " + err.Error())
			// NOTE(review): returning nil reports success to the caller even
			// though the message was never processed. Kept as-is because the
			// original did so; confirm this is intended.
			return nil
		}

		// setStatus records the given status for this message's operation.
		setStatus := func(status oc.Status) error {
			_, err := operationContainer.UpdateOperationStatus(ctx, &oc.UpdateOperationStatusRequest{
				OperationId: body.OperationId,
				Status:      status,
			})
			return err
		}

		// If the operation is picked up immediately from the service bus, while the operationContainer is still putting the
		// operation into the hcp and operations databases, this step might fail if both databases have not been updated.
		// Allowing a couple of retries before fully failing the operation due to this error.
		var inProgressErr error
		for attempt := 1; attempt <= maxInProgressAttempts; attempt++ {
			inProgressErr = setStatus(oc.Status_IN_PROGRESS)
			if inProgressErr == nil {
				break
			}
			logger.Error("OperationContainerHandler: Error setting operation in progress: " + inProgressErr.Error())
			// Retrying with a cancelled or expired context cannot succeed.
			if ctx.Err() != nil {
				break
			}
			if attempt < maxInProgressAttempts {
				logger.Info("Trying again.")
			}
		}
		if inProgressErr != nil {
			logger.Error("Operation was not able to be put in progress.")
			return inProgressErr
		}

		handlerErr := errHandler.Handle(ctx, settler, message)
		if handlerErr == nil {
			logger.Info("Setting Operation as Successful.")
			if err := setStatus(oc.Status_COMPLETED); err != nil {
				logger.Error("OperationContainerHandler: Something went wrong setting the operation as Completed: " + err.Error())
				return err
			}
			return nil
		}

		logger.Info("OperationContainerHandler: Handling error: " + handlerErr.Error())
		switch handlerErr.(type) {
		case *NonRetryError:
			// The operation cannot be retried: cancel it.
			logger.Info("OperationContainerHandler: Setting operation as Cancelled.")
			if err := setStatus(oc.Status_CANCELLED); err != nil {
				logger.Error("OperationContainerHandler: Something went wrong setting the operation as Cancelled" + err.Error())
				return err
			}
		case *RetryError:
			// The operation may be retried: put it back to Pending.
			logger.Info("OperationContainerHandler: Setting operation as Pending.")
			if err := setStatus(oc.Status_PENDING); err != nil {
				logger.Error("OperationContainerHandler: Something went wrong setting the operation as Pending:" + err.Error())
				return err
			}
		default:
			logger.Info("OperationContainerHandler: Error type not recognized. Operation status not changed.")
		}

		// Propagate the handler's own error. It must not be replaced by the
		// (nil) result of the status update, or failures would look like
		// successes to the caller.
		return handlerErr
	}
}