in sample_apps/js/crud-and-simple-ingestion-example.js [316:455]
/**
 * Demonstrates upsert ("last writer wins") behaviour of writeRecords by
 * sending the same two measures (cpu_utilization, memory_utilization) four
 * times against constants.DATABASE_NAME / constants.TABLE_NAME:
 *
 *   1. initial write with version V,
 *   2. retry of the identical write (same records, same version V),
 *   3. changed measure values with version V - 1,
 *   4. changed measure values with a version greater than V.
 *
 * Every outcome is reported through console.log; write errors are logged and
 * not re-thrown, so one failed step does not prevent the following steps
 * from running.
 *
 * @returns {Promise<void>} settles once all four write attempts have finished.
 */
async function writeRecordsWithUpsert() {
    console.log("Writing records with upsert");

    const currentTime = Date.now().toString(); // Unix time in milliseconds
    // To achieve upsert (last writer wins) semantic, one example is to use current time as the version if you are writing directly from the data source
    const initialVersion = Date.now();

    const dimensions = [
        {'Name': 'region', 'Value': 'us-east-1'},
        {'Name': 'az', 'Value': 'az1'},
        {'Name': 'hostname', 'Value': 'host1'}
    ];

    // All four writes share dimensions and timestamp; only the version differs.
    const buildCommonAttributes = (recordVersion) => ({
        'Dimensions': dimensions,
        'MeasureValueType': 'DOUBLE',
        'Time': currentTime,
        'Version': recordVersion
    });

    // Builds a new request for `params`, sends it and logs the outcome.
    // On RejectedRecordsException the request object (not the params) is
    // handed to printRejectedRecordsException.
    const sendAndReport = async (params, successMessage, errorMessage) => {
        const request = writeClient.writeRecords(params);
        try {
            await request.promise();
        } catch (err) {
            console.log(errorMessage, err);
            if (err.code === 'RejectedRecordsException') {
                printRejectedRecordsException(request);
            }
            return;
        }
        console.log(successMessage);
    };

    const cpuUtilization = {
        'MeasureName': 'cpu_utilization',
        'MeasureValue': '13.5'
    };
    const memoryUtilization = {
        'MeasureName': 'memory_utilization',
        'MeasureValue': '40'
    };
    const params = {
        DatabaseName: constants.DATABASE_NAME,
        TableName: constants.TABLE_NAME,
        Records: [cpuUtilization, memoryUtilization],
        CommonAttributes: buildCommonAttributes(initialVersion)
    };

    // write records for first time
    await sendAndReport(
        params,
        "Write records successful for first time.",
        "Error writing records:"
    );

    // Retry with the same records and version; the writeRecords API is idempotent.
    // A fresh request object is built from the same params so that the retry
    // is issued as its own call instead of re-using the already-sent request.
    await sendAndReport(
        params,
        "Write records successful for retry.",
        "Error writing records:"
    );

    // upsert with lower version, this would fail because a higher version is required to update the measure value.
    const updatedCpuUtilization = {
        'MeasureName': 'cpu_utilization',
        'MeasureValue': '14.5'
    };
    const updatedMemoryUtilization = {
        'MeasureName': 'memory_utilization',
        'MeasureValue': '50'
    };
    const upsertedRecords = [updatedCpuUtilization, updatedMemoryUtilization];
    const upsertedParamsWithLowerVersion = {
        DatabaseName: constants.DATABASE_NAME,
        TableName: constants.TABLE_NAME,
        Records: upsertedRecords,
        CommonAttributes: buildCommonAttributes(initialVersion - 1)
    };
    await sendAndReport(
        upsertedParamsWithLowerVersion,
        "Write records for upsert with lower version successful",
        "Error writing records for upsert with lower version:"
    );

    // upsert with higher version as new data is generated
    // Date.now() alone could equal the initial version if the clock has not
    // advanced, so make sure the new version is strictly greater.
    const higherVersion = Math.max(Date.now(), initialVersion + 1);
    const upsertedParamsWithHigherVersion = {
        DatabaseName: constants.DATABASE_NAME,
        TableName: constants.TABLE_NAME,
        Records: upsertedRecords,
        CommonAttributes: buildCommonAttributes(higherVersion)
    };
    await sendAndReport(
        upsertedParamsWithHigherVersion,
        "Write records upsert successful with higher version",
        "Error writing records:"
    );
}