in bigquery_etl/backfill/shredder_mitigation.py [0:0]
def get_bigquery_type(value) -> DataTypeGroup:
"""Find the datatype of a value, grouping similar types."""
if isinstance(value, dt):
return DataTypeGroup.DATETIME
try:
if isinstance(dt.strptime(value, "%H:%M:%S").time(), time):
return DataTypeGroup.TIME
except (ValueError, TypeError, AttributeError):
pass
try:
value_parsed = parser.isoparse(value.replace(" UTC", "Z").replace(" ", "T"))
if (
isinstance(value_parsed, dt)
and value_parsed.time() == time(0, 0)
and isinstance(dt.strptime(value, "%Y-%m-%d"), date)
):
return DataTypeGroup.DATE
if isinstance(value_parsed, dt) and value_parsed.tzinfo is None:
return DataTypeGroup.DATETIME
if isinstance(value_parsed, dt) and value_parsed.tzinfo is not None:
return DataTypeGroup.TIMESTAMP
except (ValueError, TypeError, AttributeError):
pass
if isinstance(value, time):
return DataTypeGroup.TIME
if isinstance(value, date):
return DataTypeGroup.DATE
if isinstance(value, bool):
return DataTypeGroup.BOOLEAN
if isinstance(value, int):
return DataTypeGroup.INTEGER
if isinstance(value, float):
return DataTypeGroup.FLOAT
if isinstance(value, (str, bytes)):
return DataTypeGroup.STRING
if isinstance(value, NoneType):
return DataTypeGroup.UNDETERMINED
raise ValueError(f"Unsupported data type: {type(value)}")