async function readStateCheckOngoingLRO()

in src/scaler/scaler-core/index.js [594:779]


/**
 * Reads the saved autoscaler state and, when it references a scaling
 * long-running operation (LRO), checks that operation's status and updates
 * the saved state to match.
 *
 * Outcomes:
 *  - No operation ID in the saved state: the state is returned as read and
 *    nothing is written back.
 *  - Operation done without error: the completion timestamp is set (from the
 *    operation's endTime, or from lastScalingTimestamp when endTime is
 *    unusable), duration and success counters are recorded, and the
 *    operation fields are cleared.
 *  - Operation done with error: the failure counter is incremented, and the
 *    operation fields and both scaling timestamps are reset.
 *  - Operation not done: progress (or a pending cancellation) is logged and
 *    the state is left unchanged.
 *  - Any error while fetching, decoding or processing the operation: the
 *    operation is assumed to have completed at lastScalingTimestamp and is
 *    recorded as a success.
 *
 * In every case except the first, the state is written back through
 * `autoscalerState.updateState()`.
 *
 * @param {object} cluster Cluster configuration; `projectId`, `regionId`,
 *     `clusterId` and `engine` are read here.
 * @param {object} autoscalerState State store exposing async `get()` and
 *     `updateState(state)`.
 * @return {Promise<object>} The (possibly updated) saved state.
 */
