public async listReplies()

in source/packages/services/command-and-control/src/messages/messages.service.ts [275:366]


    /**
     * Lists the replies a thing has sent for a given message.
     *
     * Looks up the message, its recipient and the command the message references. When the
     * command's delivery method is `JOB`, at most one reply is synthesized from the current
     * job execution (status, status details and last-updated time). For every other delivery
     * method the replies are read through `messagesDao.listReplies`.
     *
     * @param messageId - id of the message; validated as a non-empty string.
     * @param thingName - name of the recipient thing; validated as a non-empty string.
     * @param exclusiveStart - pagination key forwarded to the DAO (not used for `JOB`).
     * @param count - page size forwarded to the DAO (not used for `JOB`); coerced with `Number()` when truthy.
     * @returns a tuple of the replies and the next pagination key (always `undefined` for `JOB`).
     * @throws Error with a `NOT_FOUND:` prefixed message when the message, recipient or command is missing.
     */
    public async listReplies(
        messageId: string,
        thingName: string,
        exclusiveStart?: ReplyListPaginationKey,
        count?: number
    ): Promise<[ReplyItem[], ReplyListPaginationKey]> {
        logger.debug(
            `messages.service listReplies: in: messageId:${messageId}, thingName:${thingName}, exclusiveStart:${JSON.stringify(
                exclusiveStart
            )}, count:${count}`
        );

        ow(messageId, ow.string.nonEmpty);
        ow(thingName, ow.string.nonEmpty);

        // count is coerced only when truthy, so undefined/0 are passed through untouched
        const limit = count ? Number(count) : count;

        // the message and recipient lookups are independent of each other, so run them concurrently
        const [message, recipient] = await Promise.all([
            this.messagesDao.getMessageById(messageId),
            this.messagesDao.getRecipient(messageId, thingName),
        ]);
        if (message === undefined) {
            throw new Error('NOT_FOUND: Message not found');
        }
        if (recipient === undefined) {
            throw new Error('NOT_FOUND: Recipient not found');
        }
        const command = (await this.commandsDao.get([message.commandId]))?.[0];
        if (command === undefined) {
            throw new Error('NOT_FOUND: Command not found');
        }

        // if the command was SHADOW/TOPIC then we retrieve the replies from our own datastore. But if JOB then we need to retrieve them from the job system.
        let result: [ReplyItem[], ReplyListPaginationKey];
        if (command.deliveryMethod.type === 'JOB') {
            ow(recipient.jobId, ow.string.nonEmpty);
            const replies: ReplyItem[] = [];
            const r = await this.iot
                .describeJobExecution({
                    thingName,
                    jobId: recipient.jobId,
                })
                .promise();
            // main difference between JOB delivery method and others is that JOB may only have a single reply/payload as not possible to hook into its execution state change events
            if (r.execution) {
                // translate the job execution status to the common statuses we use across all delivery methods
                // NOTE(review): statuses not listed below (e.g. QUEUED) leave `action` unset - confirm this is intended
                let action: ResponseAction;
                switch (r.execution.status) {
                    case 'IN_PROGRESS':
                        action = 'accepted';
                        break;
                    case 'SUCCEEDED':
                        action = 'reply';
                        break;
                    case 'FAILED':
                    case 'TIMED_OUT':
                    case 'REJECTED':
                    case 'REMOVED':
                        action = 'rejected';
                        break;
                    case 'CANCELED':
                        action = 'rejected';
                        break;
                }

                // statusDetails is optional on a job execution, so guard the access instead of
                // throwing. Work on a shallow copy so the SDK response itself is left unmodified.
                const details = { ...(r.execution.statusDetails?.detailsMap ?? {}) };
                // remove correlationId if provided as not needed for reply
                delete details.correlationId;
                // to keep things tidy, prefer undefined payload rather than empty json to be consistent with other delivery methods
                const payload = Object.keys(details).length > 0 ? details : undefined;

                replies.push({
                    receivedAt: r.execution.lastUpdatedAt,
                    action,
                    payload,
                });
            }
            result = [replies, undefined];
        } else {
            result = await this.messagesDao.listReplies(
                messageId,
                thingName,
                exclusiveStart,
                limit
            );
        }

        logger.debug(`messages.service listReplies: exit: ${JSON.stringify(result)}`);
        return result;
    }