def execute_merge_query()

in infra-as-code/modules/export-to-bq-incremental/function-source-code/lib.py [0:0]


    def execute_merge_query(self):
        """
        Executes a merge query in BigQuery to update or insert data from the staging table to the final table.
        """
        merge_query = f'''
            MERGE `{self.final_table_id}` T 
                USING (
                    SELECT 
                    conversationName, 
                    audioFileUri, 
                    dialogflowConversationProfileId, 
                    startTimestampUtc, 
                    loadTimestampUtc, 
                    analysisTimestampUtc, 
                    conversationUpdateTimestampUtc, 
                    year,
                    month,
                    day, 
                    durationNanos, 
                    silenceNanos, 
                    silencePercentage, 
                    agentSpeakingPercentage, 
                    clientSpeakingPercentage, 
                    agentSentimentScore, 
                    agentSentimentMagnitude, 
                    clientSentimentScore, 
                    clientSentimentMagnitude, 
                    transcript, 
                    turnCount, 
                    languageCode, 
                    medium, 
                    issues,
                    entities,
                    labels,
                    words,
                    sentences,
                    latestSummary,
                    qaScorecardResults,
                    agents
                    FROM `{self.staging_table_id}`) S 
                    ON T.conversationName = S.conversationName 
                WHEN MATCHED AND T.conversationUpdateTimestampUtc != S.conversationUpdateTimestampUtc THEN 
                    UPDATE SET
                        audioFileUri = S.audioFileUri, 
                        dialogflowConversationProfileId = S.dialogflowConversationProfileId, 
                        startTimestampUtc = S.startTimestampUtc, 
                        loadTimestampUtc = S.loadTimestampUtc, 
                        analysisTimestampUtc = S.analysisTimestampUtc, 
                        conversationUpdateTimestampUtc = S.conversationUpdateTimestampUtc, 
                        year = S.year,
                        month = S.month,
                        day = S.day, 
                        durationNanos = S.durationNanos, 
                        silenceNanos = S.silenceNanos, 
                        silencePercentage = S.silencePercentage, 
                        agentSpeakingPercentage = S.agentSpeakingPercentage, 
                        clientSpeakingPercentage = S.clientSpeakingPercentage, 
                        agentSentimentScore = S.agentSentimentScore, 
                        agentSentimentMagnitude = S.agentSentimentMagnitude, 
                        clientSentimentScore = S.clientSentimentScore, 
                        clientSentimentMagnitude = S.clientSentimentMagnitude, 
                        transcript = S.transcript, 
                        turnCount = S.turnCount, 
                        languageCode = S.languageCode, 
                        medium = S.medium, 
                        issues = S.issues,
                        entities = S.entities,
                        labels = S.labels,
                        words = S.words,
                        sentences = S.sentences,
                        latestSummary = S.latestSummary,
                        qaScorecardResults = S.qaScorecardResults,
                        agents = S.agents
                WHEN NOT MATCHED THEN
                    INSERT ROW
        '''

        print('Merge query to be executed:')
        print(merge_query)

        merge_job = self.bq_client.query(merge_query)  # API request
        merge_result = merge_job.result()  # Waits for query to finish

        print('Merge query result:')
        print(f'job_id: {merge_job.job_id}')
        print(f'num_dml_affected_rows: {merge_job.num_dml_affected_rows}')