source/packages/services/command-and-control/src/commands/commands.dao.ts (311 lines of code) (raw):

/*********************************************************************************************************************
 *  Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.                                                *
 *                                                                                                                    *
 *  Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance   *
 *  with the License. A copy of the License is located at                                                            *
 *                                                                                                                    *
 *      http://www.apache.org/licenses/LICENSE-2.0                                                                    *
 *                                                                                                                    *
 *  or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES *
 *  OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions   *
 *  and limitations under the License.                                                                                *
 *********************************************************************************************************************/
import { DocumentClient } from 'aws-sdk/clients/dynamodb';
import { inject, injectable } from 'inversify';

import { logger } from '@awssolutions/simple-cdf-logger';

import { TYPES } from '../di/types';
import {
    PkType,
    createDelimitedAttribute,
    createDelimitedAttributePrefix,
    expandDelimitedAttribute,
} from '../utils/pkUtils.util';
import {
    CommandItem,
    CommandListIdsByTagPaginationKey,
    CommandListPaginationKey,
    Tags,
} from './commands.models';
import { DynamoDB } from 'aws-sdk';
import { DynamoDbUtils } from '../utils/dynamoDb.util';

/**
 * Data access object for command items held in a single DynamoDB table.
 *
 * Two kinds of items are written by this class:
 *  - the command item itself, whose `pk` and `sk` are both the delimited
 *    attribute built from `PkType.Command` and the command id;
 *  - one "searchable tag" item per tag, whose `pk` is built from
 *    `PkType.Tag` + tag key and whose `sk` is built from
 *    `PkType.Tag` + tag value + `PkType.Command` + command id.
 */
@injectable()
export class CommandsDao {
    /** Name of the secondary index queried by {@link list}. */
    private readonly GSI1 = 'siKey1-sk-index';

    private _dc: DynamoDB.DocumentClient;

    public constructor(
        @inject('aws.dynamoDb.table') private table: string,
        @inject(TYPES.DynamoDbUtils) private dynamoDbUtils: DynamoDbUtils,
        @inject(TYPES.DocumentClientFactory)
        documentClientFactory: () => DynamoDB.DocumentClient
    ) {
        this._dc = documentClientFactory();
    }

    /**
     * Batch-reads the command items for the given ids.
     *
     * @param commandIds ids of the commands to read.
     * @returns the assembled commands, or `undefined` when the batch response
     *          carries no entry for this table.
     */
    public async get(commandIds: string[]): Promise<CommandItem[]> {
        logger.debug(`commands.dao get: in: commandIds:${commandIds}`);

        const params: DocumentClient.BatchGetItemInput = {
            RequestItems: {},
        };
        params.RequestItems[this.table] = {
            Keys: commandIds.map((id) => ({
                pk: createDelimitedAttribute(PkType.Command, id),
                sk: createDelimitedAttribute(PkType.Command, id),
            })),
        };

        logger.silly(`commands.dao get: params: ${JSON.stringify(params)}`);
        const response = await this.dynamoDbUtils.batchGetAll(params);
        logger.silly(`commands.dao get: response: ${JSON.stringify(response)}`);

        // `== undefined` intentionally matches both null and undefined.
        if (response?.Responses?.[this.table] == undefined) {
            logger.debug('commands.dao get: exit: commands:undefined');
            return undefined;
        }

        const commands = this.assembleCommands(response.Responses[this.table]);
        logger.debug(`commands.dao get: exit: commands:${JSON.stringify(commands)}`);
        return commands;
    }

