in odps/models/tableio.py [0:0]
def _fill_missing_expressions(cls, data, col_to_expr):
    """Fill columns absent from ``data`` by evaluating their expressions.

    Three kinds of input are handled:

    * ``pyarrow.Table`` / ``pyarrow.RecordBatch``: every key of
      ``col_to_expr`` that is not among the schema's column names is
      evaluated per record batch and appended as a new column. A new
      object is built; the input is returned untouched when nothing is
      missing.
    * ``pandas.DataFrame``: the missing columns are added to a copy of the
      frame; the input is returned untouched when nothing is missing.
    * anything else: ``data`` is treated as a single record or an iterable
      of records. For each record, fields listed in ``col_to_expr`` whose
      value is ``None`` are assigned in place; items that are not records
      are skipped.

    :param data: arrow table / record batch, pandas DataFrame, a record, or
        an iterable of records.
    :param col_to_expr: mapping from column name to an object exposing
        ``eval(data)``, which is called with the batch, frame or record
        being filled.
    :return: data of the same kind as the input with missing columns filled.
    """
    if pa and isinstance(data, (pa.Table, pa.RecordBatch)):
        existing = {name.lower() for name in data.schema.names}
        # Existing names are compared in lower case, so the candidate key
        # must be lowered too; otherwise a mixed-case key is always
        # considered missing and gets appended as a duplicate column.
        missing_cols = [c for c in col_to_expr if c.lower() not in existing]
        if not missing_cols:
            return data

        def handle_recordbatch(batch):
            # Keep the original columns first, then append computed ones.
            col_names = list(batch.schema.names) + missing_cols
            col_arrays = list(batch.columns)
            col_arrays.extend(
                col_to_expr[col].eval(batch) for col in missing_cols
            )
            return pa.RecordBatch.from_arrays(col_arrays, col_names)

        if isinstance(data, pa.RecordBatch):
            return handle_recordbatch(data)

        batches = data.to_batches()
        if not batches:
            # A table without rows may yield no batches at all, and
            # Table.from_batches() refuses an empty list when no schema is
            # given. Evaluate on one empty batch so the result still
            # carries the added columns.
            # NOTE(review): relies on the expressions accepting a
            # zero-length batch -- confirm.
            batches = [
                pa.RecordBatch.from_arrays(
                    [pa.array([], type=tp) for tp in data.schema.types],
                    list(data.schema.names),
                )
            ]
        return pa.Table.from_batches([handle_recordbatch(b) for b in batches])

    if pd and isinstance(data, pd.DataFrame):
        existing = {name.lower() for name in data.columns}
        # Same case-insensitive comparison as in the arrow branch.
        missing_cols = [c for c in col_to_expr if c.lower() not in existing]
        if not missing_cols:
            return data
        # Work on a copy so the caller's frame is left unmodified.
        data = data.copy()
        for col in missing_cols:
            data[col] = col_to_expr[col].eval(data)
        return data

    # Record or iterable of records: fill empty fields in place.
    # NOTE(review): a one-shot iterator passed here is consumed by the loop
    # below before being returned -- confirm callers pass re-iterable data.
    wrapped = odps_types.is_record(data)
    records = [data] if wrapped else data
    for rec in records:
        if not odps_types.is_record(rec):
            continue
        for col in col_to_expr:
            # Only fields that are still None are replaced.
            if rec[col] is None:
                rec[col] = col_to_expr[col].eval(rec)
    return data