def _map_document_metadata_to_bigquery_schema()

in bq-connector/docai_bq_connector/connector/BqDocumentMapper.py [0:0]


    def _map_document_metadata_to_bigquery_schema(self, bq_schema: List[SchemaField]):
        """Build the metadata portion of a BigQuery row from the metadata mapper.

        Each entry produced by ``self.metadata_mapper.map_metadata()`` is read
        for its ``bq_column_name`` and ``bq_column_value`` keys. The value is
        wrapped in a ``DocumentField`` and passed to ``self._cast_type`` together
        with the ``field_type`` of the schema field whose name matches the column.

        Entries are left out of the result when:

        * the column value is ``None``;
        * no field in ``bq_schema`` has the column's name (a warning is logged);
        * ``self._cast_type`` returns a ``ConversionError`` instance.

        Args:
            bq_schema: Schema fields of the destination table, searched by name.

        Returns:
            A dict mapping column names to the values returned by
            ``self._cast_type``.
        """
        row: dict = {}

        for mapping in self.metadata_mapper.map_metadata():
            column = mapping["bq_column_name"]
            raw_value = mapping["bq_column_value"]

            # Nothing to write for this column.
            if raw_value is None:
                continue

            schema_field = find(
                lambda candidate: candidate.name == column, bq_schema
            )
            if schema_field is None:
                logging.warning(
                    "Parsed field '%s' not found in BigQuery schema. Field will be excluded from the "
                    "BigQuery payload",
                    column,
                )
                continue

            # Metadata has no extraction confidence or page, so -1 is passed
            # for both; the same value serves as raw and normalized value.
            metadata_field = DocumentField(
                name=column,
                value=raw_value,
                normalized_value=raw_value,
                confidence=-1,
                page_number=-1,
            )
            converted = self._cast_type(metadata_field, schema_field.field_type)

            # Values that failed conversion are dropped from the payload.
            if isinstance(converted, ConversionError):
                continue

            row[column] = converted

        return row