in athena_glue_service_logs/partitioners/grouped_date_partitioner.py [0:0]
def find_recent_partitions(self, existing_partitions):
    """Search S3 for recent grouped date partitions and return their values.

    The grouping names (e.g. regions) are taken from ``existing_partitions``;
    a group that has no existing partition is never inspected.

    For every group, walk backwards one day at a time starting from today
    (UTC) for at most ``self.MAX_RECENT_DAYS`` days, stopping as soon as the
    day equals the group's last known partition. Each day visited before that
    point is returned only if objects exist under its partitioned S3 path.

    Args:
        existing_partitions: Iterable of partition value sequences whose first
            element is the group name and whose remaining elements are the
            date parts (compared against ``['YYYY', 'MM', 'DD']``).
            NOTE(review): only the last entry seen for each group is compared,
            so entries are assumed to arrive oldest-to-newest, and the date
            parts must be a list of strings to compare equal -- confirm
            against the caller.

    Returns:
        A list of ``[group, 'YYYY', 'MM', 'DD']`` lists, newest day first
        within each group, in the order groups first appear in the input.
    """
    # Collect the known date partitions for each group (region).
    parts_by_group = defaultdict(list)
    for part in existing_partitions:
        parts_by_group[part[0]].append(part[1:])

    # NOTE: datetime.utcfromtimestamp() is deprecated as of Python 3.12; it is
    # kept here so the block needs no additional imports.
    today = datetime.utcfromtimestamp(time.time()).date()

    partitions_to_add = []
    for group, known_dates in parts_by_group.items():
        latest_known = known_dates[-1]
        # The offset comes from the loop index so that *every* group starts
        # its lookback at today. A counter shared across groups would make
        # each later group resume where the previous one stopped.
        for days_back in range(self.MAX_RECENT_DAYS):
            candidate = (today - timedelta(days=days_back)).strftime('%Y-%m-%d').split('-')
            if latest_known == candidate:
                # Reached the most recent registered partition; nothing older
                # needs to be backfilled for this group.
                break
            partition_tuple = [group] + candidate
            # Only add partitions for which S3 objects actually exist.
            if S3Reader(self.build_partitioned_path(partition_tuple)).does_have_objects():
                partitions_to_add.append(partition_tuple)
    return partitions_to_add