in odps/df/expr/merge.py [0:0]
def _merge_joined_fields(self, merge_columns):
    """Collapse paired join-predicate columns of this join into single columns.

    A predicate field that comes from both inputs shows up as two output
    columns (one per side, possibly renamed).  For every field selected by
    ``merge_columns`` the pair is replaced by one column named after the
    original source field, placed where the first of the pair appeared in
    ``self.schema.names``.

    :param merge_columns: which predicate fields to merge and how:

        * falsy -- nothing is merged and ``self`` is returned unchanged;
        * ``True`` or exactly ``'auto'`` / ``'left'`` / ``'right'`` -- apply
          that action to every predicate field;
        * any other string -- name of a single predicate field, merged with
          ``'auto'``;
        * a list / tuple / set of field names -- each merged with ``'auto'``;
        * a dict mapping field name to action, where the action is ``True``
          (same as ``'auto'``), ``False`` (leave that field unmerged) or a
          case-insensitive ``'auto'`` / ``'left'`` / ``'right'``.

        ``'auto'`` takes the left value, falling back to the right value when
        the left one is null; ``'left'`` / ``'right'`` keep only that side.
        The object passed in is never modified.
    :return: ``self`` when ``merge_columns`` is falsy, otherwise a
        ``JoinFieldMergedCollectionExpr`` wrapping ``self``.
    :raises ValueError: if the join has no predicate fields, a requested
        field is not a predicate field or does not appear on both sides of
        the join, or an action is not recognised.
    """
    if not merge_columns:
        return self
    predicate_fields = self._get_predicate_fields()
    if not predicate_fields:
        raise ValueError('No fields in predicate. Cannot merge columns.')

    valid_actions = ('auto', 'left', 'right')

    # Map each predicate source field to [name at index 0 (left), name at
    # index 1 (right)] among this join's output columns. A slot stays None
    # when no output column came from that side.
    src_map = self._column_origins
    rename_map = dict()
    for name, src in six.iteritems(src_map):
        src_id, src_name = src
        if src_name not in predicate_fields:
            continue
        rename_map.setdefault(src_name, [None, None])[src_id] = name

    # Normalise the accepted input shapes into a fresh {field: action} dict
    # so the caller's object is never mutated below.
    if isinstance(merge_columns, bool) or (
            isinstance(merge_columns, six.string_types) and merge_columns in valid_actions):
        requested = dict((k, merge_columns) for k in rename_map)
    elif isinstance(merge_columns, six.string_types):
        requested = {merge_columns: 'auto'}
    elif isinstance(merge_columns, (list, tuple, set)):
        requested = dict((k, 'auto') for k in merge_columns)
    else:
        requested = dict(merge_columns)

    actions = dict()
    excludes = set()
    for col, action in six.iteritems(requested):
        if col not in rename_map:
            raise ValueError('Column {0} not exists in join predicate.'.format(col))
        if isinstance(action, bool):
            if not action:
                # An explicit False opts this field out of merging.
                continue
            action = 'auto'
        elif isinstance(action, six.string_types):
            action = action.lower()
        if action not in valid_actions:
            # Without this check the pair would be excluded below yet never
            # re-added, silently dropping both columns from the result.
            raise ValueError('Unknown merge action {0!r} for column {1}. '
                             'Expected one of auto, left, right.'.format(action, col))
        left_name, right_name = rename_map[col]
        if left_name is None or right_name is None:
            raise ValueError('Column {0} does not appear on both sides of the join. '
                             'Cannot merge it.'.format(col))
        actions[col] = action
        excludes.update((left_name, right_name))

    selects = []
    merged = set()
    for col in self.schema.names:
        if col not in excludes:
            selects.append(self[col])
            continue
        src_name = src_map[col][1]
        # Both halves of a pair map to the same source field; emit it once.
        if src_name in merged:
            continue
        merged.add(src_name)
        left_name, right_name = rename_map[src_name]
        left_col = self[left_name]
        right_col = self[right_name]
        action = actions[src_name]
        if action == 'auto':
            selects.append(left_col.isnull().ifelse(right_col, left_col).rename(src_name))
        elif action == 'left':
            selects.append(left_col.rename(src_name))
        else:
            selects.append(right_col.rename(src_name))

    selected = self.select(*selects)
    return JoinFieldMergedCollectionExpr(_input=self, _fields=selected._fields,
                                         _schema=selected._schema, _rename_map=rename_map)