in functions/source/real-time-adherence/AspectKinesisLamda/ProcessKinesisEvents.cs [184:223]
/// <summary>
/// Handles a single Kinesis record: reads its payload, minimizes and parses it, and, when the
/// parsed event passes <c>MatchFilter</c> and converts successfully, forwards it to
/// <c>ProcessEventToTable</c> (skipped for heartbeat events) and optionally to
/// <c>ProcessEventToQueue</c>.
/// </summary>
/// <param name="record">The Kinesis event record to process.</param>
/// <param name="writeEventsToQueue">
/// When <c>true</c>, the minimized JSON is also handed to <c>ProcessEventToQueue</c>.
/// </param>
/// <returns>A task that completes once the awaited table/queue calls have finished.</returns>
private async Task ProcessEventRecord(KinesisEvent.KinesisEventRecord record, bool writeEventsToQueue)
{
    _logger.Trace("Beginning ProcessEventRecord");

    var kinesis = record.Kinesis;
    _logger.Debug(
        $"Kinesis EventId:{record.EventId} EventName: {record.EventName} EventSource: {record.EventSource}"
        + $" EventVersion: {record.EventVersion} EventSourceARN {record.EventSourceARN}"
        + $" InvokeIdentityARN: {record.InvokeIdentityArn} Kinesis Time: {kinesis.ApproximateArrivalTimestamp}"
        + $" Kinesis ParitionKey: {kinesis.PartitionKey} Kinesis SeqNum: {kinesis.SequenceNumber}");

    string payload = GetRecordContents(kinesis);
    _logger.Debug($"RecordData:{payload}");

    // Tip for creating test data: Base64-encode the UTF-8 bytes of the payload and log the result.

    if (!string.IsNullOrEmpty(payload))
    {
        var compactJson = ShrinkEvent(payload);
        var parsedEvent = ParseEvent(compactJson);

        // Stays null unless the event parses, matches the filter, and converts.
        ConnectKinesisEventRecord converted = null;
        if (parsedEvent != null && MatchFilter(parsedEvent))
        {
            _logger.Debug("Event matches filter");
            converted = ConvertRecordData(compactJson, parsedEvent);
        }

        if (converted != null)
        {
            // Heartbeat events are not written to the table, but may still be queued.
            bool shouldPersist = converted.LastEventType != HEARTBEAT_EVENTTYPE;
            if (shouldPersist)
            {
                await ProcessEventToTable(converted);
            }

            if (writeEventsToQueue)
            {
                await ProcessEventToQueue(compactJson, converted.AgentARN);
            }
        }
    }

    _logger.Trace("Ending ProcessEventRecord");
}