odps/tunnel/hasher_c.pyx (312 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import calendar
import sys
from cpython.datetime cimport import_datetime
from libc.stdint cimport *
from .. import compat, types
from ..src.types_c cimport BaseRecord
from ..src.utils_c cimport CMillisecondsConverter
# ODPS column type ids cached as C integers at import time. RecordHasher
# compares each hash-key column's type id against these to pick a field
# hasher.
# NOTE(review): TIMESTAMP_NTZ, INTERVAL_YEAR_MONTH and JSON ids are declared
# here but RecordHasher has no branch for them, so such key columns raise
# TypeError there.
cdef:
    int64_t BOOL_TYPE_ID = types.boolean._type_id
    int64_t DATETIME_TYPE_ID = types.datetime._type_id
    int64_t DATE_TYPE_ID = types.date._type_id
    int64_t STRING_TYPE_ID = types.string._type_id
    int64_t FLOAT_TYPE_ID = types.float_._type_id
    int64_t DOUBLE_TYPE_ID = types.double._type_id
    int64_t BIGINT_TYPE_ID = types.bigint._type_id
    int64_t BINARY_TYPE_ID = types.binary._type_id
    int64_t TIMESTAMP_TYPE_ID = types.timestamp._type_id
    int64_t TIMESTAMP_NTZ_TYPE_ID = types.timestamp_ntz._type_id
    int64_t INTERVAL_DAY_TIME_TYPE_ID = types.interval_day_time._type_id
    int64_t INTERVAL_YEAR_MONTH_TYPE_ID = types.interval_year_month._type_id
    int64_t DECIMAL_TYPE_ID = types.Decimal._type_id
    int64_t JSON_TYPE_ID = types.Json._type_id
    # True on Python 2; DecimalFieldHasher uses it to choose between ``int``
    # and ``compat.long_type`` when parsing digit strings.
    bint _is_py2 = sys.version_info[0] == 2
# Initialise the CPython datetime C API cimported from ``cpython.datetime``;
# Cython requires this call before that API is used.
import_datetime()
cdef class AbstractHasher:
    """Base hasher exposing C-level hash primitives and Python wrappers.

    Concrete hashers override ``c_hash_bigint``, ``c_hash_bool`` and
    ``c_hash_string``.  Float and double hashing are derived from
    ``c_hash_bigint`` by reinterpreting the value's raw bit pattern as an
    integer of the same width.
    """

    cdef int32_t c_hash_bigint(self, int64_t val) noexcept nogil:
        # Placeholder; subclasses supply the real integer hash.
        return 0

    def hash_bigint(self, int64_t val):
        """Python entry point for ``c_hash_bigint``."""
        return self.c_hash_bigint(val)

    cdef int32_t c_hash_float(self, float val) nogil:
        # Reinterpret the 32 bits of the float as a signed 32-bit integer.
        cdef int32_t bits = (<int32_t *>&val)[0]
        return self.c_hash_bigint(bits)

    def hash_float(self, float val):
        """Python entry point for ``c_hash_float``."""
        return self.c_hash_float(val)

    cdef int32_t c_hash_double(self, double val) nogil:
        # Reinterpret the 64 bits of the double as a signed 64-bit integer.
        cdef int64_t bits = (<int64_t *>&val)[0]
        return self.c_hash_bigint(bits)

    def hash_double(self, double val):
        """Python entry point for ``c_hash_double``."""
        return self.c_hash_double(val)

    cdef int32_t c_hash_bool(self, bint val) nogil:
        # Placeholder; subclasses supply the real boolean hash.
        return 0

    def hash_bool(self, bint val):
        """Python entry point for ``c_hash_bool``."""
        return self.c_hash_bool(val)

    cdef int32_t c_hash_string(self, char *ptr, size_t size) nogil:
        # Placeholder; subclasses supply the real byte-string hash.
        return 0

    def hash_string(self, val):
        """Hash ``val``: text is encoded with ``.encode()``, bytes used as-is."""
        cdef bytes raw = (<unicode>val).encode() if isinstance(val, unicode) else val
        return self.c_hash_string(raw, len(raw))
cdef class DefaultHasher(AbstractHasher):
    """Hasher implementing the "default" hash algorithm.

    All arithmetic is on fixed-width C integers and only the low 32 bits of
    each result are returned.
    NOTE(review): these bit-mixing steps presumably must stay bit-for-bit
    identical to the service-side implementation; do not alter them.
    """

    cdef int32_t c_hash_bigint(self, int64_t val) noexcept nogil:
        # 64-bit integer mix; the result is truncated to 32 bits.
        val = (~val) + (val << 18)
        val ^= val >> 31
        val *= <long>21
        val ^= val >> 11
        val += val << 6
        val ^= val >> 22
        return <int32_t>val

    cdef int32_t c_hash_bool(self, bint val) nogil:
        # Fixed magic constants for true / false.
        if val:
            return 0x172ba9c7
        else:
            return -0x3a59cb12

    cdef int32_t c_hash_string(self, char *ptr, size_t size) nogil:
        # Jenkins one-at-a-time style mixing on a signed 32-bit accumulator.
        # Each byte is read as a *signed* char so the result no longer depends
        # on whether the platform's plain ``char`` is signed (x86) or unsigned
        # (e.g. Linux on AArch64); the signed reading reproduces the x86
        # result for every input.
        cdef int32_t hash_val = 0
        cdef size_t i
        for i in range(size):
            hash_val += <signed char>ptr[i]
            hash_val += hash_val << 10
            hash_val ^= hash_val >> 6
        hash_val += hash_val << 3
        hash_val ^= hash_val >> 11
        hash_val += hash_val << 15
        return hash_val
cdef class LegacyHasher(AbstractHasher):
    """Hasher implementing the "legacy" hash algorithm."""

    cdef int32_t c_hash_bigint(self, int64_t val) noexcept nogil:
        # XOR the high 32 bits into the low 32 bits and keep the low half; the
        # explicit cast performs the same truncation the implicit return
        # conversion did.
        return <int32_t>((val >> 32) ^ val)

    cdef int32_t c_hash_bool(self, bint val) nogil:
        # Fixed magic constants for true / false.
        if val:
            return 0x172ba9c7
        else:
            return -0x3a59cb12

    cdef int32_t c_hash_string(self, char *ptr, size_t size) nogil:
        # Polynomial (base 31) hash over the bytes. Each byte is read as a
        # *signed* char so the result does not depend on the platform's
        # signedness of plain ``char``; this matches the x86 result.
        cdef int32_t hash_val = 0
        cdef size_t i
        for i in range(size):
            hash_val = hash_val * 31 + <signed char>ptr[i]
        return hash_val
cdef class FieldHasher:
    """Base class for per-column hashers.

    Each subclass converts a column value and delegates the actual hashing
    to the wrapped ``AbstractHasher`` stored in ``_hasher``.
    """

    def __init__(self, AbstractHasher hasher):
        self._hasher = hasher

    cdef int32_t hash_object(self, object value) except? -1:
        # Abstract hook: concrete field hashers override this.
        raise NotImplementedError()
cdef class BigintFieldHasher(FieldHasher):
    """Field hasher for BIGINT columns."""

    cdef int32_t hash_object(self, object value) except? -1:
        # Coerce to a C int64 first (raises on out-of-range values).
        cdef int64_t as_int64 = value
        return self._hasher.c_hash_bigint(as_int64)
cdef class FloatFieldHasher(FieldHasher):
    """Field hasher for FLOAT columns."""

    cdef int32_t hash_object(self, object value) except? -1:
        # Narrow to a C float so the 32-bit pattern is what gets hashed.
        cdef float as_float = value
        return self._hasher.c_hash_float(as_float)
cdef class DoubleFieldHasher(FieldHasher):
    """Field hasher for DOUBLE columns."""

    cdef int32_t hash_object(self, object value) except? -1:
        # Convert to a C double; its 64-bit pattern is what gets hashed.
        cdef double as_double = value
        return self._hasher.c_hash_double(as_double)
cdef class BoolFieldHasher(FieldHasher):
    """Field hasher for BOOLEAN columns."""

    cdef int32_t hash_object(self, object value) except? -1:
        # Python truthiness decides which constant is hashed.
        cdef bint truthy = value
        return self._hasher.c_hash_bool(truthy)
cdef class StringFieldHasher(FieldHasher):
    """Field hasher for STRING and BINARY columns."""

    cdef int32_t hash_object(self, object value) except? -1:
        # Bytes are hashed as-is; text is encoded with ``.encode()`` first.
        cdef bytes payload
        if not isinstance(value, unicode):
            payload = value
        else:
            payload = (<unicode>value).encode()
        return self._hasher.c_hash_string(payload, len(payload))
cdef class DateFieldHasher(FieldHasher):
    """Field hasher for DATE columns."""

    cdef int32_t hash_object(self, object value) except? -1:
        # Seconds since the Unix epoch, treating the time tuple as UTC.
        epoch_seconds = int(calendar.timegm(value.timetuple()))
        return self._hasher.c_hash_bigint(epoch_seconds)
cdef class FieldHasherWithTZ(FieldHasher):
    """Field hasher that also holds a milliseconds converter.

    Subclasses use ``_mills_converter`` to turn datetime-like values into
    epoch milliseconds before hashing.
    """

    cdef CMillisecondsConverter _mills_converter

    def __init__(
        self,
        AbstractHasher hasher,
        CMillisecondsConverter mills_converter,
    ):
        super(FieldHasherWithTZ, self).__init__(hasher)
        self._mills_converter = mills_converter
cdef class DatetimeFieldHasher(FieldHasherWithTZ):
    """Field hasher for DATETIME columns."""

    cdef int32_t hash_object(self, object value) except? -1:
        # Hash the value as the milliseconds produced by the converter.
        millis = self._mills_converter.to_milliseconds(value)
        return self._hasher.c_hash_bigint(millis)
cdef class TimestampFieldHasher(FieldHasherWithTZ):
    """Field hasher for TIMESTAMP columns.

    The hashed integer is ``(seconds << 30) | nanos``: whole seconds above
    bit 30 and the sub-second part in nanoseconds below it.
    NOTE(review): values are presumably pandas ``Timestamp`` objects (they
    need ``to_pydatetime``, ``microsecond`` and ``nanosecond``) — confirm.
    """

    cdef int32_t hash_object(self, object value) except? -1:
        # Whole seconds, truncated toward zero from the converter's millis.
        cdef int64_t seconds = int(
            self._mills_converter.to_milliseconds(value.to_pydatetime()) / 1000
        )
        # microsecond < 10**6 and nanosecond < 1000, so nanos < 10**9 < 2**30.
        cdef int64_t nanos = value.microsecond * 1000 + value.nanosecond
        # Use the C-level hash directly, like every other field hasher,
        # instead of round-tripping through the Python ``hash_bigint`` wrapper.
        return self._hasher.c_hash_bigint((seconds << 30) | nanos)
cdef class TimedeltaFieldHasher(FieldHasherWithTZ):
    """Field hasher for INTERVAL_DAY_TIME columns.

    The hashed integer is ``(seconds << 30) | nanos`` where ``seconds`` is
    ``total_seconds()`` truncated toward zero.
    NOTE(review): values are presumably pandas ``Timedelta`` objects (they
    need ``microseconds`` and ``nanoseconds``) — confirm.
    """

    cdef int32_t hash_object(self, object value) except? -1:
        cdef int64_t seconds = int(value.total_seconds())
        cdef int64_t nanos = value.microseconds * 1000 + value.nanoseconds
        # Use the C-level hash directly, consistent with the other field
        # hashers, avoiding a Python-object round-trip per value.
        return self._hasher.c_hash_bigint((seconds << 30) | nanos)
cdef class DecimalFieldHasher(FieldHasher):
    """Field hasher for DECIMAL(precision, scale) columns.

    The value's string form is parsed into an unscaled integer at the
    column scale, rounding half away from zero when digits are dropped.
    That integer is hashed directly when ``precision <= 18``; otherwise the
    hash is the sum of the hashes of its low 64 bits (as a signed int64) and
    its high bits (``>> 64``).
    """

    cdef int32_t _precision
    cdef int32_t _scale

    def __init__(
        self, AbstractHasher hasher, int32_t precision, int32_t scale
    ):
        super(DecimalFieldHasher, self).__init__(hasher)
        self._precision = precision
        self._scale = scale

    cdef int32_t hash_object(self, object value) except? -1:
        """Hash ``value``.

        :raises ValueError: if ``str(value)`` is not a plain/scientific
            decimal literal (e.g. ``NaN``).
        """
        cdef:
            bytes x
            bytes num_without_exp
            int32_t exponent
            int32_t x_len
            char *ptr
            char c
            bint is_negative
            bint found_dot
            int32_t value_scale
            int32_t i
            int32_t dropped
            uint64_t low_bits
            object tmp_result

        x = str(value).encode()
        x_len = len(x)
        ptr = x

        # Strip an optional sign, then any leading zeros.
        is_negative = False
        if x_len > 0:
            if ptr[0] == ord("-"):
                is_negative = True
                ptr += 1
                x_len -= 1
            elif ptr[0] == ord("+"):
                is_negative = False
                ptr += 1
                x_len -= 1
        while x_len > 0 and ptr[0] == ord("0"):
            ptr += 1
            x_len -= 1

        # Count fractional digits; an exponent adjusts the scale and marks
        # the end of the mantissa.
        value_scale = 0
        found_dot = False
        for i in range(x_len):
            c = ptr[i]
            if ord("0") <= c <= ord("9"):
                if found_dot:
                    value_scale += 1
            elif c == ord(".") and not found_dot:
                found_dot = True
            elif c in (ord("e"), ord("E")) and i + 1 < x_len:
                exponent = int(ptr[i + 1 : x_len])
                value_scale -= exponent
                x_len = i
                break
            else:
                # ``x`` is bytes; concatenating it to a str raised TypeError
                # on Python 3, masking the intended ValueError.
                raise ValueError("Invalid decimal format: " + str(value))

        # Mantissa digits with the decimal point removed.
        num_without_exp = ptr[0:x_len]
        if found_dot:
            num_without_exp = num_without_exp.replace(b".", b"")
        if not num_without_exp:
            tmp_result = 0
        elif not _is_py2:
            tmp_result = int(num_without_exp)
        else:
            tmp_result = compat.long_type(num_without_exp)

        # Rescale the unscaled integer to the column scale.
        if value_scale > self._scale:
            dropped = value_scale - self._scale
            tmp_result //= <object>int(10) ** dropped
            # Round half up on the magnitude using the first dropped digit.
            # If more digits are dropped than the mantissa holds (e.g.
            # "5E-20" or "0E-25" at scale 18), that digit is an implicit
            # leading zero: no rounding, and a negative index must not be
            # used (it would wrap around or raise IndexError).
            if (
                dropped <= len(num_without_exp)
                and num_without_exp[len(num_without_exp) - dropped] >= ord("5")
            ):
                tmp_result += 1
        elif value_scale < self._scale:
            tmp_result *= <object>int(10) ** (self._scale - value_scale)
        if is_negative:
            tmp_result *= -1

        if self._precision > 18:
            # The masked low half lies in [0, 2**64); converting it straight
            # to int64_t raised OverflowError for values >= 2**63 (e.g. any
            # negative decimal). Go through uint64_t and reinterpret the bits
            # as a signed two's-complement int64 instead.
            low_bits = <uint64_t>(tmp_result & 0xFFFFFFFFFFFFFFFF)
            return self._hasher.c_hash_bigint(
                <int64_t>low_bits
            ) + self._hasher.c_hash_bigint(<int64_t>(tmp_result >> 64))
        return self._hasher.c_hash_bigint(<int64_t>tmp_result)
cpdef AbstractHasher get_hasher(hasher_type):
    """Create the hasher named by ``hasher_type``.

    :param hasher_type: ``"legacy"`` or ``"default"``
    :raises ValueError: for any other value
    """
    if hasher_type == "legacy":
        return LegacyHasher()
    if hasher_type == "default":
        return DefaultHasher()
    raise ValueError("Hasher type %s not supported" % hasher_type)
def _hash_timestamp(hasher, x):
    """Hash timestamp ``x`` with ``hasher`` (pure-Python helper).

    Packs ``int(x.timestamp())`` above bit 30 and the sub-second part in
    nanoseconds (``microsecond``/``nanosecond``) below it.
    """
    whole_seconds = int(x.timestamp())
    packed = (whole_seconds << 30) | (x.microsecond * 1000 + x.nanosecond)
    return hasher.hash_bigint(packed)
def _hash_timedelta(hasher, x):
    """Hash interval ``x`` with ``hasher`` (pure-Python helper).

    Packs ``int(x.total_seconds())`` above bit 30 and the sub-second part in
    nanoseconds below it, the same formula TimedeltaFieldHasher uses.

    ``x`` may be a pandas ``Timedelta`` (``microseconds`` + ``nanoseconds``)
    or a :class:`datetime.timedelta` (``microseconds`` only; nanoseconds are
    taken as 0).
    """
    seconds = int(x.total_seconds())
    # Timedelta components are spelled in the plural; the singular
    # ``microsecond``/``nanosecond`` do not exist on timedelta objects.
    nanos = x.microseconds * 1000 + getattr(x, "nanoseconds", 0)
    return hasher.hash_bigint((seconds << 30) | nanos)
cdef class RecordHasher:
    """Hash records over a chosen subset of their columns (the hash keys).

    Each hash-key column gets a type-specific ``FieldHasher``. A record's
    hash is the 32-bit sum of its non-null key field hashes, finalized as
    ``h ^ (h >> 8)``.
    """

    def __init__(self, schema, hasher_type, hash_keys):
        """
        :param schema: table schema; must provide ``build_snapshot()``
        :param hasher_type: ``"legacy"`` or ``"default"``
        :param hash_keys: names of the columns to hash; names not present in
            the schema are ignored
        :raises TypeError: if a hash-key column's type has no field hasher
        """
        cdef int type_id, i
        cdef int precision, scale
        cdef set hash_keys_set = set(hash_keys)
        cdef FieldHasher field_hasher

        self._schema_snapshot = schema.build_snapshot()
        self._hasher = get_hasher(hasher_type)
        self._mills_converter = CMillisecondsConverter()
        self._col_ids.reserve(len(hash_keys_set))
        # Field hashers are stored by key position, parallel to ``_col_ids``;
        # ``hash`` looks both up with the same index. (Indexing by schema
        # column index here overflowed the list whenever a key column was not
        # among the leading columns.)
        self._idx_to_hash_fun = []

        for i in range(self._schema_snapshot._col_count):
            if self._schema_snapshot._columns[i].name not in hash_keys_set:
                continue
            type_id = self._schema_snapshot._col_type_ids[i]
            if type_id == BIGINT_TYPE_ID:
                field_hasher = BigintFieldHasher(self._hasher)
            elif type_id == FLOAT_TYPE_ID:
                field_hasher = FloatFieldHasher(self._hasher)
            elif type_id == DOUBLE_TYPE_ID:
                field_hasher = DoubleFieldHasher(self._hasher)
            elif type_id == STRING_TYPE_ID or type_id == BINARY_TYPE_ID:
                field_hasher = StringFieldHasher(self._hasher)
            elif type_id == BOOL_TYPE_ID:
                field_hasher = BoolFieldHasher(self._hasher)
            elif type_id == DATE_TYPE_ID:
                field_hasher = DateFieldHasher(self._hasher)
            elif type_id == DATETIME_TYPE_ID:
                field_hasher = DatetimeFieldHasher(
                    self._hasher, self._mills_converter
                )
            elif type_id == TIMESTAMP_TYPE_ID:
                field_hasher = TimestampFieldHasher(
                    self._hasher, self._mills_converter
                )
            elif type_id == INTERVAL_DAY_TIME_TYPE_ID:
                field_hasher = TimedeltaFieldHasher(
                    self._hasher, self._mills_converter
                )
            elif type_id == DECIMAL_TYPE_ID:
                # Unspecified (falsy) precision/scale fall back to defaults.
                precision = (
                    self._schema_snapshot._col_types[i].precision
                    or types.Decimal._default_precision
                )
                scale = (
                    self._schema_snapshot._col_types[i].scale
                    or types.Decimal._default_scale
                )
                field_hasher = DecimalFieldHasher(
                    self._hasher, precision, scale
                )
            else:
                raise TypeError(
                    "Hash for type %s not supported"
                    % self._schema_snapshot._col_types[i]
                )
            self._col_ids.push_back(i)
            self._idx_to_hash_fun.append(field_hasher)

    cpdef int32_t hash(self, BaseRecord record):
        """Return the 32-bit hash of ``record`` over the hash-key columns.

        Key columns whose value is ``None`` are skipped.
        """
        cdef int i, col_idx
        cdef int32_t hash_sum = 0
        cdef object field_value
        for i in range(self._col_ids.size()):
            # ``i`` is the key position; ``col_idx`` is the record column.
            # The value hashed must be the same one that was null-checked.
            col_idx = self._col_ids[i]
            field_value = record._c_values[col_idx]
            if field_value is None:
                continue
            hash_sum += (<FieldHasher>self._idx_to_hash_fun[i]).hash_object(
                field_value
            )
        return hash_sum ^ (hash_sum >> 8)
cpdef int32_t hash_value(hasher_type, data_type, value):
    """Simple hash function for test purpose.

    Builds a one-column schema of ``data_type`` and returns the raw field
    hash of ``value`` from that column's field hasher (no record-level
    ``h ^ (h >> 8)`` finalization).
    """
    cdef RecordHasher record_hasher
    from ..models import Record
    from ..types import Column, OdpsSchema

    single_col_schema = OdpsSchema([Column("col", data_type)])
    # NOTE(review): this record is never read; constructing it presumably
    # validates ``value`` against ``data_type`` — confirm before removing.
    probe_record = Record(schema=single_col_schema, values=[value])
    record_hasher = RecordHasher(single_col_schema, hasher_type, ["col"])
    return (<FieldHasher>record_hasher._idx_to_hash_fun[0]).hash_object(value)