in bq-connector/docai_bq_connector/connector/BqDocumentMapper.py [0:0]
def _map_document_metadata_to_bigquery_schema(self, bq_schema: List[SchemaField]) -> dict:
    """Build the BigQuery column values contributed by document metadata.

    Each entry produced by ``self.metadata_mapper.map_metadata()`` is read
    for its ``"bq_column_name"`` and ``"bq_column_value"`` keys. The value
    is cast to the type of the schema field with the same name and stored
    in the result under that column name.

    An entry is left out of the result when:

    * its value is ``None``;
    * no field in ``bq_schema`` has a matching ``name`` (a warning is
      logged);
    * ``self._cast_type`` returns a ``ConversionError`` (a warning is
      logged).

    Args:
        bq_schema: Schema fields of the destination BigQuery table.

    Returns:
        A dict mapping column name to the cast metadata value.
    """
    result: dict = {}
    for cur_metadata_mapping in self.metadata_mapper.map_metadata():
        col_name = cur_metadata_mapping["bq_column_name"]
        col_value = cur_metadata_mapping["bq_column_value"]
        if col_value is None:
            continue

        bq_field = find(
            lambda schema_field: schema_field.name == col_name, bq_schema
        )
        if bq_field is None:
            logging.warning(
                "Parsed field '%s' not found in BigQuery schema. Field will be excluded from the "
                "BigQuery payload",
                col_name,
            )
            continue

        # Metadata is wrapped in a DocumentField so it can go through the
        # same casting routine as parsed fields. NOTE(review): the -1
        # confidence / page_number look like "not applicable" placeholders
        # for values that do not come from a document page -- confirm.
        _value = self._cast_type(
            DocumentField(
                name=col_name,
                value=col_value,
                normalized_value=col_value,
                confidence=-1,
                page_number=-1,
            ),
            bq_field.field_type,
        )
        if isinstance(_value, ConversionError):
            # Previously this case dropped the column silently; report it
            # the same way as a column missing from the schema. The raw
            # value is deliberately not logged.
            logging.warning(
                "Metadata value for column '%s' could not be converted to BigQuery type '%s'. "
                "Field will be excluded from the BigQuery payload",
                col_name,
                bq_field.field_type,
            )
            continue

        result[col_name] = _value
    return result