public async transform()

in source/packages/services/events-processor/src/transformers/ddbstream.transformer.ts [24:137]


    /**
     * Transforms a DynamoDB stream event into a list of `CommonEvent`s.
     *
     * For every record in `event.Records`:
     *  - the event source id is built from the first two `/`-separated segments
     *    of the record's `eventSourceARN` (i.e. the stream suffix is dropped);
     *  - only `INSERT`, `MODIFY` and `REMOVE` records are considered;
     *  - the principal attribute name is looked up via `eventSourceDao.get()` and
     *    cached per event source id for the remainder of this call;
     *  - attributes are copied from `Keys` first, then from `NewImage` if present,
     *    otherwise from `OldImage`; a later copy overwrites an earlier value
     *    for the same attribute name.
     *
     * Records are skipped (with a warning) when the event source is not configured,
     * when `dynamodb.Keys` is missing, or when no value for the principal attribute
     * was found. Processing of the whole batch stops at the first record whose
     * `eventSource` is not `aws:dynamodb`; events transformed before that point
     * are still returned.
     *
     * @param event - the incoming stream event; must expose a `Records` iterable.
     * @returns the successfully transformed events, in record order.
     */
    public async transform(event: any): Promise<CommonEvent[]> {
        logger.debug(`ddbstream.transformer transform: in: event:${JSON.stringify(event)}`);

        // Cache of eventSourceId -> principal attribute name, to avoid repeated lookups
        const principalAttributes: { [key: string]: string } = {};

        const transformedEvents: CommonEvent[] = [];

        // Copies every attribute of a stream image (Keys / NewImage / OldImage) onto the
        // target event, capturing the principal value when its attribute is encountered.
        // Declared once, outside the record loop, as an arrow function so `this` is preserved.
        const applyImage = (
            image: { [key: string]: any },
            target: CommonEvent,
            principalAttribute: string
        ): void => {
            Object.keys(image).forEach((prop) => {
                const value = this.extractValue(image[prop]);
                if (prop === principalAttribute) {
                    target.principalValue = <string>value;
                }
                target.attributes[prop] = value;
            });
        };

        for (const rec of event.Records) {
            // arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899
            // becomes arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream
            const arnSplit = rec.eventSourceARN.split('/');
            const eventSourceId = `${arnSplit[0]}/${arnSplit[1]}`;

            /**
             *   validate the event stream
             */
            if (rec.eventSource !== 'aws:dynamodb') {
                logger.error(
                    `eventSource ${eventSourceId} incorrectly configured as a dynamodb stream!`
                );
                // Abort the rest of the batch; events collected so far are still returned
                break;
            }

            /**
             *   only interested in certain event types
             */
            if (
                rec.eventName !== 'INSERT' &&
                rec.eventName !== 'MODIFY' &&
                rec.eventName !== 'REMOVE'
            ) {
                continue;
            }

            /**
             *   identify the principal of the incoming event source
             */

            let principalAttribute = principalAttributes[eventSourceId];
            if (principalAttribute === undefined) {
                const r = await this.eventSourceDao.get(eventSourceId);
                if (r === undefined) {
                    logger.warn(`eventSource ${eventSourceId} not configured therefore ignoring`);
                    continue;
                }
                principalAttributes[eventSourceId] = r.principal;
                principalAttribute = principalAttributes[eventSourceId];
            }

            if (rec.dynamodb === undefined || rec.dynamodb.Keys === undefined) {
                // Serialize explicitly: interpolating the record object directly
                // would only print "[object Object]"
                logger.warn(
                    `eventSource ${eventSourceId} missing 'Keys' therefore ignoring: ${JSON.stringify(
                        rec
                    )}`
                );
                continue;
            }

            /**
             *   transform the incoming event
             */

            const transformedEvent: CommonEvent = {
                eventSourceId,
                principal: principalAttribute,
                principalValue: undefined,
                sourceChangeType: <string>rec.eventName,
                attributes: {},
            };

            // Keys are applied first so that image attributes take precedence on overlap
            applyImage(rec.dynamodb.Keys, transformedEvent, principalAttribute);

            // Prefer the new image; the old image is only used when there is no new
            // image (considered for REMOVE events only)
            const image =
                rec.dynamodb.NewImage !== undefined
                    ? rec.dynamodb.NewImage
                    : rec.dynamodb.OldImage;
            if (image !== undefined) {
                applyImage(image, transformedEvent, principalAttribute);
            }

            if (transformedEvent.principalValue === undefined) {
                // Serialize explicitly so the attributes are readable in the log
                logger.warn(
                    `eventSource ${eventSourceId} missing value for principal therefore ignoring.  attributes: ${JSON.stringify(
                        transformedEvent.attributes
                    )}`
                );
                continue;
            }

            transformedEvents.push(transformedEvent);
        }

        logger.debug(
            `ddbstream.transformer transform: exit: ${JSON.stringify(transformedEvents)}`
        );
        return transformedEvents;
    }