    /**
     * Lists commands by querying the `siKey1-sk-index` index for items whose
     * `siKey1` equals `PkType.Command`.
     *
     * @param exclusiveStart pagination key from a previous call; only used
     *                       when it carries a `commandId`.
     * @param count maximum number of items to request (passed as `Limit`).
     * @returns a tuple of `[commands, paginationKey]`. Both entries are
     *          `undefined` when the query returns no items; `paginationKey`
     *          is `undefined` when the query reports no `LastEvaluatedKey`.
     */
    public async list(
        exclusiveStart?: CommandListPaginationKey,
        count?: number
    ): Promise<[CommandItem[], CommandListPaginationKey]> {
        logger.debug(
            `commands.dao list: exclusiveStart:${JSON.stringify(exclusiveStart)}, count:${count}`
        );

        let exclusiveStartKey: DocumentClient.Key;
        if (exclusiveStart?.commandId) {
            const lastCommandId = createDelimitedAttribute(
                PkType.Command,
                exclusiveStart.commandId
            );
            // The start key carries the table keys (pk, sk) plus the index hash key.
            exclusiveStartKey = {
                pk: lastCommandId,
                sk: lastCommandId,
                siKey1: PkType.Command,
            };
        }

        const params: DocumentClient.QueryInput = {
            TableName: this.table,
            IndexName: this.GSI1,
            KeyConditionExpression: `#hash = :hash`,
            ExpressionAttributeNames: {
                '#hash': 'siKey1',
            },
            ExpressionAttributeValues: {
                ':hash': PkType.Command,
            },
            Select: 'ALL_ATTRIBUTES',
            ExclusiveStartKey: exclusiveStartKey,
            Limit: count,
        };

        logger.silly(`commands.dao list: QueryInput: ${JSON.stringify(params)}`);
        const results = await this._dc.query(params).promise();
        logger.silly(`query result: ${JSON.stringify(results)}`);

        if ((results?.Count ?? 0) === 0) {
            logger.debug('commands.dao list: exit: [undefined,undefined]');
            return [undefined, undefined];
        }

        const commands = this.assembleCommands(results.Items);

        let paginationKey: CommandListPaginationKey;
        if (results.LastEvaluatedKey) {
            // Segment 1 of the expanded pk is the command id (segment 0 is the type prefix).
            const lastEvaluatedCommandId = expandDelimitedAttribute(
                results.LastEvaluatedKey.pk
            )[1];
            paginationKey = {
                commandId: lastEvaluatedCommandId,
            };
        }

        const response: [CommandItem[], CommandListPaginationKey] = [commands, paginationKey];
        logger.debug(`commands.dao list: exit: response:${JSON.stringify(response)}`);
        return response;
    }

    /**
     * Lists the ids of commands carrying a given tag by querying the tag
     * items (`pk` = tag key, `sk` beginning with tag value + command prefix).
     *
     * @param tagKey tag key to search for.
     * @param tagValue tag value to search for.
     * @param exclusiveStart pagination key from a previous call; only used
     *                       when it carries a `commandId`.
     * @param count maximum number of items to request (passed as `Limit`).
     * @returns a tuple of `[commandIds, paginationKey]`. Both entries are
     *          `undefined` when the query returns no items; `paginationKey`
     *          is `undefined` when the query reports no `LastEvaluatedKey`.
     */
    public async listIds(
        tagKey: string,
        tagValue: string,
        exclusiveStart?: CommandListIdsByTagPaginationKey,
        count?: number
    ): Promise<[string[], CommandListIdsByTagPaginationKey]> {
        logger.debug(
            `commands.dao listIds: tagKey:${tagKey}, tagValue:${tagValue}, exclusiveStart:${JSON.stringify(
                exclusiveStart
            )}, count:${count}`
        );

        let exclusiveStartKey: DocumentClient.Key;
        if (exclusiveStart?.commandId) {
            exclusiveStartKey = {
                pk: createDelimitedAttribute(PkType.Tag, exclusiveStart.tagKey),
                sk: createDelimitedAttribute(
                    PkType.Tag,
                    exclusiveStart.tagValue,
                    PkType.Command,
                    exclusiveStart.commandId
                ),
            };
        }

        const params: DocumentClient.QueryInput = {
            TableName: this.table,
            KeyConditionExpression: `#hash = :hash AND begins_with(#sort,:sort)`,
            ExpressionAttributeNames: {
                '#hash': 'pk',
                '#sort': 'sk',
            },
            ExpressionAttributeValues: {
                ':hash': createDelimitedAttribute(PkType.Tag, tagKey),
                ':sort': createDelimitedAttributePrefix(PkType.Tag, tagValue, PkType.Command),
            },
            ExclusiveStartKey: exclusiveStartKey,
            Limit: count,
        };

        logger.silly(`commands.dao listIds: QueryInput: ${JSON.stringify(params)}`);
        const results = await this._dc.query(params).promise();
        logger.silly(`query result: ${JSON.stringify(results)}`);

        if ((results?.Count ?? 0) === 0) {
            logger.debug('commands.dao listIds: exit: [undefined,undefined]');
            return [undefined, undefined];
        }

        // The tag item's sk expands to [tagType, tagValue, commandType, commandId].
        const commandIds: string[] = [];
        for (const i of results.Items) {
            commandIds.push(expandDelimitedAttribute(i.sk)[3]);
        }

        let paginationKey: CommandListIdsByTagPaginationKey;
        if (results.LastEvaluatedKey) {
            paginationKey = {
                tagKey: expandDelimitedAttribute(results.LastEvaluatedKey.pk)[1],
                tagValue: expandDelimitedAttribute(results.LastEvaluatedKey.sk)[1],
                commandId: expandDelimitedAttribute(results.LastEvaluatedKey.sk)[3],
            };
        }

        const response: [string[], CommandListIdsByTagPaginationKey] = [commandIds, paginationKey];
        logger.debug(`commands.dao listIds: exit: response:${JSON.stringify(response)}`);
        return response;
    }

