# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from google.cloud import bigquery
from datetime import date, timedelta, datetime
import json
import logging
import functions_framework
import google.oauth2.id_token
from google.cloud import error_reporting
from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1
from google.cloud.workflows.executions_v1.types.executions import Execution

# Access environment variables
WORKFLOW_CONTROL_PROJECT_ID = os.environ.get('WORKFLOW_CONTROL_PROJECT_ID')
WORKFLOW_CONTROL_DATASET_ID = os.environ.get('WORKFLOW_CONTROL_DATASET_ID')
WORKFLOW_CONTROL_TABLE_ID = os.environ.get('WORKFLOW_CONTROL_TABLE_ID')
WORKFLOWS_LOCATION = os.environ.get('WORKFLOWS_LOCATION')
DEFAULT_TIME_FORMAT = '%Y-%m-%d'

# Logs
error_client = error_reporting.Client()
client = google.cloud.logging.Client()
client.setup_logging()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# clients
bq_client = bigquery.Client(project=WORKFLOW_CONTROL_PROJECT_ID)
execution_client = executions_v1.ExecutionsClient()
workflows_client = workflows_v1.WorkflowsClient()


@functions_framework.http
def main(request):
    """
    Main function, likely triggered by an HTTP request
    its main responsibility is to trigger a cloud workflows pipeline, parsing start and end
    execution dates, and logging to bigquery control table.

    Args:
        request: The incoming HTTP request object.

    """
    event = request.get_json()
    print("event: " + str(event))
    start_date = event.get('start_date')
    end_date = event.get('end_date')
    workflows_name = event.get('workflows_name')
    validation_date_pattern = event.get('validation_date_pattern')
    same_day_execution = event.get('same_day_execution', 'YESTERDAY')
    workflow_status = event.get('workflow_status')
    workflow_properties = event.get('workflow_properties')
    execution_id = 0
    try:
        if workflow_status == "ENABLED":
            execution_id = call_workflows(workflows_name, start_date, end_date,
                                          validation_date_pattern, workflow_properties,
                                          same_day_execution)
        else:
            print('Workflow Disabled')
        return execution_id
    except Exception as ex:
        exception_message = "Exception : " + repr(ex)
        error_client.report_exception()
        logger.error(exception_message)
        print(RuntimeError(repr(ex)))
        return exception_message, 500


def call_workflows(workflows_name, start_date, end_date,
                   validation_date_pattern, workflow_properties,
                   same_day_execution):
    """
    calls a cloud workflows pipeline passed by parameter

    Args:
        workflows_name: name of te cloud workflows to execute
        start_date: start date passed by parameter to the workflows pipeline ( normally a data pipeline )
        end_date: end date passed by parameter to the workflows pipeline ( normally a data pipeline )
        validation_date_pattern: python data pattern format to apply to start and end dates
        workflow_properties: custom properties passed to the cloud workflows.
        same_day_execution: can be YESTERDAY, TODAY or YESTERDAY_TODAY indicating dates that should be passed in
        start and end dates, if not received by parameter.

    Returns:
        execution_id: cloud workflows unique execution identifier
    """
    print("Launching Custom Workflow.....")

    if start_date is None:  # it means is not done manually
        start_date, end_date = process_dates(validation_date_pattern, same_day_execution)
    if end_date is None:
        end_date = start_date
    if isinstance(workflow_properties, str):
        workflow_properties = json.loads(workflow_properties)
    arguments = {
        "workflow_name": workflows_name,
        "query_variables": {
            "start_date": start_date,
            "end_date": end_date,
        },
        "workflow_properties": workflow_properties
    }
    print('Cloud Workflows input params: %s ', arguments)
    execution = Execution(argument=json.dumps(arguments))
    # Construct the fully qualified location path.
    parent = workflows_client.workflow_path(WORKFLOW_CONTROL_PROJECT_ID, WORKFLOWS_LOCATION, workflows_name)

    # Execute the workflow.
    response = execution_client.create_execution(parent=parent, execution=execution)
    execution_id = response.name.split("/")[-1]
    print(f"Created execution: {execution_id}")
    return execution_id


def process_dates(validation_date_pattern, same_day_execution):
    """method to process start and end dates when no received by parameter

    Args:
        validation_date_pattern: python data pattern format to apply to start and end dates
        same_day_execution: can be YESTERDAY, TODAY or YESTERDAY_TODAY indicating dates that should be passed in
        start and end dates, if not received by parameter.

    Returns:
        Start and end dates parsed

    """
    today = date.today()
    # if is a daily pattern, execute the previous day
    if validation_date_pattern == DEFAULT_TIME_FORMAT:
        if same_day_execution == 'YESTERDAY':
            last_day = today - timedelta(days=1)
        else:  # TODAY, YESTERDAY_TODAY
            last_day = today
        end_date = str(last_day.strftime(validation_date_pattern))
        if same_day_execution == 'TODAY':
            start_date = today
        else:  # YESTERDAY_TODAY, YESTERDAY
            start_date = today - timedelta(days=1)
        start_date = str(start_date.strftime(validation_date_pattern))
    # is a monthly pattern, execute the pattern the last month with regard
    # to the actual date
    else:
        first = today.replace(day=1)
        last_month_last_day = first - timedelta(days=1)
        last_month_first_day = last_month_last_day.replace(day=1)
        end_date = str(last_month_last_day.strftime(validation_date_pattern))
        start_date = str(last_month_first_day.strftime(validation_date_pattern))
    return start_date, end_date
