source/packages/services/command-and-control/src/messages/messages.dao.ts (642 lines of code) (raw):
/*********************************************************************************************************************
* Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. *
* *
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance *
* with the License. A copy of the License is located at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* or in the 'license' file accompanying this file. This file is distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES *
* OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions *
* and limitations under the License. *
*********************************************************************************************************************/
import { logger } from '@awssolutions/simple-cdf-logger';
import { DocumentClient } from 'aws-sdk/clients/dynamodb';
import { inject, injectable } from 'inversify';
import ow from 'ow';
import { TYPES } from '../di/types';
import { DynamoDbUtils } from '../utils/dynamoDb.util';
import {
PkType,
createDelimitedAttribute,
createDelimitedAttributePrefix,
expandDelimitedAttribute,
} from '../utils/pkUtils.util';
import {
MessageItem,
MessageListPaginationKey,
Recipient,
RecipientListPaginationKey,
ReplyItem,
ReplyListPaginationKey,
TaskBatchProgress,
} from './messages.models';
@injectable()
export class MessagesDao {
private readonly GSI1 = 'siKey1-sk-index';
private readonly GSI2 = 'siKey2-siSort2-index';
private _dc: DocumentClient;
public constructor(
@inject('aws.dynamoDb.table') private table: string,
@inject(TYPES.DynamoDbUtils) private dynamoDbUtils: DynamoDbUtils,
@inject(TYPES.DocumentClientFactory) documentClientFactory: () => DocumentClient
) {
this._dc = documentClientFactory();
}
public async getMessageByCorrelation(
correlationId: string,
targetId: string
): Promise<MessageItem> {
logger.debug(
`messages.dao getMessageByCorrelation: in: correlationId:${correlationId}, targetId${targetId}`
);
const params: DocumentClient.QueryInput = {
TableName: this.table,
IndexName: this.GSI1,
KeyConditionExpression: `#hash = :hash and #range = :range`,
ExpressionAttributeNames: {
'#hash': 'siKey1',
'#range': 'sk',
},
ExpressionAttributeValues: {
':hash': createDelimitedAttribute(PkType.Reply, correlationId),
':range': createDelimitedAttribute(PkType.Thing, targetId),
},
Select: 'ALL_ATTRIBUTES',
};
logger.silly(
`messages.dao getMessageByCorrelation: QueryInput: ${JSON.stringify(params)}`
);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao getMessageByCorrelation: exit: undefined');
return undefined;
}
const message = this.assembleMessage(results.Items[0]);
logger.debug(
`messages.dao getMessageByCorrelation: exit: message:${JSON.stringify(message)}`
);
return message;
}
public async getMessageById(id: string): Promise<MessageItem> {
logger.debug(`messages.dao getMessageById: in: id:${id}`);
const params: DocumentClient.QueryInput = {
TableName: this.table,
KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#sort': 'sk',
},
ExpressionAttributeValues: {
':hash': createDelimitedAttribute(PkType.Message, id),
':sort': createDelimitedAttributePrefix(PkType.Message),
},
};
logger.silly(`messages.dao getMessageById: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao getMessageById: exit: undefined');
return undefined;
}
const message = this.assembleMessage(results.Items[0]);
logger.debug(`messages.dao getMessageById: exit: message:${JSON.stringify(message)}`);
return message;
}
public async saveMessage(message: MessageItem): Promise<void> {
logger.debug(`messages.dao saveMessage: in: message:${JSON.stringify(message)}`);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
const messageDbId = createDelimitedAttribute(PkType.Message, message.id);
const messageHeaderDbItem = {
PutRequest: {
Item: {
pk: messageDbId,
sk: messageDbId,
siKey2: createDelimitedAttribute(PkType.Command, message.commandId),
siSort2: createDelimitedAttribute(PkType.Message, message.createdAt),
messageId: message.id,
commandId: message.commandId,
payloadParamValues: message.payloadParamValues,
targets: message.targets,
createdAt: message.createdAt,
updatedAt: message.updatedAt,
},
},
};
if (message.status !== undefined) {
messageHeaderDbItem.PutRequest.Item['status'] = message.status;
messageHeaderDbItem.PutRequest.Item['statusMessage'] = message.statusMessage;
}
params.RequestItems[this.table] = [messageHeaderDbItem];
logger.silly(`messages.dao saveMessage: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao saveMessage: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao saveMessage: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('SAVE_MESSAGE_FAILED');
}
logger.debug(`messages.dao save: exit:`);
}
public async updateMessage(message: MessageItem): Promise<void> {
logger.debug(`messages.dao save: in: updateMessage:${JSON.stringify(message)}`);
const messageDbId = createDelimitedAttribute(PkType.Message, message.id);
const params: DocumentClient.UpdateItemInput = {
TableName: this.table,
Key: {
pk: messageDbId,
sk: messageDbId,
},
UpdateExpression: '',
ExpressionAttributeNames: {},
ExpressionAttributeValues: {},
};
Object.keys(message).forEach((k) => {
if (
Object.prototype.hasOwnProperty.call(message, k) &&
k !== 'pk' &&
k !== 'sk' &&
k !== 'resolvedTargets' &&
message[k] !== undefined
) {
if (params.UpdateExpression === '') {
params.UpdateExpression += 'set ';
} else {
params.UpdateExpression += ', ';
}
params.UpdateExpression += `#${k} = :${k}`;
params.ExpressionAttributeNames[`#${k}`] = k;
params.ExpressionAttributeValues[`:${k}`] = message[k];
}
});
logger.silly(`messages.dao updateMessage: params:${JSON.stringify(params)}`);
const r = await this._dc.update(params).promise();
logger.silly(`messages.dao updateMessage: r:${JSON.stringify(r)}`);
logger.debug(`messages.dao updateMessage: exit:`);
}
public async saveResolvedTargets(message: MessageItem): Promise<void> {
logger.debug(`messages.dao saveResolvedTargets: in: message:${JSON.stringify(message)}`);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
const messageDbId = createDelimitedAttribute(PkType.Message, message.id);
if (message.resolvedTargets !== undefined) {
for (const target of message.resolvedTargets) {
const targetDbItem = {
PutRequest: {
Item: {
pk: messageDbId,
sk: createDelimitedAttribute(PkType.Thing, target.id),
siKey2: createDelimitedAttribute(PkType.Thing, target.id),
siSort2: createDelimitedAttribute(
PkType.Message,
new Date().getTime()
),
messageId: message.id,
targetId: target.id,
targetType: target.type,
status: target.status,
statusMessage: target.statusMessage,
createdAt: message.createdAt,
updatedAt: message.updatedAt,
},
},
};
if (target.correlationId !== undefined) {
targetDbItem.PutRequest.Item['siKey1'] = createDelimitedAttribute(
PkType.Reply,
target.correlationId
);
targetDbItem.PutRequest.Item['correlationId'] = target.correlationId;
}
if (target.jobId !== undefined) {
targetDbItem.PutRequest.Item['jobId'] = target.jobId;
}
params.RequestItems[this.table].push(targetDbItem);
}
}
logger.silly(`messages.dao saveResolvedTargets: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao saveResolvedTargets: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao saveResolvedTargets: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('SAVE_MESSAGE_TARGETS_FAILED');
}
logger.debug(`messages.dao saveResolvedTargets: exit:`);
}
public async saveBatchProgress(message: MessageItem): Promise<void> {
logger.debug(`messages.dao saveBatchProgress: in: message:${JSON.stringify(message)}`);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
if (message.batchesTotal > 0) {
const messageDbId = createDelimitedAttribute(PkType.Message, message.id);
const batchDbId = createDelimitedAttribute(PkType.Message, message.id, 'batches');
const batchSummaryItem = {
PutRequest: {
Item: {
pk: messageDbId,
sk: batchDbId,
batchesTotal: message.batchesTotal,
batchesComplete: message.batchesComplete,
createdAt: message.createdAt,
updatedAt: message.updatedAt,
},
},
};
params.RequestItems[this.table].push(batchSummaryItem);
}
logger.silly(`messages.dao saveBatchProgress: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao saveBatchProgress: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao saveBatchProgress: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('SAVE_MESSAGE_TARGETS_FAILED');
}
logger.debug(`messages.dao saveBatchProgress: exit:`);
}
public async listMessages(
commandId: string,
exclusiveStart?: MessageListPaginationKey,
count?: number
): Promise<[MessageItem[], MessageListPaginationKey]> {
logger.debug(
`messages.dao listMessages: in: commandId:${commandId}, exclusiveStart:${JSON.stringify(
exclusiveStart
)}, count:${count}`
);
const siKey2 = createDelimitedAttribute(PkType.Command, commandId);
let exclusiveStartKey: DocumentClient.Key;
if (exclusiveStart?.createdAt !== undefined) {
exclusiveStartKey = {
siKey2,
siSort2: createDelimitedAttribute(PkType.Message, exclusiveStart.createdAt),
};
}
const params: DocumentClient.QueryInput = {
TableName: this.table,
IndexName: this.GSI2,
KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
ExpressionAttributeNames: {
'#hash': 'siKey2',
'#sort': 'siSort2',
},
ExpressionAttributeValues: {
':hash': siKey2,
':sort': createDelimitedAttributePrefix(PkType.Message),
},
ExclusiveStartKey: exclusiveStartKey,
Limit: count,
};
logger.silly(`messages.dao listMessages: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao listMessages: exit: [[],undefined]');
return [[], undefined];
}
const messages = this.assembleMessages(results.Items);
let paginationKey: MessageListPaginationKey;
if (results.LastEvaluatedKey) {
const lastEvaluatedCreatedAt = Number(
expandDelimitedAttribute(results.LastEvaluatedKey.sk)[1]
);
paginationKey = {
createdAt: lastEvaluatedCreatedAt,
};
}
logger.debug(
`messages.dao listMessages: exit: messages:${JSON.stringify(
messages
)}, paginationKey:${JSON.stringify(paginationKey)}`
);
return [messages, paginationKey];
}
public async getRecipient(messageId: string, targetName: string): Promise<Recipient> {
logger.debug(
`messages.dao getRecipient: in: messageId:${messageId}, targetName:${targetName}`
);
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
const targetNameDbId = createDelimitedAttribute(PkType.Thing, targetName);
const params: DocumentClient.QueryInput = {
TableName: this.table,
KeyConditionExpression: `#hash = :hash AND #sort = :sort`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#sort': 'sk',
},
ExpressionAttributeValues: {
':hash': messageDbId,
':sort': targetNameDbId,
},
};
logger.silly(`messages.dao getRecipient: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao getRecipient: exit: undefined');
return undefined;
}
const recipient = this.assembleRecipient(results.Items[0]);
logger.debug(`messages.dao getRecipient: exit: message:${JSON.stringify(recipient)}`);
return recipient;
}
public async listRecipients(
id: string,
exclusiveStart?: RecipientListPaginationKey,
count?: number
): Promise<[Recipient[], RecipientListPaginationKey]> {
logger.debug(
`messages.dao listRecipients: in: id:${id}, exclusiveStart:${JSON.stringify(
exclusiveStart
)}, count:${count}`
);
const messageDbId = createDelimitedAttribute(PkType.Message, id);
let exclusiveStartKey: DocumentClient.Key;
if (exclusiveStart?.targetName) {
exclusiveStartKey = {
pk: messageDbId,
sk: createDelimitedAttribute(PkType.Thing, exclusiveStart.targetName),
};
}
const params: DocumentClient.QueryInput = {
TableName: this.table,
KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#sort': 'sk',
},
ExpressionAttributeValues: {
':hash': messageDbId,
':sort': createDelimitedAttributePrefix(PkType.Thing),
},
ExclusiveStartKey: exclusiveStartKey,
Limit: count,
};
logger.silly(`messages.dao listRecipients: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao listRecipients: exit: undefined');
return [undefined, undefined];
}
const recipients = this.assembleRecipients(results.Items);
let paginationKey: RecipientListPaginationKey;
if (results.LastEvaluatedKey) {
const lastEvaluatedThingName = expandDelimitedAttribute(
results.LastEvaluatedKey.sk
)[1];
paginationKey = {
targetName: lastEvaluatedThingName,
};
}
logger.debug(
`messages.dao listRecipients: exit: recipients:${JSON.stringify(
recipients
)}, paginationKey:${JSON.stringify(paginationKey)}`
);
return [recipients, paginationKey];
}
public async listReplies(
messageId: string,
targetName: string,
exclusiveStart?: ReplyListPaginationKey,
count?: number
): Promise<[ReplyItem[], ReplyListPaginationKey]> {
logger.debug(
`messages.dao listReplies: in: messageId:${messageId}, targetName:${targetName}, exclusiveStart:${JSON.stringify(
exclusiveStart
)}, count:${count}`
);
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
let exclusiveStartKey: DocumentClient.Key;
if (exclusiveStart?.receivedAt) {
const skDbId = createDelimitedAttribute(
PkType.Reply,
PkType.Thing,
targetName,
exclusiveStart.receivedAt
);
exclusiveStartKey = {
pk: messageDbId,
sk: skDbId,
};
}
const params: DocumentClient.QueryInput = {
TableName: this.table,
KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#sort': 'sk',
},
ExpressionAttributeValues: {
':hash': messageDbId,
':sort': createDelimitedAttributePrefix(PkType.Reply, PkType.Thing, targetName),
},
ExclusiveStartKey: exclusiveStartKey,
Limit: count,
};
logger.silly(`messages.dao listReplies: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao listReplies: exit: undefined');
return [undefined, undefined];
}
const replies = this.assembleReplies(results.Items);
let paginationKey: ReplyListPaginationKey;
if (results.LastEvaluatedKey) {
const lastEvaluatedReceivedAt = Number(
expandDelimitedAttribute(results.LastEvaluatedKey.sk)[3]
);
paginationKey = {
receivedAt: lastEvaluatedReceivedAt,
};
}
logger.debug(
`messages.dao listReplies: exit: replies:${JSON.stringify(
replies
)}, paginationKey:${JSON.stringify(paginationKey)}`
);
return [replies, paginationKey];
}
public async deleteMessage(messageId: string): Promise<void> {
logger.debug(`messages.dao deleteMessage: in: messageId:${messageId}`);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
const messageHeaderDbItem = {
DeleteRequest: {
Key: {
pk: messageDbId,
sk: messageDbId,
},
},
};
params.RequestItems[this.table].push(messageHeaderDbItem);
const messageBatchDbItem = {
DeleteRequest: {
Key: {
pk: messageDbId,
sk: createDelimitedAttribute(PkType.Message, messageId, 'batches'),
},
},
};
params.RequestItems[this.table].push(messageBatchDbItem);
logger.silly(`messages.dao deleteMessage: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao deleteMessage: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao deleteMessage: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('DELETE_MESSAGE_FAILED');
}
logger.debug(`messages.dao deleteMessage: exit:`);
}
public async deleteRecipient(messageId: string, targetName: string): Promise<void> {
logger.debug(
`messages.dao deleteRecipient: in: messageId:${messageId}, targetName:${targetName}`
);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
// delete the recipient item
const resolvedTargetDbItem = {
DeleteRequest: {
Key: {
pk: messageDbId,
sk: createDelimitedAttribute(PkType.Thing, targetName),
},
},
};
params.RequestItems[this.table].push(resolvedTargetDbItem);
// delete the recipient reply items
let exclusiveStart: ReplyListPaginationKey;
// eslint-disable-next-line no-constant-condition
while (true) {
const r = await this.listReplies(messageId, targetName, exclusiveStart);
r[0]?.forEach(async (reply) => {
const replyDbItem = {
DeleteRequest: {
Key: {
pk: reply.pk,
sk: reply.sk,
},
},
};
params.RequestItems[this.table].push(replyDbItem);
});
exclusiveStart = r[1];
if (exclusiveStart?.receivedAt === undefined) {
break;
}
}
logger.silly(`messages.dao deleteRecipient: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao deleteRecipient: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao deleteRecipient: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('DELETE_RECIPIENT_MESSAGE_FAILED');
}
logger.debug(`messages.dao deleteRecipient: exit:`);
}
public async incrementBatchesCompleted(messageId: string): Promise<TaskBatchProgress> {
logger.debug(`messages.dao incrementBatchesCompleted: in: messageId:${messageId}`);
// validation
ow(messageId, ow.string.nonEmpty);
const taskDbId = createDelimitedAttribute(PkType.Message, messageId);
const batchDbId = createDelimitedAttribute(PkType.Message, messageId, 'batches');
const params: DocumentClient.UpdateItemInput = {
TableName: this.table,
Key: {
pk: taskDbId,
sk: batchDbId,
},
UpdateExpression: 'set batchesComplete = batchesComplete + :val',
ExpressionAttributeValues: {
':val': 1,
},
ReturnValues: 'ALL_NEW',
};
const result = await this._dc.update(params).promise();
const response: TaskBatchProgress = {
complete: result.Attributes['batchesComplete'],
total: result.Attributes['batchesTotal'],
};
logger.debug(`messages.dao incrementBatchesCompleted: exit: ${JSON.stringify(response)}`);
return response;
}
private assembleMessages(items: DocumentClient.AttributeMap[]): MessageItem[] {
logger.silly(`messages.dao assembleMessages: in: items:${JSON.stringify(items)}`);
if (items === undefined) {
return undefined;
}
const messages: MessageItem[] = items.map((i) => this.assembleMessage(i));
logger.silly(`messages.dao assembleMessages: exit:${JSON.stringify(messages)}`);
return messages;
}
private assembleMessage(attrs: DocumentClient.AttributeMap): MessageItem {
logger.silly(`messages.dao assembleMessage: in: attrs:${JSON.stringify(attrs)}`);
const r: MessageItem = {
id: attrs.messageId,
commandId: attrs.commandId,
payloadParamValues: attrs.payloadParamValues,
targets: attrs.targets,
createdAt: attrs.createdAt,
updatedAt: attrs.updatedAt,
status: attrs.status,
statusMessage: attrs.statusMessage,
};
logger.silly(`messages.dao assembleMessage: exit:${JSON.stringify(r)}`);
return r;
}
private assembleRecipients(items: DocumentClient.AttributeMap[]): Recipient[] {
logger.silly(`messages.dao assembleRecipients: in: items:${JSON.stringify(items)}`);
if (items === undefined) {
return undefined;
}
const recipients: Recipient[] = items.map((i) => this.assembleRecipient(i));
logger.silly(`messages.dao assembleRecipients: exit:${JSON.stringify(recipients)}`);
return recipients;
}
private assembleRecipient(attrs: DocumentClient.AttributeMap): Recipient {
logger.silly(`messages.dao assembleRecipient: in: attrs:${JSON.stringify(attrs)}`);
const r: Recipient = {
id: attrs.targetId,
type: attrs.targetType,
status: attrs.status,
statusMessage: attrs.statusMessage,
correlationId: attrs.correlationId,
jobId: attrs.jobId,
};
logger.silly(`messages.dao assembleRecipient: exit:${JSON.stringify(r)}`);
return r;
}
private assembleReplies(items: DocumentClient.AttributeMap[]): ReplyItem[] {
logger.silly(`messages.dao assembleReplies: in: items:${JSON.stringify(items)}`);
if (items === undefined) {
return undefined;
}
const replies: ReplyItem[] = items.map((i) => ({
receivedAt: new Date(i.createdAt),
action: i.action,
payload: i.payload,
pk: i.pk,
sk: i.sk,
}));
logger.silly(`messages.dao assembleReplies: exit:${JSON.stringify(replies)}`);
return replies;
}
}