in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py
import os
import re
import uuid

from . import constants  # assumed relative import; provides DEFAULT_JOB_PREFIX and NON_BQ_JOB_ID_REGEX


def create_job_id(success_file_path, data_source_name=None, table=None):
"""Create job id prefix with a consistent naming convention based on the
success file path to give context of what caused this job to be submitted.
the rules for success file name -> job id are:
1. slashes to dashes
2. all non-alphanumeric dash or underscore will be replaced with underscore
Note, gcf-ingest- can be overridden with environment variable JOB_PREFIX
3. uuid for uniqueness
"""
    if data_source_name and table:
        if len(table.table_id.split('$')) == 2:
            # This code is reached if the user has set an explicit
            # load_data_source key/value pair in the BQ_LOAD_CONFIG_FILENAME
            # file and the GCS path contains partition information.
            partition_info = table.table_id.split('$')[1]
            clean_job_id = (f'{data_source_name}/'
                            f'{table.dataset_id}/'
                            f'{table.table_id.split("$")[0]}/')
            # Partition decorators are YYYY, YYYYMM, YYYYMMDD, or YYYYMMDDHH;
            # append one path segment per component present.
            if len(partition_info) >= 4:
                clean_job_id += f'{partition_info[0:4]}/'  # year
            if len(partition_info) >= 6:
                clean_job_id += f'{partition_info[4:6]}/'  # month
            if len(partition_info) >= 8:
                clean_job_id += f'{partition_info[6:8]}/'  # day
            if len(partition_info) == 10:
                clean_job_id += f'{partition_info[8:10]}/'  # hour
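            # For illustration: an hourly decorator '2021010203' yields
            # '2021/01/02/03/', and a daily decorator '20210102' yields
            # '2021/01/02/'.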
        else:
            # This code is reached if the user has set an explicit
            # load_data_source key/value pair in the BQ_LOAD_CONFIG_FILENAME
            # file but the GCS path and regex do NOT contain any partition
            # information.
            clean_job_id = (f'{data_source_name}/'
                            f'{table.dataset_id}/'
                            f'{table.table_id}/')
        clean_job_id = (clean_job_id.replace('-', '_')  # dashes to underscores first
                        .replace('/', '-')              # then slashes to dashes
                        .replace('$', ''))              # drop any leftover decorator marker
        clean_job_id = os.getenv('JOB_PREFIX',
                                 constants.DEFAULT_JOB_PREFIX) + clean_job_id
        clean_job_id += str(uuid.uuid4())
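        # For illustration: data_source_name='sales', dataset_id='ds', and
        # table_id='tbl$20210101' build 'sales/ds/tbl/2021/01/01/', which the
        # replacements above turn into a job id like
        # 'gcf-ingest-sales-ds-tbl-2021-01-01-<uuid4>'.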
    else:
        clean_job_id = os.getenv('JOB_PREFIX', constants.DEFAULT_JOB_PREFIX)
        clean_job_id += re.compile(constants.NON_BQ_JOB_ID_REGEX).sub(
            '_', success_file_path.replace('/', '-'))
        # Add uniqueness in case we have to "re-process" a success file that
        # is republished (e.g. to fix a bad batch of data) or need to handle
        # multiple load jobs for a single success file.
        clean_job_id += str(uuid.uuid4())
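        # For illustration, assuming NON_BQ_JOB_ID_REGEX leaves alphanumerics,
        # dashes, and underscores intact (per the docstring rules): a
        # success_file_path of 'bucket/dataset/table/_SUCCESS' becomes
        # 'bucket-dataset-table-_SUCCESS', giving a job id like
        # 'gcf-ingest-bucket-dataset-table-_SUCCESS<uuid4>'.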
    # Make sure the job ID isn't too long (1024 chars max), but also leave
    # 3 chars of space so that if a job fails we can append a retry-counter
    # suffix to the original job_id. For example, if 'some_job_id' fails,
    # the third retry attempt would produce the job id 'some_job_id_03'.
    #
    # Source for the job ID max length:
    # https://cloud.google.com/bigquery/docs/running-jobs#generate-jobid
    return clean_job_id[:1021]
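

# A minimal standalone sketch (illustration only, not part of the module) of
# the fallback naming rules implemented above. It assumes that
# constants.NON_BQ_JOB_ID_REGEX matches any character outside [a-zA-Z0-9_-]
# and that constants.DEFAULT_JOB_PREFIX is 'gcf-ingest-'; the regex character
# class used here is an assumption, not a value confirmed from constants.py.
def _example_job_id(success_file_path: str) -> str:
    sanitized = re.sub(r'[^a-zA-Z0-9_-]', '_',
                       success_file_path.replace('/', '-'))
    # Same length cap as create_job_id: 1024 minus 3 chars reserved for a
    # retry suffix.
    return ('gcf-ingest-' + sanitized + str(uuid.uuid4()))[:1021]

# _example_job_id('my-dataset/my_table/_SUCCESS') returns something like
# 'gcf-ingest-my-dataset-my_table-_SUCCESS8c1f...'.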