pluto-message-ingestion/pluto-message-processor.js (84 lines of code) (raw):

const logForElk = require('./logger'); const DELETE_KEY = '(DELETE)'; class PlutoMessageProcessor { constructor({ hostname, hmacRequest }) { this.hostname = hostname; this.hmacRequest = hmacRequest; } process(message) { const upsertMessageTypes = ['project-created', 'project-updated']; if (upsertMessageTypes.includes(message.type)) { if (message.commissionTitle === DELETE_KEY) { return this._deleteProject(message); } return this._upsertProject(message); } return Promise.reject(`unknown message type: ${message.type}`); } static _isValidMessage(message) { const requiredKeys = new Set([ 'id', 'title', 'status', 'commissionId', 'commissionTitle', 'productionOffice', 'created' ]); const messageKeys = new Set(Object.keys(message)); const diff = new Set( [...requiredKeys].filter(key => !messageKeys.has(key)) ); return diff.size === 0; } _deleteProject({ commissionId }) { const remoteUrl = `${this.hostname}/api/pluto/commissions/${commissionId}`; return this.hmacRequest.delete(remoteUrl); } _upsertProject(message) { return new Promise((resolve, reject) => { if (!PlutoMessageProcessor._isValidMessage(message)) { logForElk( { message: 'invalid message, props missing', data: { message } }, 'error' ); // `resolve` to remove message from Kinesis return resolve('invalid message, props missing'); } const project = Object.assign({}, message); const remoteUrl = `${this.hostname}/api/pluto/projects`; this.hmacRequest .put(remoteUrl, project) .then(resp => { logForElk( { message: 'successfully upserted project', response: resp }, 'log' ); resolve(resp); }) .catch(err => { const logDetail = { status: err.status, response: err.response, project: project }; logForElk( { message: 'failed to upsert project', extraDetail: logDetail }, 'error' ); reject(err); }); }); } } module.exports = PlutoMessageProcessor;