in src/scaler/scaler-core/index.js [594:779]
async function readStateCheckOngoingLRO(cluster, autoscalerState) {
const savedState = await autoscalerState.get();
if (!savedState?.scalingOperationId) {
// no LRO ongoing.
return savedState;
}
try {
const [operationState] = await getOperationState(
savedState.scalingOperationId,
cluster.engine,
);
if (!operationState) {
throw new Error(
`GetOperation(${savedState.scalingOperationId}) returned no results`,
);
}
/** @type {RedisClusterProtos.google.cloud.redis.cluster.v1.OperationMetadata|MemorystoreProtos.google.cloud.memorystore.v1.OperationMetadata} */
let metadata;
try {
if (
operationState.metadata?.value == null ||
(operationState.metadata?.type_url !==
RedisClusterProtos.google.cloud.redis.cluster.v1.OperationMetadata.getTypeUrl() &&
operationState.metadata?.type_url !==
MemorystoreProtos.google.cloud.memorystore.v1.OperationMetadata.getTypeUrl())
) {
throw new Error('no metadata in response');
}
if (cluster.engine === AutoscalerEngine.REDIS) {
metadata =
RedisClusterProtos.google.cloud.redis.cluster.v1.OperationMetadata.decode(
/** @type {any} */ (operationState.metadata).value,
);
} else if (cluster.engine === AutoscalerEngine.VALKEY) {
metadata =
MemorystoreProtos.google.cloud.memorystore.v1.OperationMetadata.decode(
/** @type {any} */ (operationState.metadata).value,
);
} else {
throw new Error(
`Unknown engine for operation metadata decode: ${cluster.engine}`,
);
}
} catch (e) {
throw new Error(
`GetOperation(${savedState.scalingOperationId}) could not decode OperationMetadata: ${e}`,
);
}
const createTimeMillis =
metadata.createTime?.seconds == null || metadata.createTime.nanos == null
? 0
: Number(metadata.createTime.seconds) * 1000 +
Math.floor(metadata.createTime.nanos / 1_000_000);
const createTimeStamp = new Date(createTimeMillis).toISOString();
const endTimeMillis =
metadata.endTime?.seconds == null || metadata.endTime.nanos == null
? 0
: Number(metadata.endTime.seconds) * 1000 +
Math.floor(metadata.endTime.nanos / 1_000_000);
const endTimeStamp = new Date(endTimeMillis).toISOString();
if (operationState.done) {
if (!operationState.error) {
// Completed successfully.
logger.info({
message: `----- ${cluster.projectId}/${cluster.regionId}/${cluster.clusterId}: Last scaling request for size ${savedState.scalingRequestedSize} SUCCEEDED. Started: ${createTimeStamp}, completed: ${endTimeStamp}`,
projectId: cluster.projectId,
regionId: cluster.regionId,
clusterId: cluster.clusterId,
payload: cluster,
});
// Set completion time in SavedState
if (endTimeMillis) {
savedState.lastScalingCompleteTimestamp = endTimeMillis;
} else {
// invalid end date, assume start date...
logger.warn(
`Failed to parse operation endTime : ${metadata.endTime}`,
);
savedState.lastScalingCompleteTimestamp =
savedState.lastScalingTimestamp;
}
// Record success counters.
await Counters.recordScalingDuration(
savedState.lastScalingCompleteTimestamp -
savedState.lastScalingTimestamp,
cluster,
savedState.scalingRequestedSize || 0,
savedState.scalingPreviousSize,
savedState.scalingMethod,
);
await Counters.incScalingSuccessCounter(
cluster,
savedState.scalingRequestedSize || 0,
savedState.scalingPreviousSize,
savedState.scalingMethod,
);
// Clear operation frm savedState
savedState.scalingOperationId = null;
savedState.scalingRequestedSize = null;
savedState.scalingPreviousSize = null;
savedState.scalingMethod = null;
} else {
// Last operation failed with an error
logger.error({
message: `----- ${cluster.projectId}/${cluster.regionId}/${cluster.clusterId}: Last scaling request for size ${savedState.scalingRequestedSize} FAILED: ${operationState.error?.message}. Started: ${createTimeStamp}, completed: ${endTimeStamp}`,
projectId: cluster.projectId,
regionId: cluster.regionId,
clusterId: cluster.clusterId,
error: operationState.error,
payload: cluster,
});
await Counters.incScalingFailedCounter(
cluster,
savedState.scalingRequestedSize || 0,
savedState.scalingPreviousSize,
savedState.scalingMethod,
);
// Clear last scaling operation from savedState.
savedState.scalingOperationId = null;
savedState.scalingRequestedSize = null;
savedState.lastScalingCompleteTimestamp = 0;
savedState.lastScalingTimestamp = 0;
savedState.scalingPreviousSize = null;
savedState.scalingMethod = null;
}
} else {
if (!!metadata.requestedCancellation) {
logger.info({
message: `----- ${cluster.projectId}/${cluster.regionId}/${cluster.clusterId}: Last scaling request for ${savedState.scalingRequestedSize} CANCEL REQUESTED. Started: ${createTimeStamp}`,
projectId: cluster.projectId,
regionId: cluster.regionId,
clusterId: cluster.clusterId,
payload: cluster,
});
} else {
logger.info({
message: `----- ${cluster.projectId}/${cluster.regionId}/${cluster.clusterId}: Last scaling request for ${savedState.scalingRequestedSize} IN PROGRESS. Started: ${createTimeStamp}`,
projectId: cluster.projectId,
regionId: cluster.regionId,
clusterId: cluster.clusterId,
payload: cluster,
});
}
}
} catch (err) {
// Fallback - LRO.get() API failed or returned invalid status.
// Assume complete.
logger.error({
message: `Failed to retrieve state of operation, assume completed. ID: ${savedState.scalingOperationId}: ${err}`,
err: err,
});
savedState.lastScalingCompleteTimestamp = savedState.lastScalingTimestamp;
savedState.scalingOperationId = null;
// Record success counters.
await Counters.recordScalingDuration(
savedState.lastScalingCompleteTimestamp - savedState.lastScalingTimestamp,
cluster,
savedState.scalingRequestedSize || 0,
savedState.scalingPreviousSize,
savedState.scalingMethod,
);
await Counters.incScalingSuccessCounter(
cluster,
savedState.scalingRequestedSize || 0,
savedState.scalingPreviousSize,
savedState.scalingMethod,
);
savedState.scalingRequestedSize = null;
savedState.scalingPreviousSize = null;
savedState.scalingMethod = null;
}
// Update saved state in storage.
await autoscalerState.updateState(savedState);
return savedState;
}