infra-as-code/modules/ingest-pipeline/workflow/orchestration.yaml (183 lines of code) (raw):
# Entry workflow: drives one uploaded object through the ingest pipeline.
#
# Params:
#   event - triggering event; only `event.data.bucket` and `event.data.name`
#           are read (presumably a Cloud Storage object event - confirm
#           against the trigger definition).
#
# Stages, in order; each is an HTTP POST to a URL taken from an env var:
#   1. cf_stt_url                       - receives {bucket, name}
#   2. cf_genai_url                     - receives the body returned by stage 1
#   3. cf_audio_redaction_url           - receives the body returned by stage 2
#   4. cf_ccai_conversation_upload_url  - receives the body returned by stage 3;
#                                         its response body is used as an
#                                         operation name
#   5. wait for that operation, then for the "<operation>-analysis" operation
#   6. cf_feedback_generator_url        - receives the conversation id read from
#                                         the finished upload operation
#
# Authentication: every call uses `auth.type: OIDC`, which asks Workflows to
# attach the identity token itself, so no Authorization header is set by hand.
#
# NOTE(review): the retry blocks use `max_attempts` and
# `backoff.type/delay/max_delay`; the documented Workflows retry policy uses
# `max_retries` and `backoff.initial_delay/max_delay/multiplier`. Kept as
# written - confirm these keys are accepted by the deployment target.
main:
  params: [event]
  steps:
    - init:
        assign:
          - bucket: ${event.data.bucket}
          - file_name: ${event.data.name}

    # Stage 1: transcript function, given the object's location.
    - callTranscriptFunction:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_stt_url")}
            timeout: 1800
            body:
              bucket: ${bucket}
              name: ${file_name}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: transcript_payload
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500
    - printTranscriptResultHttp:
        call: sys.log
        args:
          data: ${transcript_payload}

    # Stage 2: forward the stage 1 response body unchanged.
    - callTranscriptFix:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_genai_url")}
            timeout: 1800
            body: ${transcript_payload.body}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: fixed_transcript_payload
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500
    - printFixedTranscriptResultHttp:
        call: sys.log
        args:
          data: ${fixed_transcript_payload}

    # Stage 3: forward the stage 2 response body unchanged.
    - callAudioRedaction:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_audio_redaction_url")}
            timeout: 1800
            body: ${fixed_transcript_payload.body}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: redacted_transcript_payload
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500

    # Stage 4: upload; the response body is later used as an operation name.
    - callConversationUpload:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_ccai_conversation_upload_url")}
            timeout: 1800
            body: ${redacted_transcript_payload.body}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: operationId
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500
    - printResultHttp:
        call: sys.log
        args:
          data: ${operationId}

    # Stage 5: block until the upload operation, then the "-analysis"
    # operation, reach a terminal state. `status` is the literal string
    # "None"; the subworkflow declares the parameter but does not read it.
    - waitForConversationToBeUploaded:
        call: WaitUntilOperationStatusDone
        args:
          operation_name: ${operationId.body}
          status: "None"
        result: operationInfo
    - waitForConversationToBeAnalyzed:
        call: WaitUntilOperationStatusDone
        args:
          operation_name: ${operationId.body+"-analysis"}
          status: "None"
        result: operationAnalysisInfo

    # Stage 6: feedback generation for the uploaded conversation.
    - callFeedbackGenerator:
        try:
          call: http.post
          args:
            url: ${sys.get_env("cf_feedback_generator_url")}
            timeout: 1800
            body:
              conversation_id: ${operationInfo.body.metadata.request.conversationId}
            headers:
              Content-Type: application/json
            auth:
              type: OIDC
          result: generated_feedback
        retry:
          predicate: ${http.default_retry_predicate}
          max_attempts: 5
          backoff:
            type: exponential
            delay: 30
            max_delay: 500
    - printGeneratedFeedback:
        call: sys.log
        args:
          data: ${generated_feedback}
# Subworkflow: polls an operation every 30 seconds until it is terminal.
#
# Params:
#   operation_name - path appended to "https://<insights_endpoint>/v1/" to
#                    form the polling URL.
#   status         - accepted for caller compatibility; not read here.
#
# Returns: the full HTTP response of the last poll, once its body reports
#          `done` as true.
# Raises:  a string error built from `body.error.message` as soon as a poll
#          response body contains an `error` key.
#
# There is no upper bound on the number of polls.
WaitUntilOperationStatusDone:
  params: [operation_name, status]
  steps:
    - init:
        assign:
          # Placeholder response with neither "error" nor "done", so the
          # first pass through check_condition always goes to `iterate`.
          - currentStatus:
              body:
                key: "value"
          - insightsEndpoint: ${sys.get_env("insights_endpoint")}

    - check_condition:
        switch:
          - condition: ${("error" in currentStatus.body)}
            next: exit_fail
          # Keep polling unless `done` is actually true. A missing key
          # (map.get yields null, default() then yields false) and an
          # explicit `done: false` are both treated as "not finished".
          - condition: ${not(default(map.get(currentStatus.body, "done"), false))}
            next: iterate
        next: exit_success

    - iterate:
        steps:
          - sleep30s:
              call: sys.sleep
              args:
                seconds: 30
          - getJob:
              call: http.get
              args:
                url: ${"https://" + insightsEndpoint + "/v1/" + operation_name}
                auth:
                  type: OAuth2
              result: getOperationStatus
          - getStatus:
              assign:
                - currentStatus: ${getOperationStatus}
          # NOTE(review): `text` is given the whole response object rather
          # than a string - confirm sys.log accepts this, or switch to `data`.
          - log:
              call: sys.log
              args:
                text: ${currentStatus}
                severity: "INFO"
        next: check_condition

    - exit_success:
        return: ${currentStatus}

    - exit_fail:
        raise: ${"Operation in unexpected terminal status. Error message:"+currentStatus.body.error.message}