in odps/df/backends/sqlalchemy/compiler.py [0:0]
def visit_binary_op(self, expr):
    """Compile a binary-operation expression into a SQLAlchemy expression.

    The compiled expression is registered through ``self._add(expr, ...)``;
    nothing is returned. The already-compiled operands are looked up in
    ``self._expr_to_sqlalchemy``.

    Special cases handled before the generic ``BINARY_OP`` dispatch:

    * ``Power`` is compiled with ``func.pow``.
    * ``FloorDivide`` is compiled with the plain division operator.
    * ``Add``/``Substract`` producing a datetime: one operand must be a
      ``DTScalar`` offset, which is applied to the datetime operand.
    * ``Substract`` of two datetimes.
    * ``Mod``, whose result is wrapped in ``abs`` when the divisor is positive.

    :param expr: the binary expression node to compile
    :raises CompileError: when two datetimes are added together
    :raises NotImplementedError: when a datetime is combined with something
        that is not a supported ``DTScalar`` offset
    """
    if isinstance(expr, Power):
        op = func.pow
    elif isinstance(expr, FloorDivide):
        # ``operator.div`` only exists on Python 2; the conditional expression
        # keeps it from being evaluated on Python 3.
        op = operator.div if six.PY2 else operator.truediv
    elif isinstance(expr, (Add, Substract)) and expr.dtype == df_types.datetime:
        if isinstance(expr, Add) and \
                all(child.dtype == df_types.datetime for child in (expr.lhs, expr.rhs)):
            raise CompileError('Cannot add two datetimes')

        # The offset may sit on the right (``dt + s`` / ``dt - s``) or, for
        # addition only, on the left (``s + dt``).
        # NOTE: the original test here was ``(isinstance(expr, Add) and
        # expr.lhs, DTScalar)`` -- a tuple that is always truthy -- so the
        # unsupported case was never rejected.
        if isinstance(expr.rhs, DTScalar) or \
                (isinstance(expr, Add) and isinstance(expr.lhs, DTScalar)):
            if isinstance(expr.rhs, DTScalar):
                dt, scalar = expr.lhs, expr.rhs
            else:
                dt, scalar = expr.rhs, expr.lhs
            val = scalar.value
            if isinstance(expr, Substract):
                val = -val
            # Drop the last six characters of the scalar's class name
            # (presumably a ``Scalar`` suffix -- confirm against the DTScalar
            # subclasses) to obtain the unit name used as DATE_KEY_DIC key.
            dt_type = type(scalar).__name__[:-6]
            sa_dt = self._expr_to_sqlalchemy[dt]
            try:
                key = DATE_KEY_DIC[dt_type]
            except KeyError:
                raise NotImplementedError
            if self._sa_engine and self._sa_engine.name == 'mysql':
                # Milliseconds are rewritten as microseconds for MySQL
                # (presumably because it has no millisecond interval unit).
                if dt_type == 'MilliSecond':
                    val, dt_type = val * 1000, 'MicroSecond'
                sa_expr = func.date_add(sa_dt, text('interval %d %s' % (val, dt_type.lower())))
            else:
                sa_expr = sa_dt + timedelta(**{key: val})
            self._add(expr, sa_expr)
            return
        else:
            raise NotImplementedError
    elif isinstance(expr, Substract) and expr._lhs.dtype == df_types.datetime and \
            expr._rhs.dtype == df_types.datetime:
        # Difference of two datetimes: take the microsecond component of the
        # difference, cast it to the expression's type, and divide by 1000.
        sa_expr = self._expr_to_sqlalchemy[expr._lhs] - self._expr_to_sqlalchemy[expr._rhs]
        if self._sa_engine and self._sa_engine.name == 'mysql':
            sa_expr = func.abs(func.microsecond(sa_expr)
                               .cast(types.df_type_to_sqlalchemy_type(expr.dtype))) / 1000
        else:
            sa_expr = func.abs(extract('MICROSECONDS', sa_expr)
                               .cast(types.df_type_to_sqlalchemy_type(expr.dtype))) / 1000
        self._add(expr, sa_expr)
        return
    elif isinstance(expr, Mod):
        lhs, rhs = self._expr_to_sqlalchemy[expr._lhs], self._expr_to_sqlalchemy[expr._rhs]
        sa_expr = BINARY_OP[expr.node_name](lhs, rhs)
        # Force a non-negative result whenever the divisor is positive: decided
        # in SQL for a non-constant divisor, at compile time for a constant one.
        if not is_constant_scalar(expr._rhs):
            sa_expr = case([(rhs > 0, func.abs(sa_expr))], else_=sa_expr)
        elif expr._rhs.value > 0:
            sa_expr = func.abs(sa_expr)
        self._add(expr, sa_expr)
        return
    else:
        op = BINARY_OP[expr.node_name]

    # Generic path shared by Power, FloorDivide and every other operator.
    lhs, rhs = self._expr_to_sqlalchemy[expr._lhs], self._expr_to_sqlalchemy[expr._rhs]
    sa_expr = op(lhs, rhs)
    self._add(expr, sa_expr)