eventdata/parameter_sources/timeutils.py (71 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import datetime import re epoch = datetime.datetime.utcfromtimestamp(0) class TimeParsingError(Exception): """Exception raised for parameter parsing errors. Attributes: message -- explanation of the error """ def __init__(self, message): self.message = message class TimestampStructGenerator: def __init__(self, starting_point, offset=None, acceleration_factor=1.0, utcnow=None): self._utcnow = utcnow if utcnow else datetime.datetime.utcnow # the (actual) time when this generator has started self._start = self._utcnow() # the logical point in time for which we'll generate timestamps self._starting_point = self.__parse_starting_point(starting_point) + self.__parse_offset(offset) self._acceleration_factor = acceleration_factor # reuse to reduce object churn self._ts = {} self._simulated_micros = 0.0 def next_timestamp(self): self._simulated_micros = 0.0 delta = (self._utcnow() - self._start) * self._acceleration_factor self.__to_struct(self._starting_point + delta) return self.simulate_tick(0) def simulate_tick(self, micros): """ Advances the current timestamp by a given number of microseconds but keep all other time components. This can be used to avoid retrieving the current timestamp to often but still simulate changes in time. :param micros: A positive number of microseconds to add. :return: The current (formatted) timestamp structure as a dict. """ self._simulated_micros += micros self._ts["iso"] = "%s.%03dZ" % (self._ts["iso_prefix"], self._simulated_micros) return self._ts def skip(self, delta): # advance the generated timestamp by delta self._starting_point = self._starting_point + delta # also reset the generator start as we want to ensure the same delta in #next_timestamp() self._start = self._utcnow() def __to_struct(self, dt): # string formatting is about 4 times faster than strftime. iso_prefix = "%04d-%02d-%02dT%02d:%02d:%02d" % (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second) self._ts["iso_prefix"] = iso_prefix self._ts["yyyy"] = iso_prefix[:4] self._ts["yy"] = iso_prefix[2:4] self._ts["mm"] = iso_prefix[5:7] self._ts["dd"] = iso_prefix[8:10] self._ts["hh"] = iso_prefix[11:13] def __parse_starting_point(self, point): if point == "now": # this is "now" at this point return self._start else: match = re.match(r"^(\d{4})\D(\d{2})\D(\d{2})\D(\d{2})\D(\d{2})\D(\d{2})$", point) if match: return datetime.datetime(year=int(match.group(1)), month=int(match.group(2)), day=int(match.group(3)), hour=int(match.group(4)), minute=int(match.group(5)), second=int(match.group(6)), tzinfo=datetime.timezone.utc) else: match = re.match(r"^(\d{4})\D(\d{2})\D(\d{2})$", point) if match: return datetime.datetime(year=int(match.group(1)), month=int(match.group(2)), day=int(match.group(3)), tzinfo=datetime.timezone.utc) raise TimeParsingError("Invalid time format: {}".format(point)) def __parse_offset(self, offset): if offset is None: return datetime.timedelta() match = re.match(r"^([+-]\d+)([hmd])$", offset) if match: offset_amount = int(match.group(1)) if match.group(2) == "m": return datetime.timedelta(minutes=offset_amount) elif match.group(2) == "h": return datetime.timedelta(hours=offset_amount) elif match.group(2) == "d": return datetime.timedelta(days=offset_amount) else: raise TimeParsingError("Invalid offset: {}".format(offset)) else: raise TimeParsingError("Invalid offset: {}".format(offset))