    /**
     * Converts raw table items into `CommandItem`s.
     *
     * The delivery method is read from whichever of `topicDeliveryMethod`,
     * `shadowDeliveryMethod` or `jobDeliveryMethod` is present, in that order.
     *
     * @param items raw items as returned by the document client.
     * @returns one `CommandItem` per input item, in the same order.
     */
    private assembleCommands(items: DocumentClient.ItemList): CommandItem[] {
        const list: CommandItem[] = [];
        for (const attrs of items) {
            const r: CommandItem = {
                id: attrs.commandId,
                operation: attrs.operation,
                // Timestamps are persisted as epoch milliseconds (see save()).
                createdAt: new Date(attrs.createdAt),
                updatedAt: new Date(attrs.updatedAt),
                deliveryMethod:
                    attrs.topicDeliveryMethod ??
                    attrs.shadowDeliveryMethod ??
                    attrs.jobDeliveryMethod,
                payloadTemplate: attrs.payloadTemplate,
                payloadParams: attrs.payloadParams,
                enabled: attrs.enabled,
                tags: attrs.tags,
            };
            list.push(r);
        }
        return list;
    }

    /**
     * Writes a command and its searchable tag items in a single batch write,
     * optionally removing tag items that are no longer wanted.
     *
     * @param command the command to persist.
     * @param tagsToDelete tags whose searchable tag items should be deleted
     *                     for this command.
     * @throws Error with message `SAVE_COMMAND_FAILED` when the batch write
     *         reports unprocessed items.
     */
    public async save(command: CommandItem, tagsToDelete?: Tags): Promise<void> {
        logger.debug(
            `commands.dao save: in: command:${JSON.stringify(
                command
            )}, tagsToDelete:${JSON.stringify(tagsToDelete)}`
        );

        const params: DocumentClient.BatchWriteItemInput = {
            RequestItems: {},
        };
        params.RequestItems[this.table] = [];

        /////// the main command item to save...
        const commandDbItem = {
            PutRequest: {
                Item: {
                    pk: createDelimitedAttribute(PkType.Command, command.id),
                    sk: createDelimitedAttribute(PkType.Command, command.id),
                    commandId: command.id,
                    siKey1: PkType.Command,
                    siKey2: PkType.Command,
                    operation: command.operation,
                    enabled: command.enabled,
                    payloadTemplate: command.payloadTemplate,
                    payloadParams: command.payloadParams,
                    createdAt: command.createdAt.getTime(),
                    updatedAt: command.updatedAt?.getTime(),
                    tags: command.tags, // denormalized tags to simplify the assembly of the command
                },
            },
        };

        // The delivery method is stored under an attribute named after its type.
        if (command.deliveryMethod?.type === 'TOPIC') {
            commandDbItem.PutRequest.Item['topicDeliveryMethod'] = command.deliveryMethod;
        }
        if (command.deliveryMethod?.type === 'SHADOW') {
            commandDbItem.PutRequest.Item['shadowDeliveryMethod'] = command.deliveryMethod;
        }
        if (command.deliveryMethod?.type === 'JOB') {
            commandDbItem.PutRequest.Item['jobDeliveryMethod'] = command.deliveryMethod;
        }
        params.RequestItems[this.table].push(commandDbItem);

        /////// searchable tags to save...
        if (command.tags) {
            Object.entries(command.tags).forEach(([k, v]) => {
                const tagDbItem: DocumentClient.WriteRequest = {
                    PutRequest: {
                        Item: {
                            pk: createDelimitedAttribute(PkType.Tag, k),
                            sk: createDelimitedAttribute(
                                PkType.Tag,
                                v,
                                PkType.Command,
                                command.id
                            ),
                        },
                    },
                };
                params.RequestItems[this.table].push(tagDbItem);
            });
        }

        /////// old searchable tags to remove...
        if (tagsToDelete) {
            Object.entries(tagsToDelete).forEach(([k, v]) => {
                const tagDbItem: DocumentClient.WriteRequest = {
                    DeleteRequest: {
                        Key: {
                            pk: createDelimitedAttribute(PkType.Tag, k),
                            sk: createDelimitedAttribute(
                                PkType.Tag,
                                v,
                                PkType.Command,
                                command.id
                            ),
                        },
                    },
                };
                params.RequestItems[this.table].push(tagDbItem);
            });
        }

        logger.silly(`commands.dao save: params:${JSON.stringify(params)}`);
        const r = await this.dynamoDbUtils.batchWriteAll(params);
        logger.silly(`commands.dao save: r:${JSON.stringify(r)}`);

        if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
            logger.error(
                `commands.dao save: has unprocessed items: ${JSON.stringify(r.UnprocessedItems)}`
            );
            throw new Error('SAVE_COMMAND_FAILED');
        }

