in classify-split-extract-workflow/classify-job/split_and_classify.py
import re
from typing import Dict, Optional

from google.cloud import documentai
from google.cloud.documentai import BatchProcessMetadata

# Assumed to be defined elsewhere in this module: logger, storage_client,
# gcs_helper, is_splitting_required, split_pdf, get_metadata, and
# add_predicted_document_type.


def process_classify_results(metadata: BatchProcessMetadata) -> Optional[Dict]:
    """Processes the results of a batch classification operation."""
    logger.info(f"Handling classification results - operation.metadata={metadata}")
    documents = {}
    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")
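    # Each individual process status corresponds to one input file in the
    # batch request.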
    for process in metadata.individual_process_statuses:
        matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
        if not matches:
            logger.error(
                f"Invalid GCS destination format: {process.output_gcs_destination}"
            )
            continue
        output_bucket, output_prefix = matches.groups()
        input_gcs_source = process.input_gcs_source
        logger.info(
            f"output_bucket={output_bucket}, "
            f"output_prefix={output_prefix}, "
            f"input_gcs_source={input_gcs_source}, "
            f"output_gcs_destination={process.output_gcs_destination}"
        )
        # TODO: Read all output shards via the Document AI Toolbox once
        # https://github.com/googleapis/python-documentai-toolbox/issues/332
        # is addressed; until then, only the first output blob is used.
        output_blobs = list(
            storage_client.list_blobs(output_bucket, prefix=output_prefix + "/")
        )
        if not output_blobs:
            logger.error(f"No output found for {process.output_gcs_destination}")
            continue
        output_blob = output_blobs[0]
if ".json" not in output_blob.name:
logger.info(
f"Skipping non-supported file: {output_blob.name} - Mimetype: "
f"{output_blob.content_type}"
)
continue
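        # Batch output is the full Document proto serialized as JSON; parse it
        # back into a Document object.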
        document_out = documentai.Document.from_json(
            output_blob.download_as_bytes(), ignore_unknown_fields=True
        )
        blob_entities = document_out.entities
        if not blob_entities:
            logger.info(f"No entities found for {input_gcs_source}")
            continue
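        # Splitter-style results carry entities for multiple logical
        # sub-documents; otherwise the highest-confidence entity is taken as
        # the predicted type for the whole file.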
        if is_splitting_required(blob_entities):
            # Merge rather than overwrite, so results from earlier files in
            # the batch are preserved.
            documents.update(split_pdf(input_gcs_source, blob_entities))
        else:
            max_confidence_entity = max(
                blob_entities, key=lambda item: item.confidence
            )
            # Renamed from `metadata` to avoid shadowing the function's
            # BatchProcessMetadata parameter.
            entity_metadata = get_metadata(max_confidence_entity)
            gcs_helper.add_metadata(input_gcs_source, entity_metadata)
            add_predicted_document_type(
                entity_metadata,
                input_gcs_source=input_gcs_source,
                documents=documents,
            )
    return documents
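
# Usage sketch (illustrative; `docai_client` and `request` are assumed names,
# not from this file): the caller runs a batch operation with
# documentai.DocumentProcessorServiceClient, waits for it to finish, and hands
# the operation metadata to process_classify_results().
#
#   operation = docai_client.batch_process_documents(request=request)
#   operation.result(timeout=600)  # block until the batch completes
#   batch_metadata = documentai.BatchProcessMetadata(operation.metadata)
#   documents = process_classify_results(batch_metadata)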