def _fill_missing_expressions()

in odps/models/tableio.py [0:0]


    def _fill_missing_expressions(cls, data, col_to_expr):
        """Fill columns that ``data`` lacks by evaluating their expressions.

        The handling depends on the type of ``data``:

        * ``pa.Table`` / ``pa.RecordBatch``: every column in ``col_to_expr``
          whose name is not in the schema is appended (in ``col_to_expr``
          iteration order) and a new table / batch is returned.
        * ``pd.DataFrame``: the frame is copied and the missing columns are
          added to the copy, which is returned.
        * anything else: ``data`` is treated as a single record or an
          iterable of records.  For each record, every column in
          ``col_to_expr`` whose current value is ``None`` is assigned in
          place.  Items that are not records are skipped.

        Column presence for the pyarrow and pandas cases is checked
        case-insensitively.

        :param data: pyarrow table or record batch, pandas DataFrame, a
            record, or an iterable of records
        :param col_to_expr: mapping from column name to an object whose
            ``eval(data)`` produces the value(s) for that column
        :return: the input object itself when no column is missing (pyarrow /
            pandas) or for record input; otherwise a new object with the
            missing columns added
        """

        def find_missing(existing_names):
            # Existing names are lower-cased, so the requested names must be
            # lower-cased for the membership test too; otherwise a mixed-case
            # key would always be reported as missing even when the column
            # is already present.
            present = {name.lower() for name in existing_names}
            return [col for col in col_to_expr if col.lower() not in present]

        def handle_recordbatch(batch, missing_cols):
            # Append the computed columns after the existing ones.
            col_names = list(batch.schema.names) + list(missing_cols)
            col_arrays = list(batch.columns)
            col_arrays.extend(
                col_to_expr[col].eval(batch) for col in missing_cols
            )
            return pa.RecordBatch.from_arrays(col_arrays, col_names)

        # ``pa`` / ``pd`` are checked for truthiness first -- presumably they
        # are None when the optional dependency is unavailable (TODO confirm).
        if pa and isinstance(data, (pa.Table, pa.RecordBatch)):
            missing_cols = find_missing(data.schema.names)
            if not missing_cols:
                return data
            if isinstance(data, pa.Table):
                # NOTE(review): from_batches() infers the schema from the
                # first batch; if to_batches() yields nothing (possibly for a
                # zero-row table -- TODO confirm) this call will raise.
                batches = [
                    handle_recordbatch(batch, missing_cols)
                    for batch in data.to_batches()
                ]
                return pa.Table.from_batches(batches)
            return handle_recordbatch(data, missing_cols)

        if pd and isinstance(data, pd.DataFrame):
            missing_cols = find_missing(data.columns)
            if not missing_cols:
                return data
            # Work on a copy so the caller's frame is left untouched.
            data = data.copy()
            for col in missing_cols:
                data[col] = col_to_expr[col].eval(data)
            return data

        # Record path: a lone record is iterated as a one-element list, but
        # the object handed back is always the original ``data``.
        records = [data] if odps_types.is_record(data) else data
        for rec in records:
            if not odps_types.is_record(rec):
                continue
            for col in col_to_expr:
                # Only fill values that are unset; explicit values win.
                if rec[col] is None:
                    rec[col] = col_to_expr[col].eval(rec)
        return data