        logger.debug(`commands.dao save: exit:`);
    }

    /**
     * Deletes a command item together with its searchable tag items.
     *
     * The command is read first so that its tags (and therefore the keys of
     * its tag items) are known. When the command cannot be found, the delete
     * request for the command item is still issued and no tag deletes are
     * added.
     *
     * @param commandId id of the command to delete.
     * @throws Error with message `DELETE_COMMAND_FAILED` when the batch write
     *         reports unprocessed items.
     */
    public async delete(commandId: string): Promise<void> {
        logger.debug(`commands.dao delete: in: commandId:${commandId}`);

        // May be undefined: get() returns undefined or an empty list for unknown ids.
        const command = (await this.get([commandId]))?.[0];

        const params: DocumentClient.BatchWriteItemInput = {
            RequestItems: {},
        };

        const commandDbItem: DocumentClient.WriteRequest = {
            DeleteRequest: {
                Key: {
                    pk: createDelimitedAttribute(PkType.Command, commandId),
                    sk: createDelimitedAttribute(PkType.Command, commandId),
                },
            },
        };
        params.RequestItems[this.table] = [commandDbItem];

        // Guard against a missing command so deleting an unknown id does not
        // fail with a TypeError before any request is sent.
        if (command?.tags) {
            Object.entries(command.tags).forEach(([k, v]) => {
                const tagDbItem: DocumentClient.WriteRequest = {
                    DeleteRequest: {
                        Key: {
                            pk: createDelimitedAttribute(PkType.Tag, k),
                            sk: createDelimitedAttribute(PkType.Tag, v, PkType.Command, commandId),
                        },
                    },
                };
                params.RequestItems[this.table].push(tagDbItem);
            });
        }

        logger.silly(`commands.dao delete: params:${JSON.stringify(params)}`);
        const r = await this.dynamoDbUtils.batchWriteAll(params);
        logger.silly(`commands.dao delete: r:${JSON.stringify(r)}`);

        if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
            logger.error(
                `commands.dao delete: has unprocessed items: ${JSON.stringify(r.UnprocessedItems)}`
            );
            throw new Error('DELETE_COMMAND_FAILED');
        }

        logger.debug(`commands.dao delete: exit:`);
    }
}