# sdlf-utils/pipeline-examples/glue-jobs-deployer/pipeline_scripts/examplepipeline-glue-job.py
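# Note: this function relies on objects initialized earlier in the script
# (props, spark, log, sqlContent, and the time module), which are not shown here.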
def writeTableToTargetBucket():
    # Initialize metadata objects related to the target table.
    if props.writeAsTable:
        getTablePartitionFields()
        getTableFields()
    props.processDatesCondition = getSourceTimeCondition(props)
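    # Render the HQL template: substitute each placeholder with this run's values.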
    queryHql = sqlContent.replace('\t', ' ')
    # These placeholders are replaced by their values wrapped in single quotes.
    quotedValues = {
        "$TARGET_DATABASE": props.targetDatabase,
        "$START_DATE": props.startDateStr,
        "$END_DATE": props.endDateStr,
        "$START_YEAR": props.startYearStr,
        "$START_MONTH": props.startMonthStr,
        "$START_DAY": props.startDayStr,
        "$END_YEAR": props.endYearStr,
        "$END_MONTH": props.endMonthStr,
        "$END_DAY": props.endDayStr,
    }
    for placeholder, value in quotedValues.items():
        queryHql = queryHql.replace(placeholder, "'" + value + "'")
    # The time condition is injected verbatim; it is already a SQL fragment.
    queryHql = queryHql.replace("$TIME_CONDITION", props.processDatesCondition)
    # Replace in descending index order so $VALUE_1 cannot match the prefix of
    # $VALUE_10 and beyond.
    for index, value in reversed(list(enumerate(props.partitionValues.values(), start=1))):
        queryHql = queryHql.replace("$VALUE_" + str(index), value)
    # Map each source-database placeholder to its configured database name; the
    # trailing dot keeps "$SOURCE_DATABASE." from matching "$SOURCE_DATABASE_<n>.".
    for key in props.sourcesDict.keys():
        if key == 'source_database':
            queryHql = queryHql.replace("$SOURCE_DATABASE.", props.sourcesDict[key] + ".")
        else:
            queryHql = queryHql.replace("$SOURCE_DATABASE_" + key.split("_")[-1] + ".",
                                        props.sourcesDict[key] + ".")
log.info("JOB_ID::: " + str(props.jobId))
queryHql = queryHql.replace('$JOB_ID', str(props.jobId))
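    # Prepare the session: full-refresh targets are cleared first, and partition
    # overwrites are kept dynamic so only the partitions in the result are replaced.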
    if not props.isCdcTable:
        deleteOldData(props.targetTableBucketLocation)
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    spark.conf.set("spark.sql.crossJoin.enabled", "true")
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    if props.writeAsTable:
        spark.sql("use " + props.targetDatabase)
    log.info("sql: " + queryHql)
    try:
        resultDf = spark.sql(queryHql)
        log.info('Query executed')
        if props.isCdcTable:
            if props.hasDtPartition:
                props.cdcDatePartitionsToProcess = getCdcPartitionsToProcess(resultDf)
            else:
                createAnalyticsFolderIfNotExists(props)
                createCdcTempTableSQL(props)
                time.sleep(2)
    except Exception as x:
        err = 'Unable to process SQL query. ERROR: ' + str(x)
        log.error(err)
        # finish_job_fail presumably terminates the job, so resultDf is not
        # read after a failure.
        finish_job_fail(err)
    if props.dropDuplicates:
        resultDf = resultDf.dropDuplicates()
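    # CDC tables go through the dedicated merge flow; everything else is written
    # directly, either into the catalogued table or to the target S3 prefix.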
    if props.isCdcTable:
        processCdcFlow(props, resultDf)
    else:
        # Optionally coalesce to the configured number of output files.
        if props.sparkPartitions > 0:
            writeDf = resultDf.coalesce(props.sparkPartitions)
        else:
            writeDf = resultDf
        if props.writeAsTable:
            writeDf.write.mode("overwrite") \
                .format(props.outputFormat) \
                .insertInto(props.targetTableFQDN, overwrite=True)
        else:
            writeDf.write.mode("overwrite") \
                .format(props.outputFormat) \
                .option('header', 'true') \
                .save('s3://' + props.targetTableBucketLocation + '/' + props.targetTablePathLocation + '/')
log.info("Query finished.")
return