libs/logic-apps-shared/src/designer-client-services/lib/consumption/run.ts (303 lines of code) (raw):
import { inputsResponse, outputsResponse } from '../__test__/__mocks__/monitoringInputsOutputsResponse';
import type { HttpRequestOptions, IHttpClient } from '../httpClient';
import type { IRunService } from '../run';
import type { CallbackInfo } from '../callbackInfo';
import type { ContentLink, Runs, ArmResources, Run, LogicAppsV2 } from '../../../utils/src';
import {
ArgumentException,
isCallbackInfoWithRelativePath,
HTTP_METHODS,
getCallbackUrl,
getRecordEntry,
UnsupportedException,
isNullOrUndefined,
} from '../../../utils/src';
import { LoggerService } from '../logger';
import { LogEntryLevel } from '../logging/logEntry';
export interface ConsumptionRunServiceOptions {
apiVersion: string;
baseUrl: string;
httpClient: IHttpClient;
updateCors?: () => void;
accessToken?: string;
workflowId: string;
isDev?: boolean;
}
export class ConsumptionRunService implements IRunService {
_isDev = false;
constructor(public readonly options: ConsumptionRunServiceOptions) {
const { apiVersion, baseUrl, isDev } = options;
if (!baseUrl) {
throw new ArgumentException('baseUrl required');
}
if (!apiVersion) {
throw new ArgumentException('apiVersion required');
}
this._isDev = isDev || false;
}
async getContent(contentLink: ContentLink): Promise<any> {
const { uri, contentSize } = contentLink;
const { httpClient } = this.options;
if (!uri) {
throw new Error();
}
LoggerService().log({
level: LogEntryLevel.Verbose,
area: 'getContent consumption run service',
message: `Content size: ${contentSize}`,
args: [`size: ${contentSize}`],
});
// 2MB
if (contentSize > 2097152) {
return undefined;
}
try {
const response = await httpClient.get<any>({
uri,
noAuth: true,
headers: { 'Access-Control-Allow-Origin': '*' },
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
private getAccessTokenHeaders = () => {
const { accessToken } = this.options;
if (!accessToken) {
return undefined;
}
return new Headers({
Authorization: accessToken,
});
};
async getMoreRuns(continuationToken: string): Promise<Runs> {
const headers = this.getAccessTokenHeaders();
const { httpClient } = this.options;
try {
const response = await httpClient.get<ArmResources<Run>>({
uri: continuationToken,
headers: headers as Record<string, any>,
});
const { nextLink, value: runs }: ArmResources<Run> = response;
return { nextLink, runs };
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Gets run details.
* @param {string} runId - Run id.
* @returns {Promise<Run>} Workflow runs.
*/
async getRun(runId: string): Promise<Run> {
const { apiVersion, baseUrl, httpClient } = this.options;
const uri = `${baseUrl}${runId}?api-version=${apiVersion}&$expand=properties/actions,workflow/properties`;
try {
const response = await httpClient.get<Run>({
uri,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Gets workflow run history
* @returns {Promise<Runs>} Workflow runs.
*/
async getRuns(): Promise<Runs> {
const { apiVersion, baseUrl, workflowId, httpClient } = this.options;
const headers = this.getAccessTokenHeaders();
const uri = `${baseUrl}${workflowId}/runs?api-version=${apiVersion}`;
try {
const response = await httpClient.get<ArmResources<Run>>({
uri,
headers: headers as Record<string, any>,
});
const { nextLink, value: runs }: ArmResources<Run> = response;
return { nextLink, runs };
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Gets an array of scope repetition records for a node with the specified status.
* @param {{ actionId: string, runId: string }} action - An object with nodeId and the runId of the workflow
* @param {string} status - The status of scope repetition records to fetch
* @return {Promise<RunScopeRepetition[]>}
*/
async getScopeRepetitions(
action: { nodeId: string; runId: string | undefined },
status?: string
): Promise<{ value: LogicAppsV2.RunRepetition[] }> {
const { nodeId, runId } = action;
if (this._isDev) {
return Promise.resolve({ value: [] });
}
const { apiVersion, baseUrl, httpClient } = this.options;
const headers = this.getAccessTokenHeaders();
const filter = status ? `&$filter=status eq '${status}'` : '';
const uri = `${baseUrl}${runId}/actions/${nodeId}/scopeRepetitions?api-version=${apiVersion}${filter}`;
try {
const response = await httpClient.get<{ value: LogicAppsV2.RunRepetition[] }>({
uri,
headers: headers as Record<string, any>,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Gets an array of scope repetition records for a node with the specified status.
* @param {{ actionId: string, runId: string }} action - An object with nodeId and the runId of the workflow
* @param {string} repetitionId - A string with the resource ID of a repetition record
* @return {Promise<RunScopeRepetition[]>}
*/
async getAgentRepetition(
action: { nodeId: string; runId: string | undefined },
repetitionId: string
): Promise<LogicAppsV2.RunRepetition> {
const { nodeId, runId } = action;
const { apiVersion, baseUrl, httpClient } = this.options;
const headers = this.getAccessTokenHeaders();
const uri = `${baseUrl}${runId}/actions/${nodeId}/agentRepetitions/${repetitionId}?api-version=${apiVersion}`;
try {
const response = await httpClient.get<LogicAppsV2.RunRepetition>({
uri,
headers: headers as Record<string, any>,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Gets an array of scope repetition records for a node with the specified status.
* @param {{ actionId: string, runId: string }} action - An object with nodeId and the runId of the workflow
* @param {string} repetitionId - A string with the resource ID of a repetition record
* @return {Promise<RunScopeRepetition[]>}
*/
async getAgentActionsRepetition(action: { nodeId: string; runId: string | undefined }, repetitionId: string): Promise<any> {
const { nodeId, runId } = action;
const { apiVersion, baseUrl, httpClient } = this.options;
const headers = this.getAccessTokenHeaders();
const uri = `${baseUrl}${runId}/actions/${nodeId}/agentRepetitions/${repetitionId}/actions?api-version=${apiVersion}`;
try {
const response = await httpClient.get<LogicAppsV2.RunRepetition>({
uri,
headers: headers as Record<string, any>,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Retrieves additional agent actions repetition data based on the provided continuation token.
*
* This method constructs an HTTP GET request using the continuation token as the URI and leverages the authorized HTTP client.
* It returns a promise that resolves with the run repetition data in the form of a [[LogicAppsV2.RunRepetition]] object.
* @param continuationToken - A string token used to fetch the next set of agent actions repetition data.
* @returns A promise that resolves with the run repetition data.
* @throws Will throw an error with the corresponding message if the HTTP request fails.
*/
async getMoreAgentActionsRepetition(continuationToken: string): Promise<any> {
const { httpClient } = this.options;
const headers = this.getAccessTokenHeaders();
try {
const response = await httpClient.get<LogicAppsV2.RunRepetition>({
uri: continuationToken,
headers: headers as Record<string, any>,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Gets the repetition record for the repetition item with the specified ID
* @param {{ actionId: string, runId: string }} action - An object with nodeId and the runId of the workflow
* @param {string} repetitionId - A string with the resource ID of a repetition record
* @return {Promise<any>}
*/
async getRepetition(action: { nodeId: string; runId: string | undefined }, repetitionId: string): Promise<LogicAppsV2.RunRepetition> {
const { apiVersion, baseUrl, httpClient } = this.options;
const { nodeId, runId } = action;
const headers = this.getAccessTokenHeaders();
const uri = `${baseUrl}${runId}/actions/${nodeId}/repetitions/${repetitionId}?api-version=${apiVersion}`;
try {
const response = await httpClient.get<LogicAppsV2.RunRepetition>({
uri,
headers: headers as Record<string, any>,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Triggers a workflow run
* @param {CallbackInfo} callbackInfo - Information to call Api to trigger workflow.
*/
async runTrigger(callbackInfo: CallbackInfo): Promise<void> {
const { httpClient } = this.options;
const method = isCallbackInfoWithRelativePath(callbackInfo) ? callbackInfo.method : HTTP_METHODS.POST;
const uri = getCallbackUrl(callbackInfo);
if (!uri) {
throw new Error();
}
try {
await this.getHttpRequestByMethod(httpClient, method, { uri });
} catch (e: any) {
throw new Error(`${e.status} ${e?.data?.error?.message}`);
}
}
/**
* Gets the inputs and outputs for an action repetition from a workflow run
* @param {{inputsLink: ContentLink, outputsLink: ContentLink}} actionMetadata - Workflow file path.
* @param {string} nodeId - Action ID.
* @returns {Promise<any>} Action inputs and outputs.
*/
async getActionLinks(actionMetadata: { inputsLink?: ContentLink; outputsLink?: ContentLink }, nodeId: string): Promise<any> {
const { inputsLink, outputsLink } = actionMetadata ?? {};
const { updateCors } = this.options;
let inputs: Record<string, any> = {};
let outputs: Record<string, any> = {};
if (this._isDev) {
inputs = getRecordEntry(inputsResponse, nodeId) ?? {};
outputs = getRecordEntry(outputsResponse, nodeId) ?? {};
return Promise.resolve({ inputs, outputs });
}
try {
if (outputsLink && outputsLink.uri) {
outputs = await this.getContent(outputsLink);
}
if (inputsLink && inputsLink.uri) {
inputs = await this.getContent(inputsLink);
}
} catch (e: any) {
if (e.message.includes('Failed to fetch') && !isNullOrUndefined(updateCors)) {
updateCors();
} else {
throw new Error(e.message);
}
}
return { inputs, outputs };
}
/**
* Gets http request acording to method.
* @param {IHttpClient} httpClient - HTTP Client.
* @param {string} method - HTTP method.
* @param {HttpRequestOptions<unknown>} options - Request options.
* @returns {Promise<any>}
*/
getHttpRequestByMethod(httpClient: IHttpClient, method: string, options: HttpRequestOptions<unknown>): Promise<any> {
switch (method.toLowerCase()) {
case 'get':
return httpClient.get(options);
case 'post':
return httpClient.post(options);
case 'put':
return httpClient.put(options);
default:
throw new UnsupportedException(`Unsupported call connector method - '${method}'`);
}
}
/**
* Retrieves the chat history for a specified action.
*
* This function constructs a URI based on the provided runId and nodeId, along with the
* baseUrl and apiVersion from the options. It then sends an HTTP GET request to obtain the
* chat history information associated with the specified action.
* @param action - An object containing the necessary identifiers.
* @param action.nodeId - The unique identifier of the node to retrieve the chat history for.
* @param action.runId - The unique identifier of the run; may be undefined.
* @returns A promise that resolves with the chat history response.
* @throws {Error} Throws an error with a message if the HTTP request fails.
*/
async getChatHistory(action: { nodeId: string; runId: string | undefined }): Promise<any> {
const { apiVersion, baseUrl, httpClient } = this.options;
const { nodeId, runId } = action;
const headers = this.getAccessTokenHeaders();
const uri = `${baseUrl}${runId}/actions/${nodeId}/chatHistory?api-version=${apiVersion}`;
try {
const response = await httpClient.get<any>({
uri,
headers: headers as Record<string, any>,
});
return response.value;
} catch (e: any) {
throw new Error(e.message);
}
}
/**
* Retrieves the chat history for a specified action.
*
* This function constructs a URI based on the provided runId and nodeId, along with the
* baseUrl and apiVersion from the options. It then sends an HTTP GET request to obtain the
* chat history information associated with the specified action.
* @param action - An object containing the necessary identifiers.
* @param action.id - Id suffix for agent and channel.
* @returns A promise that resolves with the agent chat url.
* @throws {Error} Throws an error with a message if the HTTP request fails.
*/
async getAgentChatInvokeUri(action: { idSuffix: string }): Promise<any> {
const { apiVersion, baseUrl, httpClient } = this.options;
const { idSuffix } = action;
const headers = this.getAccessTokenHeaders();
const uri = `${baseUrl}${idSuffix}/listCallBackUrl?api-version=${apiVersion}`;
try {
const response = await httpClient.post({
uri,
headers: headers as Record<string, any>,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
async invokeAgentChat(action: { id: string; data: any }): Promise<any> {
const { httpClient } = this.options;
const { id: uri, data } = action;
try {
const response = await httpClient.post<any, any>({
uri,
content: data,
});
return response;
} catch (e: any) {
throw new Error(e.message);
}
}
async cancelRun(runId: string): Promise<any> {
const { apiVersion, baseUrl, httpClient } = this.options;
const uri = `${baseUrl}${runId}/cancel?api-version=${apiVersion}`;
try {
const response = await httpClient.post({
uri,
});
return response;
} catch (e: any) {
return new Error(e.message);
}
}
}