export async function onEvent()

in lib/lambda/replayQldbPartiQL/index.ts [26:87]


/**
 * Lambda handler that replays PartiQL statements carried in a Kinesis stream
 * event against the QLDB ledger named by the `destQldbName` environment
 * variable.
 *
 * For every Kinesis record the base64 payload is parsed as Ion. Records that
 * do not parse to a value, or whose `recordType` is not `BLOCK_SUMMARY`, are
 * skipped. For the remaining records each entry under
 * `payload.transactionInfo.statements[].statement` is executed, in order and
 * one transaction per statement, via `executeStatement`. Statements that start
 * with `select` (case-insensitive, ignoring leading whitespace) are not
 * replayed.
 *
 * Errors raised while processing the batch are logged and NOT rethrown, so the
 * returned promise still resolves; the first error stops processing of the
 * remaining records in the event.
 *
 * @param event - Kinesis stream event whose records carry base64-encoded Ion data.
 * @param context - Lambda invocation context (currently unused).
 * @returns A promise that resolves once all records were processed or an error was logged.
 * @throws Error if the `destQldbName` environment variable is not set.
 */
export async function onEvent(
    event: lambda.KinesisStreamEvent,
    context: lambda.Context,
): Promise<void> {

    console.log(`Processing request: `, event);

    const destQldbName = process.env.destQldbName;
    if (!destQldbName) {
        // Fail with a clear message instead of handing `undefined` to the driver.
        throw new Error('Environment variable destQldbName is not set');
    }

    const qldbDriver = new qldb.QldbDriver(destQldbName, {
        region: process.env.AWS_REGION,
    });

    try {
        for (const record of event.Records) {
            const payload: lambda.KinesisStreamRecordPayload = record.kinesis;

            // Load the message as an Ion record.
            const ionRecord = ion.load(Buffer.from(payload.data, 'base64'));

            // Skip anything that is not an Ion value of record type BLOCK_SUMMARY.
            if (ionRecord === null ||
                ionRecord.get('recordType')?.stringValue() !== 'BLOCK_SUMMARY') {
                continue;
            }

            const ionText = ion.dumpText(ionRecord);

            console.log(
                `Kinesis Message:
                partition key: ${payload.partitionKey}
                sequence number: ${payload.sequenceNumber}
                kinesis schema version: ${payload.kinesisSchemaVersion}
                data: ${ionText}
            `);

            // Extract the PartiQL statements; their full path in the record is
            // .payload.transactionInfo.statements[].statement
            const statements = ionRecord.get('payload', 'transactionInfo', 'statements');
            if (statements == null) {
                // Nothing to replay for this record; keep going with the rest of the batch.
                continue;
            }

            for (const statementElement of statements.elements()) {
                const statementText = statementElement.get('statement')?.stringValue();
                if (!statementText) {
                    // Entry without statement text: skip it rather than abort the batch.
                    continue;
                }
                if (statementText.trimStart().toLowerCase().startsWith('select')) {
                    console.log('Ingore SELECT statement');
                    continue;
                }
                console.log(`The current PartiQL statement is ${statementText}`);
                await qldbDriver.executeLambda(async (txn: qldb.TransactionExecutor) => {
                    // Must be awaited: otherwise the transaction lambda returns before
                    // the statement has run and failures escape the try/catch below.
                    await executeStatement(txn, statementText);
                });
            }
        }
    } catch (error) {
        // Best-effort replay: log the failure and let the invocation complete.
        console.log(error);
    } finally {
        // Release the driver's sessions; a new driver is created on every invocation.
        qldbDriver.close();
    }
}