in src/common_utils/bigquery_client_utils/ddl.py [0:0]
def run_script_files(file_list, error_handler, success_handler, job_id_prefix=None):
    """Run every ``.sql`` file in *file_list*, retrying while progress is made.

    Each path ending in ``.sql`` is wrapped in a ``Script``; other entries are
    ignored. On every pass all scripts for which ``Script.is_runnable`` is
    true are started via ``script.run(executor, job_id_prefix)`` and handled
    as they complete. After a pass, the scripts whose status is
    ``ScriptStatus.RETRY`` are counted:

    * none left in ``RETRY``: the loop ends;
    * same number as after the previous pass (no progress): they are marked
      as failed and the loop ends;
    * otherwise another pass is made.

    Args:
        file_list: Iterable of file path strings.
        error_handler: Called as ``error_handler(script, job_exception)`` for
            each script whose job reports an exception, after
            ``script.mark_as_failed()`` has been called on it.
        success_handler: Called as ``success_handler(script)`` for each script
            whose job reports no exception, after ``script.mark_as_done()``.
        job_id_prefix: Passed through unchanged to ``script.run``.

    Returns:
        The list of ``Script`` objects created from the ``.sql`` entries, in
        the order they appeared in *file_list*.
    """
    scripts = [Script(file) for file in file_list if file.endswith(".sql")]
    previous_retry_count = 0
    with concurrent.futures.ThreadPoolExecutor() as executor:
        while True:
            # Run all runnable scripts async.
            pending = [
                script.run(executor, job_id_prefix)
                for script in scripts
                if Script.is_runnable(script)
            ]
            # Handle results in completion order.
            for future_script in concurrent.futures.as_completed(pending):
                script = future_script.result()
                job_exception = script.get_job().exception()
                if job_exception:
                    # NOTE(review): how a script ends up in RETRY is not
                    # visible here -- presumably via mark_as_failed() or the
                    # error handler; confirm against Script.
                    script.mark_as_failed()
                    error_handler(script, job_exception)
                else:
                    script.mark_as_done()
                    success_handler(script)

            marked_for_retry = [
                script
                for script in scripts
                if script.get_status() == ScriptStatus.RETRY
            ]
            retry_count = len(marked_for_retry)

            if not marked_for_retry:
                # Nothing left to retry. Checked first so that a first pass
                # with no retries is not reported as "stopped retrying".
                logging.info(f"finished iteration, marked {retry_count} for retry")
                break

            if retry_count == previous_retry_count:
                # The last pass made no progress, so another would not either.
                logging.info(
                    f"stopped retrying, retried {len(marked_for_retry)} files but none succeed, marking them as FAILED"
                )
                for script in marked_for_retry:
                    script.mark_as_failed()
                break

            previous_retry_count = retry_count
            logging.info(f"finished iteration, marked {previous_retry_count} for retry")
    return scripts