in scripts/share_ad_job_state/import_model_snapshot.py [0:0]
def generate_actions(file_path: str, new_index: str):
"""
Generator function that reads a .ndjson file and yields actions.
It yields two lines at a time (action + document).
Args:
file_path (str): Path to the .ndjson file.
new_index (str): The new index name to replace in actions.
Yields:
Dict[str, Any]: Action dictionaries for bulk upload.
"""
try:
with open(file_path, "r", encoding="utf-8") as f:
while True:
action_line = f.readline().strip()
document_line = f.readline().strip()
if not action_line or not document_line:
break
# Replace the index name in the action
action = json.loads(action_line)
document = json.loads(document_line)
yield {
"_op_type": "index",
"_index": new_index,
"_id": action["index"]["_id"],
"_source": document,
}
except IOError as e:
logger.error(f"Error reading file {file_path}: {e}")