odps/expressions/functions.py (69 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
try:
import pyarrow as pa
except ImportError:
pa = None
try:
import pandas as pd
except ImportError:
pd = None
from ..compat import datetime_utcnow, six
from ..utils import camel_to_underline
_name_to_funcs = {}
class ExprFunction(object):
    """Base class of the functions that can be looked up by name.

    Subclasses found in this module's globals are registered on the first
    lookup. A subclass is registered under its ``_func_name`` attribute when
    it defines one, otherwise under its class name converted with
    ``camel_to_underline``; lookups are case-insensitive.
    """

    # Subclasses in this module set this to the number of arguments taken by
    # their ``call``; the base class leaves it unset.
    arg_count = None

    @classmethod
    def _resolve_func_name(cls):
        """Return the name this class is registered and rendered under.

        The fallback is computed lazily so that classes declaring
        ``_func_name`` never need the class-name conversion.
        """
        name = getattr(cls, "_func_name", None)
        if name is None:
            name = camel_to_underline(cls.__name__)
        return name

    @classmethod
    def _load_name_to_funcs(cls):
        """Populate the module-level registry on first use and return it."""
        if not _name_to_funcs:
            for val in globals().values():
                if (
                    not isinstance(val, type)
                    or not issubclass(val, ExprFunction)
                    or val is ExprFunction
                ):
                    # skip everything that is not a concrete function class
                    continue
                _name_to_funcs[val._resolve_func_name().lower()] = val
        return _name_to_funcs

    @classmethod
    def get_cls(cls, func_name):
        """Return the function class registered under ``func_name``.

        The lookup ignores case.

        Raises:
            ValueError: if no function is registered under that name.
        """
        try:
            return ExprFunction._load_name_to_funcs()[func_name.lower()]
        except KeyError:
            # drop the KeyError context: callers only need to know the name
            six.raise_from(ValueError("%s function not found" % func_name), None)

    @classmethod
    def call(cls, *args):
        """Evaluate the function; must be implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def to_str(cls, arg_strs):
        """Render a call of this function given already-rendered arguments.

        Uses the same name resolution as the registry, so a subclass without
        an explicit ``_func_name`` renders under its derived name instead of
        failing with ``AttributeError``.
        """
        return "%s(%s)" % (cls._resolve_func_name(), ", ".join(arg_strs))
_date_patterns = {
"year": "%Y",
"month": "%Y-%m",
"day": "%Y-%m-%d",
"hour": "%Y-%m-%d %H:00:00",
}
class TruncateTime(ExprFunction):
    """``trunc_time(value, date_part)``: format a time value truncated to a date part.

    ``date_part`` selects one of the patterns in ``_date_patterns`` and is
    matched case-insensitively. Values may be ``datetime`` objects or strings
    in ``%Y-%m-%d %H:%M:%S`` format. Null values yield ``None``.
    """

    _func_name = "trunc_time"
    arg_count = 2

    @classmethod
    def _call_single(cls, val, date_part):
        """Truncate one scalar value.

        Args:
            val: a ``datetime.datetime``, a ``%Y-%m-%d %H:%M:%S`` string, or
                a null (``None``, or a pandas missing value such as ``NaT``).
            date_part: lower-cased key of ``_date_patterns``.

        Returns:
            The formatted string, or ``None`` when ``val`` is null.

        Raises:
            KeyError: if ``date_part`` is not a key of ``_date_patterns``.
            ValueError: if a string value does not match the expected format.
        """
        # Nulls propagate instead of failing inside strptime / strftime.
        if val is None or (pd is not None and pd.isna(val)):
            return None
        if not isinstance(val, datetime.datetime):
            val = datetime.datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
        return val.strftime(_date_patterns[date_part])

    @classmethod
    def call(cls, arg, date_part):
        """Apply the truncation to a scalar, a pandas Series or a pyarrow array.

        Args:
            arg: scalar value, ``pandas.Series``, ``pyarrow.Array`` or
                ``pyarrow.ChunkedArray``.
            date_part: name of the date part, case-insensitive.

        Returns:
            A string (or ``None``) for scalars, a ``pandas.Series`` for Series
            input, and a string-typed ``pyarrow.Array`` for pyarrow input.

        Raises:
            TypeError: if ``date_part`` is not a string.
        """
        # Explicit check: an ``assert`` would be stripped when running with -O.
        if not isinstance(date_part, six.string_types):
            raise TypeError(
                "date_part must be a string, got %s" % type(date_part).__name__
            )
        date_part = date_part.lower()

        if pa is not None and isinstance(arg, (pa.Array, pa.ChunkedArray)):
            # to_pylist() keeps this branch usable when pandas is absent.
            res = [cls._call_single(x, date_part) for x in arg.to_pylist()]
            # Pin the type so empty / all-null input is not inferred as a
            # null-typed array.
            return pa.array(res, type=pa.string())
        if pd is not None and isinstance(arg, pd.Series):
            return arg.map(lambda x: cls._call_single(x, date_part))
        return cls._call_single(arg, date_part)
class CurrentTimestampNTZ(ExprFunction):
    """Zero-argument function registered as ``current_timestamp_ntz``."""

    arg_count = 0
    _func_name = "current_timestamp_ntz"

    @classmethod
    def call(cls):
        """Return the value produced by ``datetime_utcnow()`` at call time.

        NOTE(review): presumably a timezone-naive UTC ``datetime``; confirm
        against the helper's definition in ``..compat``.
        """
        current = datetime_utcnow()
        return current