in source/packages/services/command-and-control/src/messages/messages.service.ts [275:366]
/**
 * Lists the replies a thing has sent in response to a message.
 *
 * The message, its recipient record and the originating command are looked up first.
 * Where the replies come from then depends on the command's delivery method:
 * - `JOB`: the job execution for the recipient is described via the IoT client and its
 *   status is translated into a single reply (at most one item is returned). In this
 *   branch `exclusiveStart` and `count` are not used and the returned pagination key is
 *   always `undefined`.
 * - anything else: the call is delegated to `messagesDao.listReplies`.
 *
 * @param messageId - id of the message; must be a non-empty string.
 * @param thingName - name of the recipient thing; must be a non-empty string.
 * @param exclusiveStart - optional pagination key, forwarded to the DAO for non-JOB commands.
 * @param count - optional page size, coerced with `Number()` when truthy and forwarded to the DAO.
 * @returns a tuple of the replies and the pagination key for the next page.
 * @throws Error with a `NOT_FOUND:` prefixed message when the message, recipient or command
 *   cannot be found; argument validation errors from `ow` when inputs (or the recipient's
 *   `jobId` for JOB commands) are not non-empty strings.
 */
public async listReplies(
  messageId: string,
  thingName: string,
  exclusiveStart?: ReplyListPaginationKey,
  count?: number
): Promise<[ReplyItem[], ReplyListPaginationKey]> {
  logger.debug(
    `messages.service listReplies: in: messageId:${messageId}, thingName:${thingName}, exclusiveStart:${JSON.stringify(
      exclusiveStart
    )}, count:${count}`
  );

  ow(messageId, ow.string.nonEmpty);
  ow(thingName, ow.string.nonEmpty);

  // count may arrive as a non-number (e.g. from a query string), so coerce it when provided
  if (count) {
    count = Number(count);
  }

  // TODO: retrieve message and recipient concurrently to speed things up
  const message = await this.messagesDao.getMessageById(messageId);
  if (message === undefined) {
    throw new Error('NOT_FOUND: Message not found');
  }

  const recipient = await this.messagesDao.getRecipient(messageId, thingName);
  if (recipient === undefined) {
    throw new Error('NOT_FOUND: Recipient not found');
  }

  const command = (await this.commandsDao.get([message.commandId]))?.[0];
  if (command === undefined) {
    throw new Error('NOT_FOUND: Command not found');
  }

  // if the command was SHADOW/TOPIC then we retrieve the replies from our own datastore. But if JOB then we need to retrieve them from the job system.
  let result: [ReplyItem[], ReplyListPaginationKey];
  if (command.deliveryMethod.type === 'JOB') {
    ow(recipient.jobId, ow.string.nonEmpty);

    const replies: ReplyItem[] = [];
    const r = await this.iot
      .describeJobExecution({
        thingName,
        jobId: recipient.jobId,
      })
      .promise();

    // main difference between JOB delivery method and others is that JOB may only have a single reply/payload as not possible to hook into its execution state change events
    if (r.execution) {
      // translate the job execution status to the common statuses we use across all delivery methods
      // NOTE(review): there is no case for QUEUED (nor a default), so `action` is left
      // undefined for any status not listed below - confirm this is intended.
      const jobAction = r.execution.status;
      let action: ResponseAction;
      switch (jobAction) {
        case 'IN_PROGRESS':
          action = 'accepted';
          break;
        case 'SUCCEEDED':
          action = 'reply';
          break;
        case 'FAILED':
        case 'TIMED_OUT':
        case 'REJECTED':
        case 'REMOVED':
        case 'CANCELED':
          action = 'rejected';
          break;
      }

      // statusDetails is absent when the device has not reported any details for the
      // execution, so guard the access rather than crash with a TypeError
      let payload = r.execution.statusDetails?.detailsMap;

      // remove correlationId if provided as not needed for reply
      if (payload) {
        delete payload.correlationId;
      }

      // to keep things tidy, prefer undefined payload rather than empty json to be consistent with other delivery methods
      if (Object.keys(payload || {}).length === 0) {
        payload = undefined;
      }

      replies.push({
        receivedAt: r.execution.lastUpdatedAt,
        action,
        payload,
      });
    }

    // job executions are never paginated, hence no pagination key
    result = [replies, undefined];
  } else {
    result = await this.messagesDao.listReplies(
      messageId,
      thingName,
      exclusiveStart,
      count
    );
  }

  logger.debug(`messages.service listReplies: exit: ${JSON.stringify(result)}`);
  return result;
}