def writeTableToTargetBucket()

in sdlf-utils/pipeline-examples/glue-jobs-deployer/pipeline_scripts/examplepipeline-glue-job.py


def writeTableToTargetBucket():
    # Initialize metadata objects related to target table
    if props.writeAsTable:
        getTablePartitionFields()
        getTableFields()
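    # Build the source time/date condition that will replace $TIME_CONDITION in the HQL template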
    props.processDatesCondition = getSourceTimeCondition(props)

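    # Render the HQL template: normalize tabs and substitute the target database, date range, and time condition placeholders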
    queryHql = sqlContent.replace('\t', '    ')
    queryHql = queryHql.replace("$TARGET_DATABASE", "'" + props.targetDatabase + "'")
    queryHql = queryHql.replace("$START_DATE", "'" + props.startDateStr + "'")
    queryHql = queryHql.replace("$END_DATE", "'" + props.endDateStr + "'")
    queryHql = queryHql.replace("$START_YEAR", "'" + props.startYearStr + "'")
    queryHql = queryHql.replace("$START_MONTH", "'" + props.startMonthStr + "'")
    queryHql = queryHql.replace("$START_DAY", "'" + props.startDayStr + "'")
    queryHql = queryHql.replace("$END_YEAR", "'" + props.endYearStr + "'")
    queryHql = queryHql.replace("$END_MONTH", "'" + props.endMonthStr + "'")
    queryHql = queryHql.replace("$END_DAY", "'" + props.endDayStr + "'")
    queryHql = queryHql.replace("$TIME_CONDITION", props.processDatesCondition)

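    # Fill in positional partition-value placeholders ($VALUE_1, $VALUE_2, ...) in template order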
    for index, value in enumerate(props.partitionValues.values(), start=1):
        queryHql = queryHql.replace("$VALUE_" + str(index), value)

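    # Map the $SOURCE_DATABASE and $SOURCE_DATABASE_<n> qualifiers to the databases configured in props.sourcesDict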
    for value in props.sourcesDict.keys():
        if value == 'source_database':
            queryHql = queryHql.replace("$SOURCE_DATABASE.", props.sourcesDict[value] + ".")
        else:
            queryHql = queryHql.replace("$SOURCE_DATABASE_" + value.split("_")[-1] + ".",
                                        props.sourcesDict[value] + ".")

    log.info("JOB_ID::: " + str(props.jobId))

    queryHql = queryHql.replace('$JOB_ID', str(props.jobId))

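    # For non-CDC tables, remove previously written data from the target bucket location before overwriting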
    if not props.isCdcTable:
        deleteOldData(props.targetTableBucketLocation)

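    # Enable dynamic partition overwrite so INSERT OVERWRITE only replaces the partitions produced by the query,
    # and allow cross joins in case the HQL template relies on them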
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    if props.writeAsTable:
        spark.sql("use " + props.targetDatabase)

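    # Execute the rendered query; CDC tables additionally prepare the CDC temp table and,
    # when date-partitioned, collect the partitions that need processing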
    log.info("sql: " + queryHql)
    try:
        resultDf = spark.sql(queryHql)
        log.info('Query executed')
        if props.isCdcTable:
            if props.hasDtPartition:
                props.cdcDatePartitionsToProcess = getCdcPartitionsToProcess(resultDf)
            else:
                createAnalyticsFolderIfNotExists(props)
            createCdcTempTableSQL(props)
            time.sleep(2)

    except Exception as x:
        err = 'Unable to process SQL query. ERROR : ' + str(x)
        log.error(err)
        finish_job_fail(err)

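    # finish_job_fail above is expected to end the job on failure, so resultDf below is only reached after a successful query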
    if props.dropDuplicates:
        resultDf = resultDf.dropDuplicates()

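    # CDC tables are handled by the dedicated CDC flow; for all other tables the result is written directly,
    # either into the target table via insertInto or as files under the target S3 prefix,
    # coalescing to props.sparkPartitions output partitions when that value is set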
    if props.isCdcTable:
        processCdcFlow(props, resultDf)
    else:
        if props.sparkPartitions > 0:
            if props.writeAsTable:
                resultDf.coalesce(props.sparkPartitions)\
                    .write.mode("overwrite")\
                    .format(props.outputFormat)\
                    .insertInto(props.targetTableFQDN, overwrite=True)
            else:
                resultDf.coalesce(props.sparkPartitions).write.mode("overwrite")\
                    .format(props.outputFormat) \
                    .option('header','true') \
                    .save('s3://'+props.targetTableBucketLocation+'/'+props.targetTablePathLocation+'/')
        else:
            if props.writeAsTable:
                resultDf.write.mode("overwrite")\
                    .format(props.outputFormat)\
                    .insertInto(props.targetTableFQDN, overwrite=True)
            else:
                resultDf.write.mode("overwrite")\
                    .format(props.outputFormat) \
                    .option('header', 'true') \
                    .save('s3://'+props.targetTableBucketLocation+'/'+props.targetTablePathLocation+'/')

    log.info("Query finished.")
    return
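
For reference, the $-placeholders above are filled in with plain string replacement over the HQL template. The snippet below is a minimal, standalone sketch of that rendering step; the template text and substitution values are hypothetical and only illustrate the mechanism, they are not part of the pipeline.

# Minimal sketch of the placeholder rendering (hypothetical template and values).
substitutions = {
    "$SOURCE_DATABASE.": "raw_db.",   # database qualifier keeps its trailing dot
    "$TIME_CONDITION": "s.load_date BETWEEN '2024-01-01' AND '2024-01-31'",
    "$JOB_ID": "12345",
}

template = (
    "SELECT s.*, $JOB_ID AS job_id\n"
    "FROM $SOURCE_DATABASE.raw_sales s\n"
    "WHERE $TIME_CONDITION"
)

query = template
for placeholder, value in substitutions.items():
    query = query.replace(placeholder, value)

print(query)
# SELECT s.*, 12345 AS job_id
# FROM raw_db.raw_sales s
# WHERE s.load_date BETWEEN '2024-01-01' AND '2024-01-31'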