in source/packages/services/command-and-control/src/messages/messages.dao.ts [222:614]
sk: createDelimitedAttribute(PkType.Thing, target.id),
siKey2: createDelimitedAttribute(PkType.Thing, target.id),
siSort2: createDelimitedAttribute(
PkType.Message,
new Date().getTime()
),
messageId: message.id,
targetId: target.id,
targetType: target.type,
status: target.status,
statusMessage: target.statusMessage,
createdAt: message.createdAt,
updatedAt: message.updatedAt,
},
},
};
if (target.correlationId !== undefined) {
targetDbItem.PutRequest.Item['siKey1'] = createDelimitedAttribute(
PkType.Reply,
target.correlationId
);
targetDbItem.PutRequest.Item['correlationId'] = target.correlationId;
}
if (target.jobId !== undefined) {
targetDbItem.PutRequest.Item['jobId'] = target.jobId;
}
params.RequestItems[this.table].push(targetDbItem);
}
}
logger.silly(`messages.dao saveResolvedTargets: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao saveResolvedTargets: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao saveResolvedTargets: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('SAVE_MESSAGE_TARGETS_FAILED');
}
logger.debug(`messages.dao saveResolvedTargets: exit:`);
}
public async saveBatchProgress(message: MessageItem): Promise<void> {
logger.debug(`messages.dao saveBatchProgress: in: message:${JSON.stringify(message)}`);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
if (message.batchesTotal > 0) {
const messageDbId = createDelimitedAttribute(PkType.Message, message.id);
const batchDbId = createDelimitedAttribute(PkType.Message, message.id, 'batches');
const batchSummaryItem = {
PutRequest: {
Item: {
pk: messageDbId,
sk: batchDbId,
batchesTotal: message.batchesTotal,
batchesComplete: message.batchesComplete,
createdAt: message.createdAt,
updatedAt: message.updatedAt,
},
},
};
params.RequestItems[this.table].push(batchSummaryItem);
}
logger.silly(`messages.dao saveBatchProgress: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao saveBatchProgress: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao saveBatchProgress: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('SAVE_MESSAGE_TARGETS_FAILED');
}
logger.debug(`messages.dao saveBatchProgress: exit:`);
}
public async listMessages(
commandId: string,
exclusiveStart?: MessageListPaginationKey,
count?: number
): Promise<[MessageItem[], MessageListPaginationKey]> {
logger.debug(
`messages.dao listMessages: in: commandId:${commandId}, exclusiveStart:${JSON.stringify(
exclusiveStart
)}, count:${count}`
);
const siKey2 = createDelimitedAttribute(PkType.Command, commandId);
let exclusiveStartKey: DocumentClient.Key;
if (exclusiveStart?.createdAt !== undefined) {
exclusiveStartKey = {
siKey2,
siSort2: createDelimitedAttribute(PkType.Message, exclusiveStart.createdAt),
};
}
const params: DocumentClient.QueryInput = {
TableName: this.table,
IndexName: this.GSI2,
KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
ExpressionAttributeNames: {
'#hash': 'siKey2',
'#sort': 'siSort2',
},
ExpressionAttributeValues: {
':hash': siKey2,
':sort': createDelimitedAttributePrefix(PkType.Message),
},
ExclusiveStartKey: exclusiveStartKey,
Limit: count,
};
logger.silly(`messages.dao listMessages: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao listMessages: exit: [[],undefined]');
return [[], undefined];
}
const messages = this.assembleMessages(results.Items);
let paginationKey: MessageListPaginationKey;
if (results.LastEvaluatedKey) {
const lastEvaluatedCreatedAt = Number(
expandDelimitedAttribute(results.LastEvaluatedKey.sk)[1]
);
paginationKey = {
createdAt: lastEvaluatedCreatedAt,
};
}
logger.debug(
`messages.dao listMessages: exit: messages:${JSON.stringify(
messages
)}, paginationKey:${JSON.stringify(paginationKey)}`
);
return [messages, paginationKey];
}
public async getRecipient(messageId: string, targetName: string): Promise<Recipient> {
logger.debug(
`messages.dao getRecipient: in: messageId:${messageId}, targetName:${targetName}`
);
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
const targetNameDbId = createDelimitedAttribute(PkType.Thing, targetName);
const params: DocumentClient.QueryInput = {
TableName: this.table,
KeyConditionExpression: `#hash = :hash AND #sort = :sort`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#sort': 'sk',
},
ExpressionAttributeValues: {
':hash': messageDbId,
':sort': targetNameDbId,
},
};
logger.silly(`messages.dao getRecipient: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao getRecipient: exit: undefined');
return undefined;
}
const recipient = this.assembleRecipient(results.Items[0]);
logger.debug(`messages.dao getRecipient: exit: message:${JSON.stringify(recipient)}`);
return recipient;
}
public async listRecipients(
id: string,
exclusiveStart?: RecipientListPaginationKey,
count?: number
): Promise<[Recipient[], RecipientListPaginationKey]> {
logger.debug(
`messages.dao listRecipients: in: id:${id}, exclusiveStart:${JSON.stringify(
exclusiveStart
)}, count:${count}`
);
const messageDbId = createDelimitedAttribute(PkType.Message, id);
let exclusiveStartKey: DocumentClient.Key;
if (exclusiveStart?.targetName) {
exclusiveStartKey = {
pk: messageDbId,
sk: createDelimitedAttribute(PkType.Thing, exclusiveStart.targetName),
};
}
const params: DocumentClient.QueryInput = {
TableName: this.table,
KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#sort': 'sk',
},
ExpressionAttributeValues: {
':hash': messageDbId,
':sort': createDelimitedAttributePrefix(PkType.Thing),
},
ExclusiveStartKey: exclusiveStartKey,
Limit: count,
};
logger.silly(`messages.dao listRecipients: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao listRecipients: exit: undefined');
return [undefined, undefined];
}
const recipients = this.assembleRecipients(results.Items);
let paginationKey: RecipientListPaginationKey;
if (results.LastEvaluatedKey) {
const lastEvaluatedThingName = expandDelimitedAttribute(
results.LastEvaluatedKey.sk
)[1];
paginationKey = {
targetName: lastEvaluatedThingName,
};
}
logger.debug(
`messages.dao listRecipients: exit: recipients:${JSON.stringify(
recipients
)}, paginationKey:${JSON.stringify(paginationKey)}`
);
return [recipients, paginationKey];
}
public async listReplies(
messageId: string,
targetName: string,
exclusiveStart?: ReplyListPaginationKey,
count?: number
): Promise<[ReplyItem[], ReplyListPaginationKey]> {
logger.debug(
`messages.dao listReplies: in: messageId:${messageId}, targetName:${targetName}, exclusiveStart:${JSON.stringify(
exclusiveStart
)}, count:${count}`
);
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
let exclusiveStartKey: DocumentClient.Key;
if (exclusiveStart?.receivedAt) {
const skDbId = createDelimitedAttribute(
PkType.Reply,
PkType.Thing,
targetName,
exclusiveStart.receivedAt
);
exclusiveStartKey = {
pk: messageDbId,
sk: skDbId,
};
}
const params: DocumentClient.QueryInput = {
TableName: this.table,
KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
ExpressionAttributeNames: {
'#hash': 'pk',
'#sort': 'sk',
},
ExpressionAttributeValues: {
':hash': messageDbId,
':sort': createDelimitedAttributePrefix(PkType.Reply, PkType.Thing, targetName),
},
ExclusiveStartKey: exclusiveStartKey,
Limit: count,
};
logger.silly(`messages.dao listReplies: QueryInput: ${JSON.stringify(params)}`);
const results = await this._dc.query(params).promise();
logger.silly(`query result: ${JSON.stringify(results)}`);
if ((results?.Count ?? 0) === 0) {
logger.debug('messages.dao listReplies: exit: undefined');
return [undefined, undefined];
}
const replies = this.assembleReplies(results.Items);
let paginationKey: ReplyListPaginationKey;
if (results.LastEvaluatedKey) {
const lastEvaluatedReceivedAt = Number(
expandDelimitedAttribute(results.LastEvaluatedKey.sk)[3]
);
paginationKey = {
receivedAt: lastEvaluatedReceivedAt,
};
}
logger.debug(
`messages.dao listReplies: exit: replies:${JSON.stringify(
replies
)}, paginationKey:${JSON.stringify(paginationKey)}`
);
return [replies, paginationKey];
}
public async deleteMessage(messageId: string): Promise<void> {
logger.debug(`messages.dao deleteMessage: in: messageId:${messageId}`);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
const messageHeaderDbItem = {
DeleteRequest: {
Key: {
pk: messageDbId,
sk: messageDbId,
},
},
};
params.RequestItems[this.table].push(messageHeaderDbItem);
const messageBatchDbItem = {
DeleteRequest: {
Key: {
pk: messageDbId,
sk: createDelimitedAttribute(PkType.Message, messageId, 'batches'),
},
},
};
params.RequestItems[this.table].push(messageBatchDbItem);
logger.silly(`messages.dao deleteMessage: params:${JSON.stringify(params)}`);
const r = await this.dynamoDbUtils.batchWriteAll(params);
logger.silly(`messages.dao deleteMessage: r:${JSON.stringify(r)}`);
if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
logger.error(
`messages.dao deleteMessage: has unprocessed items: ${JSON.stringify(
r.UnprocessedItems
)}`
);
throw new Error('DELETE_MESSAGE_FAILED');
}
logger.debug(`messages.dao deleteMessage: exit:`);
}
public async deleteRecipient(messageId: string, targetName: string): Promise<void> {
logger.debug(
`messages.dao deleteRecipient: in: messageId:${messageId}, targetName:${targetName}`
);
const params: DocumentClient.BatchWriteItemInput = {
RequestItems: {},
};
params.RequestItems[this.table] = [];
const messageDbId = createDelimitedAttribute(PkType.Message, messageId);
// delete the recipient item
const resolvedTargetDbItem = {
DeleteRequest: {
Key: {
pk: messageDbId,
sk: createDelimitedAttribute(PkType.Thing, targetName),
},
},
};
params.RequestItems[this.table].push(resolvedTargetDbItem);
// delete the recipient reply items
let exclusiveStart: ReplyListPaginationKey;
// eslint-disable-next-line no-constant-condition
while (true) {
const r = await this.listReplies(messageId, targetName, exclusiveStart);
r[0]?.forEach(async (reply) => {
const replyDbItem = {
DeleteRequest: {
Key: {