def run_pipeline()

in microservices/upload_service/src/utils/process_task_helpers.py [0:0]


def run_pipeline(payload: List[Dict], is_hitl: bool, is_reassign: bool):
  """Runs the entire pipeline: select documents, extract, validate and match.

  The documents are split into applications and supporting documents, the
  applications are extracted first, and every supporting document is then
  extracted (unless reassigned) and sent to validation/matching.

  Args:
    payload: Configs required to run the pipeline. It is handed as-is to
      `get_documents` for the HITL/reassign flows; in the normal flow
      `payload.get("configs")` is read, so a mapping is expected there.
      NOTE(review): the `List[Dict]` annotation does not match the `.get`
      usage - confirm the real payload type against the callers.
    is_hitl: Run the pipeline for unclassified documents.
    is_reassign: Run the pipeline for a reassigned document. Supporting
      documents are not re-extracted in this mode; their stored
      `extraction_score` / `extraction_entities` are reused.

  Raises:
    HTTPException: status 500 when any step raises. The traceback is logged
      and the original exception is chained as the cause.
  """
  Logger.info(f"Processing the documents: {payload}")
  print(f"Processing the documents: {payload}")

  try:
    # Unclassified (HITL) and reassigned documents are looked up directly;
    # every other request goes through the normal classification filter.
    if is_hitl or is_reassign:
      result = get_documents(payload)
    else:
      result = filter_documents(payload.get("configs"))
    applications = result[0]
    supporting_docs = result[1]

    # Extract the applications first. The return value is not used here.
    # NOTE(review): applications are extracted in the reassign flow as well;
    # confirm that is intended.
    for doc in applications or []:
      extract_documents(doc, document_type="application_form")

    # Extract, validate and match the supporting documents.
    for doc in supporting_docs or []:
      if is_reassign:
        # In case of reassign extraction is not required: reuse the values
        # already stored on the document.
        Logger.info(f" Executing pipeline for reassign scenario "
                    f"{doc}")
        extraction_score = doc["extraction_score"]
        extraction_entities = doc["extraction_entities"]
        validate_match_approve(doc, extraction_score, extraction_entities)
        continue

      extraction_output = extract_documents(
          doc, document_type="supporting_documents")
      extraction_score = extraction_output[0]
      extraction_entities = extraction_output[1]
      Logger.info(f" Executing pipeline for normal scenario {doc}")
      # Only validate when extraction actually produced a score and entities.
      if extraction_score is not None and extraction_entities:
        Logger.info(f"extraction score is {extraction_score},{doc}")
        validate_match_approve(doc, extraction_score, extraction_entities)
  except Exception as e:
    # Flatten the traceback so it is logged as a single line.
    err = traceback.format_exc().replace("\n", " ")
    Logger.error(err)
    # `detail` ends up in the JSON error body, so it must be serializable:
    # send the message text rather than the exception object itself.
    raise HTTPException(status_code=500, detail=str(e)) from e