in src/ab/plugins/data/engine.py [0:0]
import dateutil.parser
import pandas as pd
from datetime import date, datetime


def convert_date_type(rows, column_name):
    """Infer a Spark type for `column_name` and coerce its values in place.

    `rows` is a list of dict-like rows; returns the (possibly mutated)
    rows together with the inferred pyspark.sql type.
    """
    # https://spark.apache.org/docs/latest/sql-reference.html
    from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType, DateType
    none_count = 0
    # First pass: try to infer the type from existing values without
    # converting anything. Assumes the column is homogeneously typed:
    # the first recognized non-None value decides the Spark type.
    for row in rows:
        val = row[column_name]
        if val is None:
            none_count += 1
            continue
        # pd.Timestamp is a subclass of datetime, but Spark does not accept
        # it directly, so fall through to the conversion pass below.
        if isinstance(val, pd.Timestamp):
            break
        # datetime is a subclass of date, so it must be checked first
        if isinstance(val, datetime):
            return rows, TimestampType()
        if isinstance(val, date):
            return rows, DateType()
        if isinstance(val, int):
            # may encode an epoch timestamp or an interval; keep it numeric
            return rows, IntegerType()
        if isinstance(val, float):
            # may encode an epoch timestamp or an interval; keep it numeric
            return rows, DoubleType()
    if none_count == len(rows):
        # every value is None, so any nullable type will do; default to timestamp
        return rows, TimestampType()
    try:
        # Second pass: coerce every non-None value to a plain datetime.
        for row in rows:
            val = row[column_name]
            if val is None:
                continue
            if isinstance(val, pd.Timestamp):
                row[column_name] = val.to_pydatetime()
            else:
                # fuzzy=True tolerates surrounding noise in the string
                row[column_name] = dateutil.parser.parse(str(val), fuzzy=True)
        return rows, TimestampType()
    except Exception:
        # Parsing failed: fall back to stringifying every non-string value.
        for row in rows:
            val = row[column_name]
            if val is None or isinstance(val, str):
                continue
            row[column_name] = str(val)
        return rows, StringType()
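

# Usage sketch (illustrative, not part of this module): feeding the inferred
# type into a Spark schema. The SparkSession setup, sample rows, and column
# names below are assumptions for demonstration only.
if __name__ == "__main__":
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StringType, StructField, StructType

    spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()

    rows = [
        {"id": "a", "created_at": pd.Timestamp("2021-01-01 12:00:00")},
        {"id": "b", "created_at": None},
    ]
    rows, created_at_type = convert_date_type(rows, "created_at")

    schema = StructType([
        StructField("id", StringType(), nullable=True),
        StructField("created_at", created_at_type, nullable=True),
    ])
    # the dict literals above are insertion-ordered, so tuple(r.values())
    # lines up with the schema fields
    df = spark.createDataFrame([tuple(r.values()) for r in rows], schema=schema)
    df.printSchema()  # created_at comes out as TimestampType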