odps/tunnel/hasher.py (191 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import ctypes
import functools
import struct
from .. import compat, types
from ..utils import to_binary, to_milliseconds
_int32_struct = struct.Struct("<l")
_int64_struct = struct.Struct("<q")
_float_struct = struct.Struct("<f")
_double_struct = struct.Struct("<d")
_ord_code = ord if compat.PY27 else (lambda x: x)
class AbstractHasher(object):
def hash_bigint(self, val):
raise NotImplementedError
def hash_float(self, val):
raise NotImplementedError
def hash_double(self, val):
raise NotImplementedError
def hash_bool(self, val):
raise NotImplementedError
def hash_string(self, val):
raise NotImplementedError
class DefaultHasher(AbstractHasher):
def hash_bigint(self, val):
val = (~val) + ctypes.c_int64(val << 18).value
val ^= val >> 31
val = ctypes.c_int64(val * 21).value
val ^= val >> 11
val += ctypes.c_int64(val << 6).value
val ^= val >> 22
return ctypes.c_int32(val).value
def hash_float(self, val):
return self.hash_bigint(_int32_struct.unpack(_float_struct.pack(val))[0])
def hash_double(self, val):
return self.hash_bigint(_int64_struct.unpack(_double_struct.pack(val))[0])
def hash_bool(self, val):
# it is a magic number
if val:
return 0x172BA9C7
else:
return -0x3A59CB12
def hash_string(self, val):
val = to_binary(val)
hash_val = 0
for ch in val:
hash_val += _ord_code(ch)
hash_val += ctypes.c_int32(hash_val << 10).value
hash_val ^= hash_val >> 6
hash_val += ctypes.c_int32(hash_val << 3).value
hash_val ^= hash_val >> 11
hash_val += ctypes.c_int32(hash_val << 15).value
return ctypes.c_int32(hash_val).value
class LegacyHasher(AbstractHasher):
def hash_bigint(self, val):
return ctypes.c_int32((val >> 32) ^ val).value
def hash_float(self, val):
return self.hash_bigint(_int32_struct.unpack(_float_struct.pack(val))[0])
def hash_double(self, val):
return self.hash_bigint(_int64_struct.unpack(_double_struct.pack(val))[0])
def hash_bool(self, val):
# it is a magic number
if val:
return 0x172BA9C7
else:
return -0x3A59CB12
def hash_string(self, val):
val = to_binary(val)
hash_val = 0
for ch in val:
hash_val = ctypes.c_int32(hash_val * 31 + _ord_code(ch)).value
return hash_val
def get_hasher(hasher_type):
if hasher_type == "legacy":
return LegacyHasher()
elif hasher_type == "default":
return DefaultHasher()
else:
raise ValueError("Hasher type %s not supported" % hasher_type)
def _hash_date(hasher, x):
return hasher.hash_bigint(int(calendar.timegm(x.timetuple())))
def _hash_datetime(hasher, x):
return hasher.hash_bigint(int(to_milliseconds(x)))
def _hash_timestamp(hasher, x):
seconds = int(to_milliseconds(x.to_pydatetime()) / 1000)
nanos = x.microsecond * 1000 + x.nanosecond
return hasher.hash_bigint((seconds << 30) | nanos)
def _hash_timedelta(hasher, x):
seconds = int(x.total_seconds())
nanos = x.microseconds * 1000 + x.nanoseconds
return hasher.hash_bigint((seconds << 30) | nanos)
def _hash_decimal(hasher, x, precision, scale):
x = str(x).strip()
x_len = len(x)
ptr = 0
is_negative = False
if x_len > 0:
if x[ptr] == "-":
is_negative = True
ptr += 1
x_len -= 1
elif x[ptr] == "+":
is_negative = False
ptr += 1
x_len -= 1
while x_len > 0 and x[ptr] == "0":
ptr += 1
x_len -= 1
value_scale = 0
found_dot = found_exponent = False
for i in range(x_len):
c = x[ptr + i]
if c.isdigit():
if found_dot:
value_scale += 1
elif c == "." and not found_dot:
found_dot = True
elif c in ("e", "E") and i + 1 < x_len:
found_exponent = True
exponent = int(x[ptr + i + 1 :])
value_scale -= exponent
x_len = ptr + i
break
else:
raise ValueError("Invalid decimal format: " + x)
num_without_exp = x[ptr:x_len] if found_exponent else x[ptr:]
if found_dot:
num_without_exp = num_without_exp.replace(".", "")
if not num_without_exp:
tmp_result = 0
else:
tmp_result = compat.long_type(num_without_exp)
if value_scale > scale:
tmp_result //= 10 ** (value_scale - scale)
if num_without_exp[len(num_without_exp) - (value_scale - scale)] >= "5":
tmp_result += 1
elif value_scale < scale:
tmp_result *= 10 ** (scale - value_scale)
if is_negative:
tmp_result *= -1
if precision > 18:
return hasher.hash_bigint(tmp_result & 0xFFFFFFFFFFFFFFFF) + hasher.hash_bigint(
tmp_result >> 64
)
return hasher.hash_bigint(tmp_result)
_type_to_hash_fun = {
types.tinyint: lambda hasher, x: hasher.hash_bigint(x),
types.smallint: lambda hasher, x: hasher.hash_bigint(x),
types.int_: lambda hasher, x: hasher.hash_bigint(x),
types.bigint: lambda hasher, x: hasher.hash_bigint(x),
types.boolean: lambda hasher, x: hasher.hash_bool(x),
types.float_: lambda hasher, x: hasher.hash_float(x),
types.double: lambda hasher, x: hasher.hash_double(x),
types.date: _hash_date,
types.datetime: _hash_datetime,
types.timestamp: _hash_timestamp,
types.interval_day_time: _hash_timedelta,
types.binary: lambda hasher, x: hasher.hash_string(x),
types.string: lambda hasher, x: hasher.hash_string(x),
}
def _get_hash_func(typ):
if typ in _type_to_hash_fun:
return _type_to_hash_fun[typ]
elif isinstance(typ, (types.Char, types.Varchar)):
return _type_to_hash_fun[types.string]
elif isinstance(typ, types.Decimal):
precision = typ.precision or types.Decimal._default_precision
scale = typ.scale or types.Decimal._default_scale
return functools.partial(_hash_decimal, precision=precision, scale=scale)
else:
raise TypeError("Hash for type %s not supported" % typ)
class RecordHasher(object):
def __init__(self, schema, hasher_type, hash_keys):
self._schema = schema
self._hasher = get_hasher(hasher_type)
self._hash_keys = hash_keys
self._column_hash_appenders = []
for col_name in hash_keys:
col = self._schema.get_column(col_name)
self._column_hash_appenders.append(_get_hash_func(col.type))
def hash(self, record):
hash_sum = 0
for idx, key in enumerate(self._hash_keys):
if record[key] is None:
continue
hash_sum += self._column_hash_appenders[idx](self._hasher, record[key])
hash_sum = ctypes.c_int32(hash_sum).value
return hash_sum ^ (hash_sum >> 8)
def hash_value(hasher_type, data_type, value):
"""Simple value hash function for test purpose"""
if value is None:
return 0
hasher = get_hasher(hasher_type)
data_type = types.validate_data_type(data_type)
return _get_hash_func(data_type)(hasher, value)