in source/services/events-processing/lib/event.js [51:190]
async processEvent(input, recordId, context) {
const _self = this;
try {
// Extract event object and applicationId string from payload. application_id and event are required or record fails processing
if(!input.hasOwnProperty('application_id')){
return Promise.reject({
recordId: recordId,
result: 'ProcessingFailed',
data: new Buffer.from(JSON.stringify(input) + '\n').toString('base64')
});
}
if(!input.hasOwnProperty('event')){
return Promise.reject({
recordId: recordId,
result: 'ProcessingFailed',
data: new Buffer.from(JSON.stringify(input) + '\n').toString('base64')
});
}
const applicationId = input.application_id;
const event = input.event;
// Add a processing timestamp and the Lambda Request Id to the event metadata
let metadata = {
ingestion_id: context.awsRequestId,
processing_timestamp: moment().unix()
};
// If event came from Solution API, it should have extra metadata
if (input.aws_ga_api_validated_flag) {
metadata.api = {};
if (input.aws_ga_api_requestId) {
metadata.api.request_id = input.aws_ga_api_requestId;
}
if (input.aws_ga_api_requestTimeEpoch) {
metadata.api.request_time_epoch = input.aws_ga_api_requestTimeEpoch;
}
}
// Retrieve application config from Applications table
const application = await _self.getApplication(applicationId);
if (application !== null) {
// Validate the input record against solution event schema
const schemaValid = await _self.validateSchema(input);
let transformed_event = {};
if (schemaValid.validation_result == 'schema_mismatch') {
metadata.processing_result = {
status: 'schema_mismatch',
validation_errors: schemaValid.validation_errors
};
transformed_event.metadata = metadata;
//console.log(`Errors processing event: ${JSON.stringify(errors)}`);
} else {
metadata.processing_result = {
status: 'ok'
};
transformed_event.metadata = metadata;
}
if(event.hasOwnProperty('event_id')){
transformed_event.event_id = String(event.event_id);
}
if(event.hasOwnProperty('event_type')){
transformed_event.event_type = String(event.event_type);
}
if(event.hasOwnProperty('event_name')){
transformed_event.event_name = String(event.event_name);
}
if(event.hasOwnProperty('event_version')){
transformed_event.event_version = String(event.event_version);
}
if(event.hasOwnProperty('event_timestamp')){
transformed_event.event_timestamp = Number(event.event_timestamp);
}
if(event.hasOwnProperty('app_version')){
transformed_event.app_version = String(event.app_version);
}
if(event.hasOwnProperty('event_data')){
transformed_event.event_data = event.event_data;
}
transformed_event.application_name = String(application.application_name);
transformed_event.application_id = String(applicationId);
return Promise.resolve({
recordId: recordId,
result: 'Ok',
data: new Buffer.from(JSON.stringify(transformed_event) + '\n').toString('base64')
});
} else {
/**
* Handle events from unregistered ("NOT_FOUND") applications
* Sets processing result as "unregistered"
* We don't attempt to validate schema of unregistered events, we just coerce the necessary fields into expected format
*/
metadata.processing_result = {
status: 'unregistered'
};
let unregistered_format = {};
unregistered_format.metadata = metadata;
if(event.hasOwnProperty('event_id')){
unregistered_format.event_id = String(event.event_id);
}
if(event.hasOwnProperty('event_type')){
unregistered_format.event_type = String(event.event_type);
}
if(event.hasOwnProperty('event_name')){
unregistered_format.event_name = String(event.event_name);
}
if(event.hasOwnProperty('event_version')){
unregistered_format.event_version = String(event.event_version);
}
if(event.hasOwnProperty('event_timestamp')){
unregistered_format.event_timestamp = Number(event.event_timestamp);
}
if(event.hasOwnProperty('app_version')){
unregistered_format.app_version = String(event.app_version);
}
if(event.hasOwnProperty('event_data')){
unregistered_format.event_data = event.event_data;
}
// Even though the application_id is not registered, let's add it to the event
unregistered_format.application_id = String(applicationId);
return Promise.resolve({
recordId: recordId,
result: 'Ok',
data: new Buffer.from(JSON.stringify(unregistered_format) + '\n').toString('base64')
});
}
} catch (err) {
console.log(`Error processing record: ${JSON.stringify(err)}`);
return Promise.reject({
recordId: recordId,
result: 'ProcessingFailed',
data: new Buffer.from(JSON.stringify(input) + '\n').toString('base64')
});
}
}