in source/packages/services/events-processor/src/transformers/ddbstream.transformer.ts [26:128]
/**
 * Transforms a DynamoDB stream event into a list of `CommonEvent`s.
 *
 * For each entry of `event.Records`:
 *  - the event source id is derived from the first two `/`-separated segments
 *    of `eventSourceARN` (i.e. the table ARN without the stream suffix);
 *  - records whose `eventName` is not `INSERT`, `MODIFY` or `REMOVE` are skipped;
 *  - the principal attribute name is looked up via `eventSourceDao.get` and
 *    cached per event source id for the duration of this call;
 *  - attributes are copied from `Keys`, then from `NewImage` if present,
 *    otherwise from `OldImage`;
 *  - records that yield no value for the principal attribute are skipped.
 *
 * @param event the raw stream event; must expose an iterable `Records` property.
 * @returns the events that were successfully transformed, in record order.
 */
public async transform(event: any): Promise<CommonEvent[]> {
    logger.debug(`ddbstream.transformer transform: in: event:${JSON.stringify(event)}`);

    // principal attribute name per event source id, so the dao is queried at most once per configured source
    const principalAttributes:{[key: string]: string} = {};
    const transformedEvents:CommonEvent[]=[];

    // Copies every attribute of a DynamoDB image (Keys / NewImage / OldImage) onto the
    // target event, capturing the principal value when its attribute is encountered.
    const copyAttributes = (image: any, principalAttribute: string, target: CommonEvent): void => {
        for (const prop of Object.keys(image)) {
            const value = this.extractValue(image[prop]);
            if (prop===principalAttribute) {
                target.principalValue = <string>value;
            }
            target.attributes[prop] = value;
        }
    };

    for(const rec of event.Records) {

        // arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899
        // becomes arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream
        const arnSplit = rec.eventSourceARN.split('/');
        const eventSourceId = `${arnSplit[0]}/${arnSplit[1]}`;

        /**
         * validate the event stream
         */
        if (rec.eventSource!=='aws:dynamodb') {
            logger.error(`eventSource ${eventSourceId} incorrectly configured as a dynamodb stream!`);
            // NOTE(review): `break` abandons all remaining records of this batch, not just
            // this one - confirm that is intended rather than `continue`.
            break;
        }

        /**
         * only interested in certain event types
         */
        if (rec.eventName!=='INSERT' && rec.eventName!=='MODIFY' && rec.eventName!=='REMOVE') {
            continue;
        }

        /**
         * identify the principal of the incoming event source
         */
        let principalAttribute=principalAttributes[eventSourceId];
        if (principalAttribute===undefined) {
            const r = await this.eventSourceDao.get(eventSourceId);
            if (r===undefined) {
                logger.warn(`eventSource ${eventSourceId} not configured therefore ignoring`);
                continue;
            }
            principalAttributes[eventSourceId]=r.principal;
            principalAttribute=principalAttributes[eventSourceId];
        }

        if (rec.dynamodb===undefined || rec.dynamodb.Keys===undefined) {
            // serialize explicitly: interpolating the object directly would log `[object Object]`
            logger.warn(`eventSource ${eventSourceId} missing 'Keys' therefore ignoring: ${JSON.stringify(rec)}`);
            continue;
        }

        /**
         * transform the incoming event
         */
        const transformedEvent:CommonEvent = {
            eventSourceId,
            principal: principalAttribute,
            principalValue: undefined,
            sourceChangeType: <string>rec.eventName,
            attributes:{}
        };

        const newImage=rec.dynamodb.NewImage;
        const oldImage=rec.dynamodb.OldImage;

        // keys first, so that values from the image (when present) take precedence
        copyAttributes(rec.dynamodb.Keys, principalAttribute, transformedEvent);

        if (newImage!==undefined) {
            copyAttributes(newImage, principalAttribute, transformedEvent);
        } else if (oldImage!==undefined) {
            // This is considered for REMOVE events only
            copyAttributes(oldImage, principalAttribute, transformedEvent);
        }

        if (transformedEvent.principalValue===undefined) {
            // serialize explicitly: interpolating the object directly would log `[object Object]`
            logger.warn(`eventSource ${eventSourceId} missing value for principal therefore ignoring. attributes: ${JSON.stringify(transformedEvent.attributes)}`);
            continue;
        }

        transformedEvents.push(transformedEvent);
    }

    logger.debug(`ddbstream.transformer transform: exit: ${JSON.stringify(transformedEvents)}`);
    return transformedEvents;
}