siKey2: createDelimitedAttribute()

in source/packages/services/command-and-control/src/messages/messages.dao.ts [223:614]


                            siKey2: createDelimitedAttribute(PkType.Thing, target.id),
                            siSort2: createDelimitedAttribute(
                                PkType.Message,
                                new Date().getTime()
                            ),
                            messageId: message.id,
                            targetId: target.id,
                            targetType: target.type,
                            status: target.status,
                            statusMessage: target.statusMessage,
                            createdAt: message.createdAt,
                            updatedAt: message.updatedAt,
                        },
                    },
                };
                if (target.correlationId !== undefined) {
                    targetDbItem.PutRequest.Item['siKey1'] = createDelimitedAttribute(
                        PkType.Reply,
                        target.correlationId
                    );
                    targetDbItem.PutRequest.Item['correlationId'] = target.correlationId;
                }
                if (target.jobId !== undefined) {
                    targetDbItem.PutRequest.Item['jobId'] = target.jobId;
                }

                params.RequestItems[this.table].push(targetDbItem);
            }
        }

        logger.silly(`messages.dao saveResolvedTargets: params:${JSON.stringify(params)}`);
        const r = await this.dynamoDbUtils.batchWriteAll(params);
        logger.silly(`messages.dao saveResolvedTargets: r:${JSON.stringify(r)}`);
        if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
            logger.error(
                `messages.dao saveResolvedTargets: has unprocessed items: ${JSON.stringify(
                    r.UnprocessedItems
                )}`
            );
            throw new Error('SAVE_MESSAGE_TARGETS_FAILED');
        }

        logger.debug(`messages.dao saveResolvedTargets: exit:`);
    }

    /**
     * Persists the batch progress counters of a message.
     *
     * A single summary item is written, keyed by the message id (`pk`) and the message
     * id suffixed with `batches` (`sk`), carrying `batchesTotal`, `batchesComplete` and
     * the message timestamps. When the message reports no batches (`batchesTotal` is
     * not greater than 0) there is nothing to store and no write is issued.
     *
     * @param message - message whose batch counters should be saved.
     * @throws Error with message `SAVE_MESSAGE_TARGETS_FAILED` when the batch write
     *         reports unprocessed items.
     */
    public async saveBatchProgress(message: MessageItem): Promise<void> {
        logger.debug(`messages.dao saveBatchProgress: in: message:${JSON.stringify(message)}`);

        // DynamoDB's BatchWriteItem requires at least one write request per table, so an
        // empty request list is never a valid call. Negated `>` keeps the original
        // treatment of an undefined/NaN `batchesTotal` (nothing is written).
        if (!(message.batchesTotal > 0)) {
            logger.debug(`messages.dao saveBatchProgress: exit:`);
            return;
        }

        const messageDbId = createDelimitedAttribute(PkType.Message, message.id);
        const batchDbId = createDelimitedAttribute(PkType.Message, message.id, 'batches');
        const batchSummaryItem = {
            PutRequest: {
                Item: {
                    pk: messageDbId,
                    sk: batchDbId,
                    batchesTotal: message.batchesTotal,
                    batchesComplete: message.batchesComplete,
                    createdAt: message.createdAt,
                    updatedAt: message.updatedAt,
                },
            },
        };

        const params: DocumentClient.BatchWriteItemInput = {
            RequestItems: {},
        };
        params.RequestItems[this.table] = [batchSummaryItem];

        logger.silly(`messages.dao saveBatchProgress: params:${JSON.stringify(params)}`);
        const r = await this.dynamoDbUtils.batchWriteAll(params);
        logger.silly(`messages.dao saveBatchProgress: r:${JSON.stringify(r)}`);
        if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
            logger.error(
                `messages.dao saveBatchProgress: has unprocessed items: ${JSON.stringify(
                    r.UnprocessedItems
                )}`
            );
            // Same error code as saveResolvedTargets; left unchanged so existing callers
            // that match on it keep working.
            throw new Error('SAVE_MESSAGE_TARGETS_FAILED');
        }

        logger.debug(`messages.dao saveBatchProgress: exit:`);
    }

    /**
     * Lists the messages of a command, one page at a time, by querying the `GSI2` index
     * (`siKey2` = command id, `siSort2` = message prefix + creation time).
     *
     * @param commandId - id of the command whose messages are listed.
     * @param exclusiveStart - optional pagination key returned by a previous call; when
     *        its `createdAt` is defined the query resumes after that position.
     * @param count - optional maximum number of items to evaluate (passed as `Limit`).
     * @returns a tuple of the assembled messages and the pagination key for the next
     *          page (`undefined` when the query returned no `LastEvaluatedKey`).
     *          Returns `[[], undefined]` when the query matches nothing.
     */
    public async listMessages(
        commandId: string,
        exclusiveStart?: MessageListPaginationKey,
        count?: number
    ): Promise<[MessageItem[], MessageListPaginationKey]> {
        logger.debug(
            `messages.dao listMessages: in: commandId:${commandId}, exclusiveStart:${JSON.stringify(
                exclusiveStart
            )}, count:${count}`
        );

        const siKey2 = createDelimitedAttribute(PkType.Command, commandId);

        // NOTE(review): DynamoDB normally expects the ExclusiveStartKey of an index query
        // to carry the table's pk/sk in addition to the index keys. Only the index keys
        // can be rebuilt from the pagination key here - confirm paging works end to end.
        let exclusiveStartKey: DocumentClient.Key;
        if (exclusiveStart?.createdAt !== undefined) {
            exclusiveStartKey = {
                siKey2,
                siSort2: createDelimitedAttribute(PkType.Message, exclusiveStart.createdAt),
            };
        }

        const params: DocumentClient.QueryInput = {
            TableName: this.table,
            IndexName: this.GSI2,
            KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
            ExpressionAttributeNames: {
                '#hash': 'siKey2',
                '#sort': 'siSort2',
            },
            ExpressionAttributeValues: {
                ':hash': siKey2,
                ':sort': createDelimitedAttributePrefix(PkType.Message),
            },
            ExclusiveStartKey: exclusiveStartKey,
            Limit: count,
        };

        logger.silly(`messages.dao listMessages: QueryInput: ${JSON.stringify(params)}`);
        const results = await this._dc.query(params).promise();
        logger.silly(`query result: ${JSON.stringify(results)}`);
        if ((results?.Count ?? 0) === 0) {
            logger.debug('messages.dao listMessages: exit: [[],undefined]');
            return [[], undefined];
        }

        const messages = this.assembleMessages(results.Items);
        let paginationKey: MessageListPaginationKey;
        if (results.LastEvaluatedKey) {
            // The creation time is encoded in the index sort key (`siSort2`), the same
            // attribute the start key above is built from. The table sort key (`sk`) of
            // a message header is the message id, so it must not be decoded here.
            const lastEvaluatedCreatedAt = Number(
                expandDelimitedAttribute(results.LastEvaluatedKey.siSort2)[1]
            );
            paginationKey = {
                createdAt: lastEvaluatedCreatedAt,
            };
        }

        logger.debug(
            `messages.dao listMessages: exit: messages:${JSON.stringify(
                messages
            )}, paginationKey:${JSON.stringify(paginationKey)}`
        );
        return [messages, paginationKey];
    }

    /**
     * Fetches one recipient (target) of a message.
     *
     * Queries the table for the item whose `pk` is the message id and whose `sk` is the
     * target name, and assembles the first match into a `Recipient`.
     *
     * @param messageId - id of the message the recipient belongs to.
     * @param targetName - name of the targeted thing.
     * @returns the assembled recipient, or `undefined` when no item matches.
     */
    public async getRecipient(messageId: string, targetName: string): Promise<Recipient> {
        logger.debug(
            `messages.dao getRecipient: in: messageId:${messageId}, targetName:${targetName}`
        );

        const query: DocumentClient.QueryInput = {
            TableName: this.table,
            KeyConditionExpression: `#hash = :hash AND #sort = :sort`,
            ExpressionAttributeNames: { '#hash': 'pk', '#sort': 'sk' },
            ExpressionAttributeValues: {
                ':hash': createDelimitedAttribute(PkType.Message, messageId),
                ':sort': createDelimitedAttribute(PkType.Thing, targetName),
            },
        };

        logger.silly(`messages.dao getRecipient: QueryInput: ${JSON.stringify(query)}`);
        const response = await this._dc.query(query).promise();
        logger.silly(`query result: ${JSON.stringify(response)}`);

        // A missing response or a zero/absent Count both mean "not found".
        const matchCount = response?.Count ?? 0;
        if (matchCount !== 0) {
            const recipient = this.assembleRecipient(response.Items[0]);
            logger.debug(`messages.dao getRecipient: exit: message:${JSON.stringify(recipient)}`);
            return recipient;
        }

        logger.debug('messages.dao getRecipient: exit: undefined');
        return undefined;
    }

    /**
     * Lists the recipients (targets) of a message, one page at a time.
     *
     * Queries the items whose `pk` is the message id and whose `sk` starts with the
     * thing prefix.
     *
     * @param id - id of the message.
     * @param exclusiveStart - optional pagination key from a previous call; a truthy
     *        `targetName` makes the query resume after that recipient.
     * @param count - optional maximum number of items to evaluate (passed as `Limit`).
     * @returns a tuple of the assembled recipients and the next pagination key
     *          (`undefined` when the query returned no `LastEvaluatedKey`).
     *          Returns `[undefined, undefined]` when the query matches nothing.
     */
    public async listRecipients(
        id: string,
        exclusiveStart?: RecipientListPaginationKey,
        count?: number
    ): Promise<[Recipient[], RecipientListPaginationKey]> {
        const startAsJson = JSON.stringify(exclusiveStart);
        logger.debug(
            `messages.dao listRecipients: in: id:${id}, exclusiveStart:${startAsJson}, count:${count}`
        );

        const pk = createDelimitedAttribute(PkType.Message, id);

        // Only resume from a previous page when the caller supplied a target name.
        const startKey: DocumentClient.Key = exclusiveStart?.targetName
            ? { pk, sk: createDelimitedAttribute(PkType.Thing, exclusiveStart.targetName) }
            : undefined;

        const query: DocumentClient.QueryInput = {
            TableName: this.table,
            KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
            ExpressionAttributeNames: { '#hash': 'pk', '#sort': 'sk' },
            ExpressionAttributeValues: {
                ':hash': pk,
                ':sort': createDelimitedAttributePrefix(PkType.Thing),
            },
            ExclusiveStartKey: startKey,
            Limit: count,
        };

        logger.silly(`messages.dao listRecipients: QueryInput: ${JSON.stringify(query)}`);
        const response = await this._dc.query(query).promise();
        logger.silly(`query result: ${JSON.stringify(response)}`);

        const matchCount = response?.Count ?? 0;
        if (matchCount === 0) {
            logger.debug('messages.dao listRecipients: exit: undefined');
            return [undefined, undefined];
        }

        const recipients = this.assembleRecipients(response.Items);

        // The second segment of the last evaluated sort key is the target name.
        const lastKey = response.LastEvaluatedKey;
        const paginationKey: RecipientListPaginationKey = lastKey
            ? { targetName: expandDelimitedAttribute(lastKey.sk)[1] }
            : undefined;

        const recipientsAsJson = JSON.stringify(recipients);
        const nextAsJson = JSON.stringify(paginationKey);
        logger.debug(
            `messages.dao listRecipients: exit: recipients:${recipientsAsJson}, paginationKey:${nextAsJson}`
        );
        return [recipients, paginationKey];
    }

    /**
     * Lists the replies a target sent for a message, one page at a time.
     *
     * Queries the items whose `pk` is the message id and whose `sk` starts with the
     * reply / thing / target-name prefix.
     *
     * @param messageId - id of the message.
     * @param targetName - name of the thing whose replies are listed.
     * @param exclusiveStart - optional pagination key from a previous call; a truthy
     *        `receivedAt` makes the query resume after that reply.
     * @param count - optional maximum number of items to evaluate (passed as `Limit`).
     * @returns a tuple of the assembled replies and the next pagination key
     *          (`undefined` when the query returned no `LastEvaluatedKey`).
     *          Returns `[undefined, undefined]` when the query matches nothing.
     */
    public async listReplies(
        messageId: string,
        targetName: string,
        exclusiveStart?: ReplyListPaginationKey,
        count?: number
    ): Promise<[ReplyItem[], ReplyListPaginationKey]> {
        const startAsJson = JSON.stringify(exclusiveStart);
        logger.debug(
            `messages.dao listReplies: in: messageId:${messageId}, targetName:${targetName}, exclusiveStart:${startAsJson}, count:${count}`
        );

        const pk = createDelimitedAttribute(PkType.Message, messageId);

        // Only resume from a previous page when the caller supplied a receivedAt value.
        const startKey: DocumentClient.Key = exclusiveStart?.receivedAt
            ? {
                  pk,
                  sk: createDelimitedAttribute(
                      PkType.Reply,
                      PkType.Thing,
                      targetName,
                      exclusiveStart.receivedAt
                  ),
              }
            : undefined;

        const query: DocumentClient.QueryInput = {
            TableName: this.table,
            KeyConditionExpression: `#hash = :hash AND begins_with(#sort, :sort)`,
            ExpressionAttributeNames: { '#hash': 'pk', '#sort': 'sk' },
            ExpressionAttributeValues: {
                ':hash': pk,
                ':sort': createDelimitedAttributePrefix(PkType.Reply, PkType.Thing, targetName),
            },
            ExclusiveStartKey: startKey,
            Limit: count,
        };

        logger.silly(`messages.dao listReplies: QueryInput: ${JSON.stringify(query)}`);
        const response = await this._dc.query(query).promise();
        logger.silly(`query result: ${JSON.stringify(response)}`);

        const matchCount = response?.Count ?? 0;
        if (matchCount === 0) {
            logger.debug('messages.dao listReplies: exit: undefined');
            return [undefined, undefined];
        }

        const replies = this.assembleReplies(response.Items);

        // The sort key is built from four parts above (reply, thing, target name,
        // receivedAt), so the fourth expanded segment is read back as receivedAt.
        const lastKey = response.LastEvaluatedKey;
        const paginationKey: ReplyListPaginationKey = lastKey
            ? { receivedAt: Number(expandDelimitedAttribute(lastKey.sk)[3]) }
            : undefined;

        const repliesAsJson = JSON.stringify(replies);
        const nextAsJson = JSON.stringify(paginationKey);
        logger.debug(
            `messages.dao listReplies: exit: replies:${repliesAsJson}, paginationKey:${nextAsJson}`
        );
        return [replies, paginationKey];
    }

    /**
     * Deletes a message's header item and its batch summary item in one batch write.
     *
     * Both items share the message id as `pk`; the header uses the message id as `sk`
     * and the batch summary uses the message id suffixed with `batches`.
     *
     * @param messageId - id of the message to delete.
     * @throws Error with message `DELETE_MESSAGE_FAILED` when the batch write reports
     *         unprocessed items.
     */
    public async deleteMessage(messageId: string): Promise<void> {
        logger.debug(`messages.dao deleteMessage: in: messageId:${messageId}`);

        const headerKey = createDelimitedAttribute(PkType.Message, messageId);
        const batchesKey = createDelimitedAttribute(PkType.Message, messageId, 'batches');

        // One delete request per sort key, header first, then the batch summary.
        const deleteRequests = [headerKey, batchesKey].map((sk) => ({
            DeleteRequest: {
                Key: { pk: headerKey, sk },
            },
        }));

        const params: DocumentClient.BatchWriteItemInput = {
            RequestItems: { [this.table]: deleteRequests },
        };

        logger.silly(`messages.dao deleteMessage: params:${JSON.stringify(params)}`);
        const r = await this.dynamoDbUtils.batchWriteAll(params);
        logger.silly(`messages.dao deleteMessage: r:${JSON.stringify(r)}`);

        if (this.dynamoDbUtils.hasUnprocessedItems(r)) {
            const unprocessedAsJson = JSON.stringify(r.UnprocessedItems);
            logger.error(
                `messages.dao deleteMessage: has unprocessed items: ${unprocessedAsJson}`
            );
            throw new Error('DELETE_MESSAGE_FAILED');
        }

        logger.debug(`messages.dao deleteMessage: exit:`);
    }

    public async deleteRecipient(messageId: string, targetName: string): Promise<void> {
        logger.debug(
            `messages.dao deleteRecipient: in: messageId:${messageId}, targetName:${targetName}`
        );

        const params: DocumentClient.BatchWriteItemInput = {
            RequestItems: {},
        };
        params.RequestItems[this.table] = [];

        const messageDbId = createDelimitedAttribute(PkType.Message, messageId);

        // delete the recipient item
        const resolvedTargetDbItem = {
            DeleteRequest: {
                Key: {
                    pk: messageDbId,
                    sk: createDelimitedAttribute(PkType.Thing, targetName),
                },
            },
        };
        params.RequestItems[this.table].push(resolvedTargetDbItem);

        // delete the recipient reply items
        let exclusiveStart: ReplyListPaginationKey;
        // eslint-disable-next-line no-constant-condition
        while (true) {
            const r = await this.listReplies(messageId, targetName, exclusiveStart);
            r[0]?.forEach(async (reply) => {
                const replyDbItem = {
                    DeleteRequest: {
                        Key: {