# hasher-matcher-actioner/hmalib/common/timebucketizer.py
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import csv
import datetime
import os
import typing as t
import uuid

MAX_BUFFER_SIZE = 3200
SECONDS_PER_DAY = 86400
SECONDS_PER_MINUTE = 60

Self = t.TypeVar("Self", bound="CSViable")


class CSViable:
"""
    Interface of methods that must be implemented to use TimeBucketizer. It
    guarantees that record data can be safely written to and parsed back from
    CSV files.
"""

    def to_csv(self) -> t.List[t.Union[str, int]]:
        raise NotImplementedError

    @classmethod
    def from_csv(cls: t.Type[Self], value: t.List[str]) -> Self:
        raise NotImplementedError
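

# A minimal sketch of a conforming record type (illustrative only; HashRecord
# and its fields are hypothetical, not part of hmalib):
#
#   class HashRecord(CSViable):
#       def __init__(self, content_id: str, signal_value: str):
#           self.content_id = content_id
#           self.signal_value = signal_value
#
#       def to_csv(self) -> t.List[t.Union[str, int]]:
#           return [self.content_id, self.signal_value]
#
#       @classmethod
#       def from_csv(cls, value: t.List[str]) -> "HashRecord":
#           return cls(value[0], value[1])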

T = t.TypeVar("T", bound=CSViable)


class TimeBucketizer(t.Generic[T]):
def __init__(
self,
bucket_width: datetime.timedelta,
storage_path: str,
type: str,
id: str,
buffer_size: int = MAX_BUFFER_SIZE,
):
"""
Divides the day into 24h / bucket_width buckets. When add_record
is called, based on current time, appends record to current bucket.
Each bucket's data is stored as a serialized CSV file.
Note that there may be multiple instances of TimeBucket writing to the
same file system so add some uniqueness for this instance.
Say your storage path is /var/data/threatexchange/timebuckets
The file for current datetime (2022/02/08/20:49 UTC) for a bucket_width
of 5 minutes should be "/var/data/threatexchange/timebuckets/<type>/2022/02/08/20/45/<unique_id>.csv"
The first <type> allows you to use the same storage folder for
say hashes and matches.
The second <unique_id> allows you to have multiple instances running
(eg. multiple lambdas) and writing to the same file system.
"""
        if (
            bucket_width.total_seconds() < SECONDS_PER_MINUTE
            or bucket_width.total_seconds() % SECONDS_PER_MINUTE != 0
        ):
            raise ValueError(
                "bucket_width must be a whole number of minutes, at least one minute long."
            )
        if SECONDS_PER_DAY % bucket_width.total_seconds() != 0:
            raise ValueError(
                "bucket_width must divide a 24 hour day into equal buckets."
            )
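        # The checks above accept e.g. timedelta(minutes=5) or
        # timedelta(hours=1); they reject timedelta(seconds=90) (not a whole
        # number of minutes) and timedelta(minutes=7) (does not divide a
        # 24 hour day evenly).
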
self.bucket_width = bucket_width
self.start, self.end = self._calculate_bucket_endpoints(
datetime.datetime.now(), bucket_width
)
self.storage_path = storage_path
self.id = id
self.type = type
self.buffer_size = buffer_size
self.buffer: t.List[T] = []
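
    # Example construction (illustrative values; HashRecord is the
    # hypothetical CSViable record type sketched above):
    #
    #   bucketizer: TimeBucketizer[HashRecord] = TimeBucketizer(
    #       bucket_width=datetime.timedelta(minutes=5),
    #       storage_path="/var/data/threatexchange/timebuckets",
    #       type="hashes",
    #       id=str(uuid.uuid4()),
    #   )
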
    @staticmethod
    def _generate_path(storage_path: str, type: str, date: datetime.datetime) -> str:
        # Path components are not zero-padded: February 8th maps to
        # ".../2022/2/8/...".
        return os.path.join(
            storage_path,
            type,
            str(date.year),
            str(date.month),
            str(date.day),
            str(date.hour),
            str(date.minute),
        )

    @staticmethod
    def _calculate_bucket_endpoints(
        time: datetime.datetime, bucket_width: datetime.timedelta
    ) -> t.Tuple[datetime.datetime, datetime.datetime]:
        # Floor the timestamp to the nearest bucket_width boundary; the
        # bucket spans [rounded, rounded + bucket_width).
        rounded = time - (time - datetime.datetime.min) % bucket_width
        return (rounded, rounded + bucket_width)
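
    # For example, with bucket_width = datetime.timedelta(minutes=5):
    #
    #   _calculate_bucket_endpoints(datetime.datetime(2022, 2, 8, 20, 49), ...)
    #   returns (datetime.datetime(2022, 2, 8, 20, 45),
    #            datetime.datetime(2022, 2, 8, 20, 50))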

    def add_record(self, record: T) -> None:
        """
        Adds the record to the current bucket, first flushing the buffer if
        it is full or if the current bucket's window has ended.
        """
        # Compare against self.buffer_size (the constructor argument), not
        # the module-level MAX_BUFFER_SIZE default.
        if (
            len(self.buffer) >= self.buffer_size
            or self.end <= datetime.datetime.now()
        ):
            self._flush()
        self.buffer.append(record)

    def force_flush(self) -> None:
        """
        Flushes any remaining buffered records to storage.
        """
        if not self.buffer:
            return
        self._flush()

    def _flush(self) -> None:
        """
        Writes buffered records to the current bucket's file, then advances
        the bucket endpoints and resets the buffer.
        """
        if self.buffer:
            file_name = str(self.id) + ".csv"
            directory_path = self._generate_path(
                self.storage_path, self.type, self.start
            )
            file_path = os.path.join(directory_path, file_name)
            os.makedirs(directory_path, exist_ok=True)
            # Open in append mode because we may write to the same bucket file
            # multiple times and it may already contain data. newline="" is
            # required by the csv module so it can handle line endings itself.
            with open(file_path, "a", newline="") as outfile:
                writer = csv.writer(outfile)
                writer.writerows(record.to_csv() for record in self.buffer)
        self.start, self.end = self._calculate_bucket_endpoints(
            datetime.datetime.now(), self.bucket_width
        )
        self.buffer = []
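
    # After a flush, the file for the bucket that began at 20:45 on
    # 2022-02-08 (with type="hashes" and the example storage path from the
    # class docstring) would be:
    #
    #   /var/data/threatexchange/timebuckets/hashes/2022/2/8/20/45/<id>.csv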

    @staticmethod
    def get_records(
        since: datetime.datetime,
        until: datetime.datetime,
        type: str,
        storage_path: str,
        bucket_width: datetime.timedelta,
        type_class: t.Type[CSViable],
    ) -> t.List[CSViable]:
        """
        Returns the contents of all CSV files stored in buckets that overlap
        the interval from 'since' to 'until'. Each row is parsed with the
        provided type_class.from_csv() method, so records are returned as a
        list of type_class instances. Whole buckets are returned; records are
        not trimmed to the exact endpoints of the interval.
        """
        # Start of the bucket containing 'since' and start of the bucket
        # containing 'until'; iterating between them inclusively visits every
        # bucket that overlaps the interval.
        since_nearest = TimeBucketizer._calculate_bucket_endpoints(
            since, bucket_width
        )[0]
        until_nearest = TimeBucketizer._calculate_bucket_endpoints(
            until, bucket_width
        )[0]
content_list = []
file_list = []
while since_nearest <= until_nearest:
directory_path = TimeBucketizer._generate_path(
storage_path, type, since_nearest
)
if os.path.isdir(directory_path):
file_list.extend(
[
os.path.join(directory_path, file)
for file in os.listdir(directory_path)
if os.path.isfile(os.path.join(directory_path, file))
]
)
since_nearest += bucket_width
        for file_path in file_list:
            # newline="" lets the csv module handle line endings itself.
            with open(file_path, "r", newline="") as infile:
                # from_csv is a classmethod, so call it on the class itself
                # rather than on an instance.
                content_list.extend(
                    type_class.from_csv(row) for row in csv.reader(infile)
                )
return content_list
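

# End-to-end sketch (illustrative; HashRecord is the hypothetical record type
# from the CSViable example above):
#
#   bucketizer: TimeBucketizer[HashRecord] = TimeBucketizer(
#       datetime.timedelta(minutes=5), "/tmp/timebuckets", "hashes", str(uuid.uuid4())
#   )
#   bucketizer.add_record(HashRecord("content-1", "a1b2c3"))
#   bucketizer.force_flush()
#
#   records = TimeBucketizer.get_records(
#       since=datetime.datetime.now() - datetime.timedelta(hours=1),
#       until=datetime.datetime.now(),
#       type="hashes",
#       storage_path="/tmp/timebuckets",
#       bucket_width=datetime.timedelta(minutes=5),
#       type_class=HashRecord,
#   )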