def create_job_id()

in tools/cloud_functions/gcs_event_based_ingest/gcs_ocn_bq_ingest/common/utils.py [0:0]


def create_job_id(success_file_path, data_source_name=None, table=None):
    """Create job id prefix with a consistent naming convention based on the
    success file path to give context of what caused this job to be submitted.
    the rules for success file name -> job id are:
    1. slashes to dashes
    2. all non-alphanumeric dash or underscore will be replaced with underscore
    Note, gcf-ingest- can be overridden with environment variable JOB_PREFIX
    3. uuid for uniqueness
    """
    if data_source_name and table:
        if len(table.table_id.split('$')) == 2:
            # This code is reached if the user has set an explicit load_data_source
            # key,value pair in the BQ_LOAD_CONFIG_FILENAME file and the GCS path has
            # partition information.
            partition_info = table.table_id.split('$')[1]
            clean_job_id = (f'{data_source_name}/'
                            f'{table.dataset_id}/'
                            f'{table.table_id.split("$")[0]}/')
            if len(partition_info) >= 4:
                clean_job_id += f'{partition_info[0:4]}/'
            if len(partition_info) >= 6:
                clean_job_id += f'{partition_info[4:6]}/'
            if len(partition_info) >= 8:
                clean_job_id += f'{partition_info[6:8]}/'
            if len(partition_info) == 10:
                clean_job_id += f'{partition_info[8:10]}/'
        else:
            # This code is reached if the user has set an explicit load_data_source
            # key,value pair in the BQ_LOAD_CONFIG_FILENAME file but the GCS path
            # and regex does NOT have any partition information.
            clean_job_id = (f'{data_source_name}/'
                            f'{table.dataset_id}/'
                            f'{table.table_id}/')
        clean_job_id = clean_job_id.replace('-',
                                            '_').replace('/',
                                                         '-').replace('$', '')
        clean_job_id = os.getenv('JOB_PREFIX',
                                 constants.DEFAULT_JOB_PREFIX) + clean_job_id
        clean_job_id += str(uuid.uuid4())
    else:
        clean_job_id = os.getenv('JOB_PREFIX', constants.DEFAULT_JOB_PREFIX)
        clean_job_id += re.compile(constants.NON_BQ_JOB_ID_REGEX).sub(
            '_', success_file_path.replace('/', '-'))
        # add uniqueness in case we have to "re-process" a success file that is
        # republished (e.g. to fix a bad batch of data) or handle multiple load jobs
        # for a single success file.
        clean_job_id += str(uuid.uuid4())
    # Make sure job id isn't too long (1024 chars max), but also leave 3 chars of space so that if a job fails
    # we can add a retry counter suffix to the original job_id.
    # For example, if 'some_job_id' fails, then on the third retry we'd see the following job id:
    #   some_job_id_03
    # where _03 means the third retry attempt.
    #
    # Source for job id max length: https://cloud.google.com/bigquery/docs/running-jobs#generate-jobid
    return clean_job_id[:1021]