infra-as-code/modules/ingest-pipeline/workflow/orchestration.yaml (183 lines of code) (raw):

# Ingest pipeline orchestration (Google Cloud Workflows).
#
# `main` receives an event carrying `data.bucket` and `data.name`, then chains
# a series of HTTP POSTs to services whose URLs come from environment
# variables. Each call's response body becomes the request body of the next
# call. After the conversation upload it polls two long-running operations via
# `WaitUntilOperationStatusDone` and finally requests feedback generation.
#
# Authentication: every outbound POST relies on `auth.type: OIDC`, so Workflows
# attaches the identity token itself. No explicit `Authorization` header is
# set; no step in this workflow produces a token that could be put in one.
#
# NOTE(review): the retry blocks use `max_attempts` and `backoff.type` /
# `backoff.delay`. The Workflows syntax reference lists `max_retries` and
# `backoff.initial_delay` / `max_delay` / `multiplier` -- confirm these keys
# are honoured by the service before relying on the retry policy.
main:
  params: [event]
  steps:
    # Extract the object location from the triggering event.
    - init:
        assign:
          - bucket: ${event.data.bucket}
          - file_name: ${event.data.name}

    # POST the object location to the service at `cf_stt_url`.
    - callTranscriptFunction:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_stt_url")}
            timeout: 1800
            body:
              bucket: ${bucket}
              name: ${file_name}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: transcript_payload
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500

    - printTranscriptResultHttp:
        call: sys.log
        args:
          data: ${transcript_payload}

    # Forward the previous response body to the service at `cf_genai_url`.
    - callTranscriptFix:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_genai_url")}
            timeout: 1800
            body: ${transcript_payload.body}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: fixed_transcript_payload
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500

    - printFixedTranscriptResultHttp:
        call: sys.log
        args:
          data: ${fixed_transcript_payload}

    # Forward the previous response body to `cf_audio_redaction_url`.
    - callAudioRedaction:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_audio_redaction_url")}
            timeout: 1800
            body: ${fixed_transcript_payload.body}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: redacted_transcript_payload
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500

    # Forward the previous response body to `cf_ccai_conversation_upload_url`.
    # The response body is later used as an operation name for polling.
    - callConversationUpload:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_ccai_conversation_upload_url")}
            timeout: 1800
            body: ${redacted_transcript_payload.body}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: operationId
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500

    - printResultHttp:
        call: sys.log
        args:
          data: ${operationId}

    # Block until the operation named by the upload response reports done.
    # `status` is passed because the subworkflow declares it, but the
    # subworkflow's steps never read it.
    - waitForConversationToBeUploaded:
        call: WaitUntilOperationStatusDone
        args:
          operation_name: ${operationId.body}
          status: None
        result: operationInfo

    # Same wait, for the operation whose name is the upload operation name
    # with "-analysis" appended.
    - waitForConversationToBeAnalyzed:
        call: WaitUntilOperationStatusDone
        args:
          operation_name: ${operationId.body+"-analysis"}
          status: None
        result: operationAnalysisInfo

    # Send the conversation id taken from the finished upload operation's
    # metadata to the service at `cf_feedback_generator_url`.
    - callFeedbackGenerator:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_feedback_generator_url")}
            timeout: 1800
            body:
              conversation_id: ${operationInfo.body.metadata.request.conversationId}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: generated_feedback
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500

    - printGeneratedFeedback:
        call: sys.log
        args:
          data: ${generated_feedback}

# Polls a long-running operation until its response body contains a "done"
# key, sleeping 30 seconds before every poll. There is no upper bound on the
# number of polls.
#
# Params:
#   operation_name: appended to "https://<insights_endpoint>/v1/" to build the
#                   URL that is polled.
#   status:         accepted but not referenced by any step below.
# Returns: the last HTTP response once its body contains "done" and no "error".
# Raises:  a string built from `body.error.message` as soon as the response
#          body contains an "error" key.
WaitUntilOperationStatusDone:
  params: [operation_name, status]
  steps:
    - init:
        assign:
          # Placeholder response so the first check has a body with neither
          # "error" nor "done" and therefore falls through to `iterate`.
          - currentStatus:
              body:
                key: "value"
          - insightsEndpoint: ${sys.get_env("insights_endpoint")}

    # "error" is tested first, so a response carrying both keys fails.
    - check_condition:
        switch:
          - condition: ${("error" in currentStatus.body)}
            next: exit_fail
          - condition: ${not("done" in currentStatus.body)}
            next: iterate
        next: exit_success

    - iterate:
        steps:
          - sleep30s:
              call: sys.sleep
              args:
                seconds: 30
          - getJob:
              call: http.get
              args:
                url: ${"https://" + insightsEndpoint + "/v1/" + operation_name}
                auth:
                  type: OAuth2
              result: getOperationStatus
          - getStatus:
              assign:
                - currentStatus: ${getOperationStatus}
          # NOTE(review): `text` receives the whole response map here, while
          # the log steps in `main` use `data` -- confirm this is intended.
          - log:
              call: sys.log
              args:
                text: ${currentStatus}
                severity: "INFO"
        next: check_condition

    - exit_success:
        return: ${currentStatus}

    - exit_fail:
        raise: ${"Operation in unexpected terminal status. Error message:"+currentStatus.body.error.message}