in bsp_server/scip_sync_util/scip_utils.py [0:0]
def copy_index(index_to_copy: set[str], dest: str) -> None:
utils.safe_create(dest, is_dir=True)
def process_and_copy_scip_index(
source_path: str, current_status: dict
) -> tuple[str, str]:
try:
# Process index info
source_sha_path = source_path + scip_const.SHA256_FILE_SUFFIX
relative_path = source_path.split(os.path.sep + "bin" + os.path.sep)[-1]
scip_index_name = relative_path.replace("/", "_").replace("-", "_")
new_sha = get_sha256_for_file(source_sha_path)
# Check if copy needed
if (
scip_index_name in current_status
and current_status[scip_index_name] == new_sha
):
return (
scip_index_name,
scip_index_name + scip_const.SHA256_FILE_SUFFIX,
)
# Copy files if needed
dest_index_path = os.path.join(dest, scip_index_name)
shutil.copy(source_path, dest_index_path)
shutil.copy(
source_sha_path, dest_index_path + scip_const.SHA256_FILE_SUFFIX
)
return (scip_index_name, scip_index_name + scip_const.SHA256_FILE_SUFFIX)
except Exception as e:
print(f"Failed to process index {source_path}: {str(e)}")
return None
def get_current_status(filename: str) -> tuple[str, str]:
if not filename.endswith(".scip"):
return None, None
sha = get_sha256_for_file(
os.path.join(dest, filename + scip_const.SHA256_FILE_SUFFIX)
)
return (filename, sha) if sha else (None, None)
with ThreadPoolExecutor(max_workers=get_thread_pool_size()) as executor:
# Get current status
current_status = dict(
filter(
None,
executor.map(
get_current_status,
[f for f in os.listdir(dest) if f.endswith(".scip")],
),
)
)
# Process and copy files
copy_results = list(
filter(
None,
executor.map(
lambda src: process_and_copy_scip_index(src, current_status),
index_to_copy,
),
)
)
# Delete old files
files_to_keep = {name for pair in copy_results for name in pair}
files_to_delete = (
set(os.listdir(dest)) - files_to_keep - {scip_const.WORKSPACE_FILE_NAME}
)
files_to_delete = {
f
for f in files_to_delete
if not f.startswith(scip_const.JDK_SCIP_FILE_PREFIX)
}
if files_to_delete:
list(
executor.map(
lambda f: (
os.remove(os.path.join(dest, f))
if os.path.isfile(os.path.join(dest, f))
else shutil.rmtree(os.path.join(dest, f))
),
files_to_delete,
)
)