in odps/df/expr/merge.py [0:0]
def join(left, right, on=None, how='inner', suffixes=('_x', '_y'), mapjoin=False, skewjoin=False):
    """
    Join two collections.

    If `on` is not specified, we will find the common fields of the left and right collection.
    `suffixes` means that if column names conflict, the suffixes will be added automatically.
    For example, if both left and right have a field named `col`,
    there will be col_x and col_y in the joined collection.

    :param left: left collection
    :param right: right collection
    :param on: fields to join on. Can be a field name, a ``(left_field, right_field)`` tuple,
               a join condition expression, a function called as ``func(left, right)`` that
               returns one of these, or a list of any of them. A list passed in is not modified.
    :param how: 'inner', 'left', 'right', or 'outer'. Values not recognized here are passed
                unchanged to a generic join expression.
    :param suffixes: when names conflict, the suffixes will be added to both columns.
                     Must be a tuple or list with exactly two elements.
    :param mapjoin: whether to use mapjoin, default value False.
    :param skewjoin: whether to use skewjoin, default value False. Can specify True if
                     the collection is skew, or a list (or tuple) specifying columns with skew
                     values, or a list of dicts specifying skew combinations. An empty list
                     disables skewjoin.
    :return: collection
    :raises TypeError: if `mapjoin` and `skewjoin` are both set, or `skewjoin` has an
                       unsupported type or mixes dicts with other values
    :raises ValueError: if `skewjoin` references columns missing from the right collection,
                        the dicts in `skewjoin` have different keys, or `suffixes` is not a
                        two-element tuple or list

    :Example:

    >>> df.dtypes.names
    ['name', 'id']
    >>> df2.dtypes.names
    ['name', 'id1']
    >>> df.join(df2)
    >>> df.join(df2, on='name')
    >>> df.join(df2, on=('id', 'id1'))
    >>> df.join(df2, on=['name', ('id', 'id1')])
    >>> df.join(df2, on=[df.name == df2.name, df.id == df2.id1])
    >>> df.join(df2, on='name', mapjoin=True)
    >>> df.join(df2, skewjoin=True)
    >>> df.join(df2, skewjoin=["c0", "c1"])
    >>> df.join(df2, skewjoin=[{"c0": 1, "c1": "2"}, {"c0": 3, "c1": "4"}])
    """
    if mapjoin and skewjoin:
        raise TypeError("Cannot specify mapjoin and skewjoin at the same time")

    if isinstance(left, TypedExpr):
        left = to_collection(left)
    if isinstance(right, TypedExpr):
        right = to_collection(right)

    if on is None:
        if not mapjoin:
            # Join on every left field whose name (passed through to_lower_str) is found
            # in the right schema's name index.
            on = [name for name in left.schema.names
                  if to_lower_str(name) in right.schema._name_indexes]
            if not on and len(left.schema) == 1 and len(right.schema) == 1:
                # No shared names, but each side has exactly one column: join those two.
                on = [(left.schema.names[0], right.schema.names[0])]

    skewjoin, skewjoin_values = _resolve_join_skewjoin(skewjoin, right)

    if isinstance(suffixes, (tuple, list)) and len(suffixes) == 2:
        left_suffix, right_suffix = suffixes
    else:
        # Wrap in a 1-tuple: formatting a tuple directly with % would treat it as the
        # argument tuple and raise TypeError instead of this ValueError.
        raise ValueError('suffixes must be a tuple or list with two elements, got %s'
                         % (suffixes,))

    # Work on a private list so resolving functions below never rewrites the caller's list.
    predicates = list(on) if isinstance(on, list) else [on]
    # Functions are called with the (possibly converted) collections to build predicates.
    predicates = [it(left, right) if inspect.isfunction(it) else it for it in predicates]

    left, right = _make_different_sources(left, right, predicates)

    join_kwargs = dict(
        _lhs=left, _rhs=right, _predicate=predicates, _left_suffix=left_suffix,
        _right_suffix=right_suffix, _mapjoin=mapjoin, _skewjoin=skewjoin,
        _skewjoin_values=skewjoin_values,
    )
    # Only the lookup is guarded: a KeyError raised while building the expression
    # must propagate rather than silently fall back to the generic join.
    try:
        join_cls = _join_dict[how.upper()]
    except KeyError:
        return JoinCollectionExpr(_how=how, **join_kwargs)
    return join_cls(**join_kwargs)


def _resolve_join_skewjoin(skewjoin, right):
    """
    Validate and normalize the `skewjoin` argument of :func:`join`.

    :param skewjoin: a bool, a column name, a dict of column values, or a list/tuple of
                     column names or of dicts
    :param right: right collection; every column referenced by `skewjoin` must be in its schema
    :return: ``(skewjoin, skewjoin_values)``. For dict specifications, ``skewjoin`` becomes
             the sorted column names and ``skewjoin_values`` holds each dict's values in that
             column order. Otherwise ``skewjoin_values`` is None. An empty list yields
             ``(False, None)``.
    :raises TypeError: if `skewjoin` has an unsupported type or mixes dicts with other values
    :raises ValueError: if a referenced column is missing from `right`, or the dicts do not
                        all have the same keys
    """
    if isinstance(skewjoin, (dict, six.string_types)):
        skewjoin = [skewjoin]
    if not isinstance(skewjoin, (list, tuple)):
        # Booleans and falsy values are passed through unchanged.
        if skewjoin and not isinstance(skewjoin, bool):
            raise TypeError("Cannot accept skewjoin type %s" % type(skewjoin))
        return skewjoin, None

    # Copy so the expression never aliases a caller-owned list; also accepts tuples.
    skewjoin = list(skewjoin)
    if not skewjoin:
        # No skew columns named: treat as skewjoin disabled.
        return False, None

    right_names = set(right.schema.names)
    missing_msg = "All columns specified in `skewjoin` need to exist in the right collection"

    if all(isinstance(c, dict) for c in skewjoin):
        cols = sorted(skewjoin[0].keys())
        if any(c not in right_names for c in cols):
            raise ValueError(missing_msg)
        cols_set = set(cols)
        if any(cols_set != set(d.keys()) for d in skewjoin):
            raise ValueError("All values in `skewjoin` need to have same columns")
        return cols, [[d[c] for c in cols] for d in skewjoin]

    if any(isinstance(c, dict) for c in skewjoin):
        raise TypeError("Cannot mix dicts with other values in `skewjoin`")

    if (all(isinstance(c, six.string_types) for c in skewjoin)
            and any(c not in right_names for c in skewjoin)):
        raise ValueError(missing_msg)
    return skewjoin, None