async function readStateCheckOngoingLRO(cluster, autoscalerState) {
  const savedState = await autoscalerState.get();

  if (!savedState?.scalingOperationId) {
    // no LRO ongoing.
    return savedState;
  }

  // Converts a {seconds, nanos} timestamp to epoch milliseconds; yields 0
  // when the timestamp or either of its fields is missing.
  const timestampToMillis = (timestamp) =>
    timestamp?.seconds == null || timestamp.nanos == null
      ? 0
      : Number(timestamp.seconds) * 1000 +
        Math.floor(timestamp.nanos / 1_000_000);

  // NOTE(review): this try also covers the Counters.* calls below, so a
  // failure in one of them lands in the "assume completed" fallback.
  // Confirm that this is intended.
  try {
    const [operationState] = await getOperationState(
      savedState.scalingOperationId,
      cluster.engine,
    );

    if (!operationState) {
      throw new Error(
        `GetOperation(${savedState.scalingOperationId}) returned no results`,
      );
    }

    /** @type {RedisClusterProtos.google.cloud.redis.cluster.v1.OperationMetadata|MemorystoreProtos.google.cloud.memorystore.v1.OperationMetadata} */
    let metadata;
    try {
      // The metadata must be present and carry one of the two known
      // OperationMetadata type URLs before it is decoded.
      if (
        operationState.metadata?.value == null ||
        (operationState.metadata?.type_url !==
          RedisClusterProtos.google.cloud.redis.cluster.v1.OperationMetadata.getTypeUrl() &&
          operationState.metadata?.type_url !==
            MemorystoreProtos.google.cloud.memorystore.v1.OperationMetadata.getTypeUrl())
      ) {
        throw new Error('no metadata in response');
      }
      // The decoder is selected by the cluster's engine.
      if (cluster.engine === AutoscalerEngine.REDIS) {
        metadata =
          RedisClusterProtos.google.cloud.redis.cluster.v1.OperationMetadata.decode(
            /** @type {any} */ (operationState.metadata).value,
          );
      } else if (cluster.engine === AutoscalerEngine.VALKEY) {
        metadata =
          MemorystoreProtos.google.cloud.memorystore.v1.OperationMetadata.decode(
            /** @type {any} */ (operationState.metadata).value,
          );
      } else {
        throw new Error(
          `Unknown engine for operation metadata decode: ${cluster.engine}`,
        );
      }
    } catch (e) {
      throw new Error(
        `GetOperation(${savedState.scalingOperationId}) could not decode OperationMetadata: ${e}`,
      );
    }

    const createTimeStamp = new Date(
      timestampToMillis(metadata.createTime),
    ).toISOString();
    const endTimeMillis = timestampToMillis(metadata.endTime);
    const endTimeStamp = new Date(endTimeMillis).toISOString();
    const clusterPath = `${cluster.projectId}/${cluster.regionId}/${cluster.clusterId}`;

    if (operationState.done) {
      if (!operationState.error) {
        // Completed successfully.
        logger.info({
          message: `----- ${clusterPath}: Last scaling request for size ${savedState.scalingRequestedSize} SUCCEEDED. Started: ${createTimeStamp}, completed: ${endTimeStamp}`,
          projectId: cluster.projectId,
          regionId: cluster.regionId,
          clusterId: cluster.clusterId,
          payload: cluster,
        });

        // Set completion time in SavedState
        if (endTimeMillis) {
          savedState.lastScalingCompleteTimestamp = endTimeMillis;
        } else {
          // Unusable end time: fall back to the scaling start time. The
          // fields are logged individually because interpolating the
          // timestamp object itself would only print "[object Object]".
          logger.warn(
            `Failed to parse operation endTime: seconds=${metadata.endTime?.seconds}, nanos=${metadata.endTime?.nanos}`,
          );
          savedState.lastScalingCompleteTimestamp =
            savedState.lastScalingTimestamp;
        }

        // Record success counters.
        await Counters.recordScalingDuration(
          savedState.lastScalingCompleteTimestamp -
            savedState.lastScalingTimestamp,
          cluster,
          savedState.scalingRequestedSize || 0,
          savedState.scalingPreviousSize,
          savedState.scalingMethod,
        );
        await Counters.incScalingSuccessCounter(
          cluster,
          savedState.scalingRequestedSize || 0,
          savedState.scalingPreviousSize,
          savedState.scalingMethod,
        );

        // Clear operation from savedState
        savedState.scalingOperationId = null;
        savedState.scalingRequestedSize = null;
        savedState.scalingPreviousSize = null;
        savedState.scalingMethod = null;
      } else {
        // Last operation failed with an error
        logger.error({
          message: `----- ${clusterPath}: Last scaling request for size ${savedState.scalingRequestedSize} FAILED: ${operationState.error?.message}. Started: ${createTimeStamp}, completed: ${endTimeStamp}`,
          projectId: cluster.projectId,
          regionId: cluster.regionId,
          clusterId: cluster.clusterId,
          error: operationState.error,
          payload: cluster,
        });

        await Counters.incScalingFailedCounter(
          cluster,
          savedState.scalingRequestedSize || 0,
          savedState.scalingPreviousSize,
          savedState.scalingMethod,
        );

        // Clear last scaling operation from savedState; both scaling
        // timestamps are reset to 0 as well.
        savedState.scalingOperationId = null;
        savedState.scalingRequestedSize = null;
        savedState.lastScalingCompleteTimestamp = 0;
        savedState.lastScalingTimestamp = 0;
        savedState.scalingPreviousSize = null;
        savedState.scalingMethod = null;
      }
    } else {
      // Operation still running: only log, the saved state is unchanged.
      if (metadata.requestedCancellation) {
        logger.info({
          message: `----- ${clusterPath}: Last scaling request for ${savedState.scalingRequestedSize} CANCEL REQUESTED. Started: ${createTimeStamp}`,
          projectId: cluster.projectId,
          regionId: cluster.regionId,
          clusterId: cluster.clusterId,
          payload: cluster,
        });
      } else {
        logger.info({
          message: `----- ${clusterPath}: Last scaling request for ${savedState.scalingRequestedSize} IN PROGRESS. Started: ${createTimeStamp}`,
          projectId: cluster.projectId,
          regionId: cluster.regionId,
          clusterId: cluster.clusterId,
          payload: cluster,
        });
      }
    }
  } catch (err) {
    // Fallback - LRO.get() API failed or returned invalid status.
    // Assume complete.
    logger.error({
      message: `Failed to retrieve state of operation, assume completed. ID: ${savedState.scalingOperationId}: ${err}`,
      err: err,
    });
    savedState.lastScalingCompleteTimestamp = savedState.lastScalingTimestamp;
    savedState.scalingOperationId = null;
    // Record success counters (the recorded duration is 0 here because the
    // completion time was just set equal to the start time).
    await Counters.recordScalingDuration(
      savedState.lastScalingCompleteTimestamp - savedState.lastScalingTimestamp,
      cluster,
      savedState.scalingRequestedSize || 0,
      savedState.scalingPreviousSize,
      savedState.scalingMethod,
    );
    await Counters.incScalingSuccessCounter(
      cluster,
      savedState.scalingRequestedSize || 0,
      savedState.scalingPreviousSize,
      savedState.scalingMethod,
    );
    savedState.scalingRequestedSize = null;
    savedState.scalingPreviousSize = null;
    savedState.scalingMethod = null;
  }
  // Update saved state in storage.
  await autoscalerState.updateState(savedState);
  return savedState;
}