in tools/scripts/ops/fetch_latest_models.py [0:0]
def _copy_models(aws_reference_models_dir: str,
                 destination_dir: str,
                 destination_suffix: str,
                 source_filename: str,
                 filtering_regex_pattern: re.Pattern):
    """Copy the latest model file of every service into ``destination_dir``.

    Every non-hidden sub-directory of ``aws_reference_models_dir`` is treated
    as one service. Among the entries of a service directory, those for which
    ``_dir_name_to_date`` yields a ``datetime.date`` are version candidates;
    the greatest candidate *name* (plain string comparison) is taken as the
    latest version. ``source_filename`` from that version directory is then
    copied to ``<destination_dir>/<service>-<version>.<destination_suffix>``,
    overwriting any file already there.

    Nothing is ever deleted from ``destination_dir``. Files found there that
    match ``filtering_regex_pattern`` but whose ``service`` group corresponds
    to no successfully processed service are only listed in a final warning.

    Any exception raised while resolving or copying one service is printed
    and processing continues with the next service.

    Args:
        aws_reference_models_dir: Directory holding one sub-directory per
            service.
        destination_dir: Directory receiving the copied files; created when
            missing.
        destination_suffix: Suffix (without the leading dot) of the
            destination file names.
        source_filename: Name of the file to copy out of the latest version
            directory of each service.
        filtering_regex_pattern: Pattern selecting the files in
            ``destination_dir`` that are tracked; it must define a named
            group ``service``.
    """
    # No-op for an already compiled pattern; also lets a plain string work.
    pattern = re.compile(filtering_regex_pattern)

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(destination_dir, exist_ok=True)

    # Tracked files that no processed service has accounted for yet.
    unclaimed_models = {
        filename for filename in os.listdir(destination_dir)
        if os.path.isfile(os.path.join(destination_dir, filename))
        and pattern.search(filename)
    }

    for service_name in os.listdir(aws_reference_models_dir):
        service_dir = os.path.join(aws_reference_models_dir, service_name)
        if service_name.startswith(".") or not os.path.isdir(service_dir):
            continue

        version_dirs = os.listdir(service_dir)
        try:
            # max() raises ValueError when no entry qualifies as a version;
            # that is reported by the handler below like any other failure.
            latest_version = str(max(
                entry for entry in version_dirs
                if isinstance(_dir_name_to_date(entry), datetime.date)))
            dst_model_name = f"{service_name}-{latest_version}.{destination_suffix}"
            if dst_model_name not in unclaimed_models:
                print(f"New model file: {dst_model_name}")
            # Copy unconditionally so that an existing file with the same
            # name is refreshed with the current reference content.
            shutil.copyfile(
                os.path.join(service_dir, latest_version, source_filename),
                os.path.join(destination_dir, dst_model_name))
            # Stop tracking every file of this service (older versions
            # included); the files themselves stay on disk. Every member of
            # the set was admitted via pattern.search(), so search() is used
            # here as well and is guaranteed to return a match.
            unclaimed_models -= {
                filename for filename in unclaimed_models
                if pattern.search(filename).group("service") == service_name
            }
        except Exception as exc:
            # Deliberately broad: one malformed service must not abort the
            # whole synchronisation run.
            print(f"Error: Unexpected folder structure at {destination_dir} {service_name}: {exc}")

    if unclaimed_models:
        print(f"Warning: Extra SDK Models not present in the reference models repo:\n {sorted(unclaimed_models)}")