def process_classify_results()

in classify-split-extract-workflow/classify-job/split_and_classify.py [0:0]


def process_classify_results(metadata: BatchProcessMetadata) -> Optional[Dict]:
    """Processes the results of a classification operation.

    For every individual process status in ``metadata`` the first blob found
    under its GCS output prefix is downloaded and parsed as a Document AI
    ``Document``. Depending on ``is_splitting_required`` the input is either
    passed to ``split_pdf`` or tagged with the metadata derived from its
    highest-confidence entity and registered via
    ``add_predicted_document_type``.

    Inputs are skipped (with a log entry) when the output destination is not
    a ``gs://bucket/prefix`` URI, when nothing is stored under the output
    prefix, when the first output blob is not a JSON file, or when the parsed
    document has no entities.

    Args:
        metadata: Metadata of the finished batch process operation.

    Returns:
        The dictionary accumulated across all processed inputs. Its exact
        layout is defined by ``add_predicted_document_type`` and ``split_pdf``
        (not visible in this block).

    Raises:
        ValueError: If the batch process did not finish in the SUCCEEDED state.
    """
    logger.info(f"handling classification results - operation.metadata={metadata}")

    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")

    documents: Dict = {}

    for process in metadata.individual_process_statuses:
        matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
        if not matches:
            logger.error(
                f"Invalid GCS destination format: {process.output_gcs_destination}"
            )
            continue
        output_bucket, output_prefix = matches.groups()
        input_gcs_source = process.input_gcs_source

        logger.info(
            f"output_bucket = {output_bucket}, "
            f"output_prefix={output_prefix}, "
            f"input_gcs_source = {input_gcs_source}, "
            f"output_gcs_destination = {process.output_gcs_destination}"
        )

        # Adding support for shards using toolbox after the issue is addressed
        # https://github.com/googleapis/python-documentai-toolbox/issues/332
        # Until then only the first output blob is read. next() avoids
        # materializing the whole listing and tolerates an empty prefix
        # instead of raising IndexError.
        output_blob = next(
            iter(storage_client.list_blobs(output_bucket, prefix=output_prefix + "/")),
            None,
        )
        if output_blob is None:
            logger.error(
                f"No output found under gs://{output_bucket}/{output_prefix}/ "
                f"for {input_gcs_source}"
            )
            continue

        if ".json" not in output_blob.name:
            logger.info(
                f"Skipping non-supported file: {output_blob.name} - Mimetype: "
                f"{output_blob.content_type}"
            )
            continue

        document_out = documentai.Document.from_json(
            output_blob.download_as_bytes(), ignore_unknown_fields=True
        )
        blob_entities = document_out.entities

        if not blob_entities:
            logger.info(f"No entities found for {input_gcs_source}")
            continue

        if is_splitting_required(blob_entities):
            # Merge instead of rebinding `documents`, so results collected for
            # earlier inputs are not discarded.
            # NOTE(review): assumes split_pdf returns a dict (or None); values
            # that are lists on both sides are concatenated, anything else is
            # replaced - confirm against split_pdf.
            split_documents = split_pdf(input_gcs_source, blob_entities)
            if split_documents:
                for doc_type, split_items in split_documents.items():
                    existing_items = documents.get(doc_type)
                    if isinstance(existing_items, list) and isinstance(
                        split_items, list
                    ):
                        existing_items.extend(split_items)
                    else:
                        documents[doc_type] = split_items
        else:
            max_confidence_entity = max(blob_entities, key=lambda item: item.confidence)
            # Separate name so the `metadata` parameter is not shadowed.
            entity_metadata = get_metadata(max_confidence_entity)
            gcs_helper.add_metadata(input_gcs_source, entity_metadata)

            add_predicted_document_type(
                entity_metadata, input_gcs_source=input_gcs_source, documents=documents
            )

    return documents