wrappers/validators/comparator-validator.ts (213 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
import { IConfiguredGenerator, ILogData, IWrapper } from "../../core/ext-types"
import { IBuiltStage } from "../../core/pipeline-types";
import { difer, hash } from "../../core/utils.js";
/*
* Compares the contents of two generators
* 1.) Datajet Generator Source
* 2.) Validation Generator Source
*/
interface IComparatorWrapperConfig {
validationGraceTimeout: number, /* validation period ends after this many seconds no matter what */
validationIdleTimeout: number, /* validation ends if no new validation logs are recieved in this time */
stageGeneratorRef: string,
validationGeneratorRef: string,
stageGeneratorLogCount: number, /* null means implied to be equal to validation log count */
isValidatingWhileExecuting: boolean,
}
const defaultConfig: IComparatorWrapperConfig = {
validationGraceTimeout: 20, /* seconds */
validationIdleTimeout: 5, /* seconds */
stageGeneratorRef: null,
validationGeneratorRef: null,
stageGeneratorLogCount: null,
isValidatingWhileExecuting: false,
}
interface IComparatorWrapperMetrics {
testDuration: number, /* seconds */
validationLogTotalCount: number,
duplicate: number,
loss: number,
validationGraceDuration: number, /* seconds after validation starts for all logs to be recieved */
validationGraceAndMetricsDuration: number,
isValidationCompleted: boolean,
}
const comparatorWrapper: IWrapper = {
name: "comparator-validator",
defaultConfig: defaultConfig,
modifySubschema: (subschema)=>subschema,
createConfiguredWrapper: function (config: IComparatorWrapperConfig, {
logger,
library,
}) {
let testStartTime: number;
let validationLogTotalCount: number = 0;
let validationLogHashCounts: {[key: string]: number} = {};
let stageLogHashCounts: {[key: string]: number} = {};
let ingestionValidationLoop: Promise<boolean>;
let ingestionLatestTime: number;
const stageGenerator = library[config.stageGeneratorRef].data as IConfiguredGenerator;
const validationGenerator = library[config.validationGeneratorRef].data as IConfiguredGenerator;
const stageGeneratorInstance = stageGenerator.makeInstance();
const validationGeneratorInstance = validationGenerator.makeInstance();
let abortHandlers: Array<{ abort: () => void }> = [];
const removeAbortHandler = (handlerToRemove) => {
abortHandlers = abortHandlers.filter(handler => handler !== handlerToRemove);
}
const addAbortHandler = (resolve: (any) => void) => {
const abortHandler = {
abort: function(){
removeAbortHandler(this);
resolve(null);
}
};
abortHandlers.push(abortHandler);
return abortHandler;
}
const waitForAbortGeneratorNextSignal = () => {
return new Promise(async (resolve, reject) => {
addAbortHandler(resolve);
});
}
const signalAbortGeneratorNext = () => {
abortHandlers.forEach((handler) => {
handler.abort();
});
}
const signalMoveToTimeoutIngestion = () => {
logger.debug("ingestion signalled. move to grace phase of ingestion")
signalAbortGeneratorNext();
}
/* potentially overly complex. Only need one handler */
const getSignallableGeneratorNext = (): Promise<IteratorResult<ILogData[], any> | null> => {
return new Promise(async (resolve, reject) => {
const abortHandler = addAbortHandler(resolve);
const result = await difer(() => validationGeneratorInstance.next());
removeAbortHandler(abortHandler);
resolve(result);
})
}
const getTimeoutGeneratorNext = (): Promise<IteratorResult<ILogData[], any> | null> => {
return new Promise(async (resolve, reject) => {
const myTimeout = setTimeout(() => {
resolve(null)
}, config.validationIdleTimeout * 1000);
const result = await difer(() => validationGeneratorInstance.next()); /* don't hog cpu */
clearTimeout(myTimeout);
resolve(result);
});
}
const ingestValidationLogBatch = (logs: ILogData[]) => {
logs.forEach((log) => {
/* ingest validation log */
validationLogTotalCount++;
validationLogHashCounts[hash(log)] = (validationLogHashCounts[hash(log)] ?? 0) + 1;
ingestionLatestTime = Date.now();
});
}
const ingestStageLogBatch = (logs: ILogData[]) => {
logs.forEach((log) => {
/* ingest stage log */
stageLogHashCounts[hash(log)] = (stageLogHashCounts[hash(log)] ?? 0) + 1;
});
}
const ingestValidationLogBatches = async () => {
logger.debug("ingesting validation logs");
let isSignalled = false;
let result: IteratorResult<ILogData[], any>;
/* validate while executing child */
if (config.isValidatingWhileExecuting) {
result = await getSignallableGeneratorNext();
isSignalled = result === null;
while (!isSignalled && !result.done) { /* null counts as complete */
/* ingest validation log */
ingestValidationLogBatch(result.value);
result = await getSignallableGeneratorNext();
isSignalled = result === null;
}
/* nothing left to validate */
if (result.done) {
return true;
}
}
/* otherwise wait for signal to start validating */
else {
await waitForAbortGeneratorNextSignal();
}
/* move to ingestion with grace phase */
logger.debug("ingesting validation logs: grace phase");
/* grace timeout for rapid yielding */
let isGraceTimedOut = false;
const graceTimer = setTimeout(() => isGraceTimedOut = true, config.validationGraceTimeout * 1000);
result = await getTimeoutGeneratorNext();
let isIdleTimedout = result === null;
while (!isGraceTimedOut && !isIdleTimedout && result.done === false) { /* null counts as complete */
/* ingest validation log */
ingestValidationLogBatch(result.value);
result = await getTimeoutGeneratorNext();
isIdleTimedout = result === null;
}
clearTimeout(graceTimer);
/*
* return true: Idle timeout means that validation completed.
* return false: Grace timeout means our validation time is up.
*/
if (isGraceTimedOut) {
return false;
}
return true;
}
const ingestionCompleteOrTimeout = async () => { /* return true if completed, false if timeout */
return new Promise(async (resolve, reject) => {
/*
* grace timeout for slow yielding, ingestionValidation loop has its
* own timeout to help with cleaning up the closure
*/
setTimeout(() => resolve(false), config.validationGraceTimeout * 1000);
const isComplete = await ingestionValidationLoop;
resolve(isComplete);
});
}
return {
wrapperTemplate: this,
setup: async (root: IBuiltStage, subtree: IBuiltStage) => {
/* validation ingestion loop */
ingestionValidationLoop = ingestValidationLogBatches(); /* signallable */
testStartTime = Date.now();
return true;
},
validation: async (root: IBuiltStage, subtree: IBuiltStage) => {
/* signal ingestion to move to timeout phase */
signalMoveToTimeoutIngestion();
/* wait for grace timeout phase */
const graceStartTime = Date.now();
const validationCompleted = await ingestionCompleteOrTimeout();
const validationGraceDuration = ingestionLatestTime - graceStartTime;
/* ingest source generator logs */
const stageGeneratorLogCount = config.stageGeneratorLogCount ?? validationLogTotalCount;
for (let i = 0; i < stageGeneratorLogCount; ++i) {
const stageLog = await stageGeneratorInstance.next();
if (stageLog.value === null) {
break;
}
ingestStageLogBatch(stageLog.value);
if (stageLog.done) {
break;
}
}
/* verify logs in both + only_stage_log */
let duplicate = 0;
let loss = 0;
for (const myHash in stageLogHashCounts) {
const stageCount = stageLogHashCounts[myHash] ?? 0;
const validationCount = validationLogHashCounts[myHash] ?? 0;
if (stageCount > validationCount) {
loss += stageCount - validationCount;
}
if (validationCount > stageCount) {
duplicate += validationCount - stageCount;
}
}
/* verify logs in only_validation_count */
for (const myHash in validationLogHashCounts) {
if (stageLogHashCounts.hasOwnProperty(myHash)) {
/* In stageHashCounts */
continue;
}
const validationCount = validationLogHashCounts[myHash] ?? 0;
duplicate += validationCount;
}
/* additional metrics */
const testDuration = Date.now() - testStartTime;
const validationGraceAndMetricsDuration = Date.now() - graceStartTime;
return {
isValidationSuccess: true,
// Other data can be added here for validation metric collection.
validationData: {
testDuration: testDuration,
validationLogTotalCount: validationLogTotalCount,
duplicate: duplicate,
loss: loss,
validationGraceDuration: validationGraceDuration,
validationGraceAndMetricsDuration: validationGraceAndMetricsDuration,
isValidationCompleted: validationCompleted,
} as IComparatorWrapperMetrics,
/* May want to add hidden validation data */
};
},
breakdown: async (root: IBuiltStage, subtree: IBuiltStage) => {
/* clear memory */
stageLogHashCounts = {};
validationLogHashCounts = {};
return true;
},
isValidationAsync: false,
};
}
}
export default comparatorWrapper;