in assets/model_monitoring/components/src/model_data_collector_preprocessor/mdc_preprocessor_helper.py [0:0]
def get_file_list(start_datetime: datetime, end_datetime: datetime, store_url: StoreUrl = None,
                  input_data: str = None, scheme: str = "abfs") -> List[str]:
    """
    Get the available file list for the given time window under the input_data folder.

    Files are probed with glob patterns shaped like ``yyyy/MM/dd/HH/*.jsonl``. Whenever the window
    spans a complete day, month or year, the lower levels are collapsed into ``*`` wildcards so that
    as few patterns as possible are probed. Only hour granularity is honoured: minutes and seconds
    never narrow the result below a whole hour folder.

    Args:
        start_datetime: Start of the window (inclusive).
        end_datetime: End of the window. When it falls exactly on a whole hour it is treated as
            exclusive, i.e. the folder of that hour is not listed.
        store_url: Object used to probe for files (``any_files``) and to build the returned URLs
            (``get_abfs_url`` / ``get_azureml_url``). Built from ``input_data`` when omitted.
        input_data: Passed to ``StoreUrl`` when ``store_url`` is not given.
        scheme: ``"abfs"`` or ``"azureml"``; selects which kind of URL is returned.

    Returns:
        Glob URLs, oldest first, limited to the patterns for which ``store_url.any_files`` reported
        a match. An empty list when the window is empty or reversed.

    Raises:
        ValueError: If ``scheme`` is not supported and at least one pattern has matching files
            (the scheme is only looked at when a URL actually has to be built).
    """
    def date_range(start, end, step):
        # yields start, start + step, ... while strictly before end
        cur = start
        while cur < end:
            yield cur
            cur += step

    # ---- calendar predicates, all evaluated at hour granularity ----
    def is_same_year(start, end) -> bool:
        return start.year == end.year

    def is_same_month(start, end) -> bool:
        return is_same_year(start, end) and start.month == end.month

    def is_same_day(start, end) -> bool:
        return is_same_month(start, end) and start.day == end.day

    def is_start_of_year(d) -> bool:
        return (d.month, d.day, d.hour) == (1, 1, 0)

    def is_end_of_year(d) -> bool:
        return (d.month, d.day, d.hour) == (12, 31, 23)

    def is_start_of_month(d) -> bool:
        return (d.day, d.hour) == (1, 0)

    def is_end_of_month(d) -> bool:
        _, month_days = calendar.monthrange(d.year, d.month)
        return (d.day, d.hour) == (month_days, 23)

    def is_start_of_day(d) -> bool:
        return d.hour == 0

    def is_end_of_day(d) -> bool:
        return d.hour == 23

    def get_url(path) -> str:
        # turn a relative glob pattern into a URL of the requested scheme
        if scheme == "abfs":
            return store_url.get_abfs_url(path)
        elif scheme == "azureml":
            return store_url.get_azureml_url(path)
        else:
            raise ValueError(f"Unsupported scheme {scheme}")

    def _get_files(file_pattern: str) -> List[str]:
        # a single-pattern result: the URL if anything matches, otherwise nothing
        return [get_url(file_pattern)] if store_url.any_files(file_pattern) else []

    # ---- start and end inside the same year / month / day ----
    # Each level first delegates to the finer level, then tries the "whole unit" wildcard, and only
    # then splits the window into head + complete middle units + tail.
    def same_year(start, end) -> List[str]:
        if is_same_month(start, end):
            return same_month(start, end)
        if is_start_of_year(start) and is_end_of_year(end):
            return _get_files(f"{start.strftime('%Y')}/*/*/*/*.jsonl")
        return cross_month(start, end)

    def same_month(start, end) -> List[str]:
        if is_same_day(start, end):
            return same_day(start, end)
        if is_start_of_month(start) and is_end_of_month(end):
            return _get_files(f"{start.strftime('%Y/%m')}/*/*/*.jsonl")
        return cross_day(start, end)

    def same_day(start, end) -> List[str]:
        if is_start_of_day(start) and is_end_of_day(end):
            return _get_files(f"{start.strftime('%Y/%m/%d')}/*/*.jsonl")
        return cross_hour(start, end)

    # ---- head / tail pieces: from the beginning of a unit up to a point, or from a point to its end ----
    def start_of_year(y) -> List[str]:
        # files from Jan 1st 00h of y's year up to y
        if is_end_of_year(y):
            return _get_files(f"{y.strftime('%Y')}/*/*/*/*.jsonl")
        return same_year(y.replace(month=1, day=1, hour=0, minute=0, second=0), y)

    def end_of_year(y) -> List[str]:
        # files from y up to Dec 31st 23h of y's year
        if is_start_of_year(y):
            return _get_files(f"{y.strftime('%Y')}/*/*/*/*.jsonl")
        return same_year(y, y.replace(month=12, day=31, hour=23, minute=59, second=59))

    def start_of_month(m) -> List[str]:
        # files from the 1st 00h of m's month up to m
        if is_end_of_month(m):
            return _get_files(f"{m.strftime('%Y/%m')}/*/*/*.jsonl")
        return same_month(m.replace(day=1, hour=0, minute=0, second=0), m)

    def end_of_month(m) -> List[str]:
        # files from m up to the last day 23h of m's month
        if is_start_of_month(m):
            return _get_files(f"{m.strftime('%Y/%m')}/*/*/*.jsonl")
        _, month_days = calendar.monthrange(m.year, m.month)
        return same_month(m, m.replace(day=month_days, hour=23, minute=59, second=59))

    def start_of_day(d) -> List[str]:
        # files from 00h of d's day up to d
        if is_end_of_day(d):
            return _get_files(f"{d.strftime('%Y/%m/%d')}/*/*.jsonl")
        return same_day(d.replace(hour=0, minute=0, second=0), d)

    def end_of_day(d) -> List[str]:
        # files from d up to 23h of d's day
        if is_start_of_day(d):
            return _get_files(f"{d.strftime('%Y/%m/%d')}/*/*.jsonl")
        return same_day(d, d.replace(hour=23, minute=59, second=59))

    # ---- windows crossing a year / month / day / hour boundary ----
    def cross_year(start, end) -> List[str]:
        middle_years = [
            get_url(f"{y}/*/*/*/*.jsonl") for y in range(start.year+1, end.year)
            if store_url.any_files(f"{y}/*/*/*/*.jsonl")
        ]
        return end_of_year(start) + middle_years + start_of_year(end)

    def cross_month(start: datetime, end) -> List[str]:
        # hour=1 on the lower bound vs hour=0 on the upper bound keeps the range empty when start and
        # end are in adjacent months, whatever their minutes and seconds are
        _start = (start + relativedelta(months=1)).replace(day=1, hour=1)
        _end = end.replace(day=1, hour=0)  # skip last month
        middle_months = [
            get_url(f"{m.strftime('%Y/%m')}/*/*/*.jsonl")
            for m in date_range(_start, _end, relativedelta(months=1))
            if store_url.any_files(f"{m.strftime('%Y/%m')}/*/*/*.jsonl")
        ]
        return end_of_month(start) + middle_months + start_of_month(end)

    def cross_day(start, end) -> List[str]:
        # same hour=1 / hour=0 trick as in cross_month, one level down
        _start = (start + timedelta(days=1)).replace(hour=1)
        _end = end.replace(hour=0)  # skip last day
        middle_days = [
            get_url(f"{d.strftime('%Y/%m/%d')}/*/*.jsonl")
            for d in date_range(_start, _end, timedelta(days=1))
            if store_url.any_files(f"{d.strftime('%Y/%m/%d')}/*/*.jsonl")
        ]
        return end_of_day(start) + middle_days + start_of_day(end)

    def cross_hour(start, end) -> List[str]:
        # minute=0 / minute=59 makes the hour of `end` itself part of the range
        _start = start.replace(minute=0)
        _end = end.replace(minute=59)
        return [
            get_url(f"{h.strftime('%Y/%m/%d/%H')}/*.jsonl")
            for h in date_range(_start, _end, timedelta(hours=1))
            if store_url.any_files(f"{h.strftime('%Y/%m/%d/%H')}/*.jsonl")
        ]

    # ensure start_datetime and end_datetime both or neither have tzinfo
    if start_datetime.tzinfo is None:
        start_datetime = start_datetime.replace(tzinfo=end_datetime.tzinfo)
    if end_datetime.tzinfo is None:
        end_datetime = end_datetime.replace(tzinfo=start_datetime.tzinfo)

    store_url = store_url or StoreUrl(input_data)

    if end_datetime.minute == 0 and end_datetime.second == 0 and end_datetime.microsecond == 0:
        # if end_datetime is a whole hour, the last hour folder is not needed; a non-zero microsecond
        # part means the window does reach into that hour, so its folder has to stay
        end_datetime -= timedelta(seconds=1)

    if start_datetime > end_datetime:
        # empty or reversed window: without this guard the head/tail helpers would see e.g. a
        # "start of year" start and an "end of year" end and return two whole unrelated years
        return []

    if is_same_year(start_datetime, end_datetime):
        return same_year(start_datetime, end_datetime)
    return cross_year(start_datetime, end_datetime)