pyignite/datatypes/expiry_policy.py (69 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import ctypes import math from datetime import timedelta from io import SEEK_CUR from typing import Union import attr from pyignite.constants import PROTOCOL_BYTE_ORDER def _positive(_, attrib, value): if isinstance(value, timedelta): value = value.total_seconds() * 1000 if value < 0 and value not in [ExpiryPolicy.UNCHANGED, ExpiryPolicy.ETERNAL]: raise ValueError(f"'{attrib.name}' value must not be negative") def _write_duration(stream, value): if isinstance(value, timedelta): value = math.floor(value.total_seconds() * 1000) stream.write(value.to_bytes(8, byteorder=PROTOCOL_BYTE_ORDER, signed=True)) @attr.s class ExpiryPolicy: """ Set expiry policy for the cache. """ #: Set TTL unchanged. UNCHANGED = -2 #: Set TTL eternal. ETERNAL = -1 #: Set TTL for create in milliseconds or :py:class:`~time.timedelta` create = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta], validator=[attr.validators.instance_of((int, timedelta)), _positive]) #: Set TTL for update in milliseconds or :py:class:`~time.timedelta` update = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta], validator=[attr.validators.instance_of((int, timedelta)), _positive]) #: Set TTL for access in milliseconds or :py:class:`~time.timedelta` access = attr.ib(kw_only=True, default=UNCHANGED, type=Union[int, timedelta], validator=[attr.validators.instance_of((int, timedelta)), _positive]) class _CType(ctypes.LittleEndianStructure): _pack_ = 1 _fields_ = [ ('not_null', ctypes.c_byte), ('create', ctypes.c_longlong), ('update', ctypes.c_longlong), ('access', ctypes.c_longlong) ] @classmethod def parse(cls, stream): init = stream.tell() not_null = int.from_bytes(stream.slice(init, 1), byteorder=PROTOCOL_BYTE_ORDER) if not_null: stream.seek(ctypes.sizeof(ExpiryPolicy._CType), SEEK_CUR) return ExpiryPolicy._CType stream.seek(ctypes.sizeof(ctypes.c_byte), SEEK_CUR) return ctypes.c_byte @classmethod async def parse_async(cls, stream): return cls.parse(stream) @classmethod def to_python(cls, ctypes_object, **kwargs): if ctypes_object == 0: return None return ExpiryPolicy(create=ctypes_object.create, update=ctypes_object.update, access=ctypes_object.access) @classmethod async def to_python_async(cls, ctypes_object, **kwargs): return cls.to_python(ctypes_object) @classmethod def from_python(cls, stream, value): if not value: stream.write(b'\x00') return stream.write(b'\x01') cls.write_policy(stream, value) @classmethod async def from_python_async(cls, stream, value): return cls.from_python(stream, value) @classmethod def write_policy(cls, stream, value): _write_duration(stream, value.create) _write_duration(stream, value.update) _write_duration(stream, value.access)