in infra-as-code/modules/export-to-bq-incremental/function-source-code/lib.py [0:0]
def execute_merge_query(self):
    """
    Upsert rows from the staging table into the final table with a BigQuery MERGE.

    Staged rows are matched to final rows on ``conversationName``. A matched
    row is overwritten only when its ``conversationUpdateTimestampUtc`` differs
    from the staged value; staged rows without a match are inserted as-is.
    The generated statement, the job id and the number of affected rows are
    printed.

    Reads ``self.final_table_id``, ``self.staging_table_id`` and
    ``self.bq_client``. Returns nothing; errors raised while submitting the
    query or waiting for it are not caught here.
    """
    key_column = 'conversationName'
    # Single source of truth for the merged columns: both the SELECT list and
    # the UPDATE SET list are generated from it so they cannot drift apart.
    columns = (
        key_column,
        'audioFileUri',
        'dialogflowConversationProfileId',
        'startTimestampUtc',
        'loadTimestampUtc',
        'analysisTimestampUtc',
        'conversationUpdateTimestampUtc',
        'year',
        'month',
        'day',
        'durationNanos',
        'silenceNanos',
        'silencePercentage',
        'agentSpeakingPercentage',
        'clientSpeakingPercentage',
        'agentSentimentScore',
        'agentSentimentMagnitude',
        'clientSentimentScore',
        'clientSentimentMagnitude',
        'transcript',
        'turnCount',
        'languageCode',
        'medium',
        'issues',
        'entities',
        'labels',
        'words',
        'sentences',
        'latestSummary',
        'qaScorecardResults',
        'agents',
    )
    separator = ',\n            '
    select_list = separator.join(columns)
    # The join key is never reassigned: matched rows already share its value.
    update_list = separator.join(
        f'{column} = S.{column}' for column in columns if column != key_column
    )

    # IS DISTINCT FROM is the NULL-safe form of !=. With a plain != the
    # predicate is NULL (hence not satisfied) whenever either timestamp is
    # NULL, so such rows would never be refreshed from staging.
    merge_query = f'''
    MERGE `{self.final_table_id}` T
    USING (
        SELECT
            {select_list}
        FROM `{self.staging_table_id}`
    ) S
    ON T.{key_column} = S.{key_column}
    WHEN MATCHED
        AND T.conversationUpdateTimestampUtc IS DISTINCT FROM S.conversationUpdateTimestampUtc THEN
        UPDATE SET
            {update_list}
    WHEN NOT MATCHED THEN
        INSERT ROW
    '''
    print('Merge query to be executed:')
    print(merge_query)
    merge_job = self.bq_client.query(merge_query)  # API request
    # Waits for the query to finish; the returned rows are not needed for a
    # DML statement, so the result is deliberately not bound to a name.
    merge_job.result()
    print('Merge query result:')
    print(f'job_id: {merge_job.job_id}')
    print(f'num_dml_affected_rows: {merge_job.num_dml_affected_rows}')