async processEvent()

in source/services/events-processing/lib/event.js [51:190]


  async processEvent(input, recordId, context) {
    const _self = this;
    try {
      // Extract event object and applicationId string from payload. application_id and event are required or record fails processing
      if(!input.hasOwnProperty('application_id')){
        return Promise.reject({
          recordId: recordId,
          result: 'ProcessingFailed',
          data: new Buffer.from(JSON.stringify(input) + '\n').toString('base64')
        });
      }
      if(!input.hasOwnProperty('event')){
        return Promise.reject({
          recordId: recordId,
          result: 'ProcessingFailed',
          data: new Buffer.from(JSON.stringify(input) + '\n').toString('base64')
        });
      }
      const applicationId = input.application_id;
      const event = input.event;
      
      // Add a processing timestamp and the Lambda Request Id to the event metadata
      let metadata = {
        ingestion_id: context.awsRequestId,
        processing_timestamp: moment().unix()
      };
      
      // If event came from Solution API, it should have extra metadata
      if (input.aws_ga_api_validated_flag) {
        metadata.api = {};
        if (input.aws_ga_api_requestId) { 
          metadata.api.request_id = input.aws_ga_api_requestId; 
        }
        if (input.aws_ga_api_requestTimeEpoch) {
          metadata.api.request_time_epoch = input.aws_ga_api_requestTimeEpoch;
        }
      }
      
      // Retrieve application config from Applications table
      const application = await _self.getApplication(applicationId);
      if (application !== null) {
        // Validate the input record against solution event schema
        const schemaValid = await _self.validateSchema(input);
        let transformed_event = {};
        if (schemaValid.validation_result == 'schema_mismatch') {
          metadata.processing_result = {
            status: 'schema_mismatch',
            validation_errors: schemaValid.validation_errors
          };
          transformed_event.metadata = metadata;
          //console.log(`Errors processing event: ${JSON.stringify(errors)}`);
        } else {
          metadata.processing_result = {
            status: 'ok'
          };
          transformed_event.metadata = metadata;
        }
        
        if(event.hasOwnProperty('event_id')){
          transformed_event.event_id = String(event.event_id);
        }
        if(event.hasOwnProperty('event_type')){
          transformed_event.event_type = String(event.event_type);
        }
        if(event.hasOwnProperty('event_name')){
          transformed_event.event_name = String(event.event_name);
        }
        if(event.hasOwnProperty('event_version')){
          transformed_event.event_version = String(event.event_version);
        }
        if(event.hasOwnProperty('event_timestamp')){
          transformed_event.event_timestamp = Number(event.event_timestamp);
        }
        if(event.hasOwnProperty('app_version')){
          transformed_event.app_version = String(event.app_version);
        }
        if(event.hasOwnProperty('event_data')){
          transformed_event.event_data = event.event_data;
        }
        
        transformed_event.application_name = String(application.application_name);
        transformed_event.application_id = String(applicationId);
        
        return Promise.resolve({
          recordId: recordId,
          result: 'Ok',
          data: new Buffer.from(JSON.stringify(transformed_event) + '\n').toString('base64')
        });
      } else {
        /**
         * Handle events from unregistered ("NOT_FOUND") applications
         * Sets processing result as "unregistered"
         * We don't attempt to validate schema of unregistered events, we just coerce the necessary fields into expected format 
         */
        metadata.processing_result = {
          status: 'unregistered'
        };
        let unregistered_format = {};
        unregistered_format.metadata = metadata;
        
        if(event.hasOwnProperty('event_id')){
          unregistered_format.event_id = String(event.event_id);
        }
        if(event.hasOwnProperty('event_type')){
          unregistered_format.event_type = String(event.event_type);
        }
        if(event.hasOwnProperty('event_name')){
          unregistered_format.event_name = String(event.event_name);
        }
        if(event.hasOwnProperty('event_version')){
          unregistered_format.event_version = String(event.event_version);
        }
        if(event.hasOwnProperty('event_timestamp')){
          unregistered_format.event_timestamp = Number(event.event_timestamp);
        }
        if(event.hasOwnProperty('app_version')){
          unregistered_format.app_version = String(event.app_version);
        }
        if(event.hasOwnProperty('event_data')){
          unregistered_format.event_data = event.event_data;
        }
        
        // Even though the application_id is not registered, let's add it to the event
        unregistered_format.application_id = String(applicationId);
        
        return Promise.resolve({
          recordId: recordId,
          result: 'Ok',
          data: new Buffer.from(JSON.stringify(unregistered_format) + '\n').toString('base64')
        });
      } 
    } catch (err) {
      console.log(`Error processing record: ${JSON.stringify(err)}`);
      return Promise.reject({
        recordId: recordId,
        result: 'ProcessingFailed',
        data: new Buffer.from(JSON.stringify(input) + '\n').toString('base64')
      });
    }
  }