# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import datetime
import logging
import os
import sys
import yaml
from google.api_core import exceptions
from google.cloud import resourcemanager_v3
from typing import Optional, List, Dict, Tuple
from sending import SendingClient

CONFIG_FILE = "config.yml"

class IAMPolicyComplianceChecker:

    def is_project_service_account_email(self, email: Optional[str]) -> bool:
        """
        Returns True if the email is not a service account, or if it is a service account and the email contains the project_id.
        """
        if email and email.endswith('.gserviceaccount.com'):
            return self.project_id in email
        return True

    def __init__(self, project_id: str, users_file: str, logger: logging.Logger, sending_client: Optional[SendingClient] = None):
        self.project_id = project_id
        self.users_file = users_file
        self.client = resourcemanager_v3.ProjectsClient()
        self.logger = logger
        self.sending_client = sending_client

    def _parse_member(self, member: str) -> tuple[str, Optional[str], str]:
        """Parses an IAM member string to extract type, email, and a derived username.

        Args:
            member: The IAM member string
        Returns:
            A tuple containing:
                - username: The derived username from the member string.
                - email: The email address if available, otherwise None.
                - member_type: The type of the member (e.g., user, serviceAccount, group).
        """
        email = None
        username = member

        # Split the member string to determine type and identifier
        parts = member.split(':', 1)
        member_type = parts[0] if len(parts) > 1 else "unknown"
        identifier = parts[1] if len(parts) > 1 else member

        if member_type in ["user", "serviceAccount", "group"]:
            email = identifier
            if '@' in identifier:
                username = identifier.split('@')[0]
            else:
                username = identifier
        else:
            username = identifier
            member_type = "unknown"
            email = None

        return username, email, member_type

    def _export_project_iam(self) -> List[Dict]:
        """Exports the IAM policy for a given project to YAML format.

        Returns:
            A list of dictionaries containing the IAM policy details.
        """

        try:
            policy = self.client.get_iam_policy(resource=f"projects/{self.project_id}")
            self.logger.debug(f"Retrieved IAM policy for project {self.project_id}")
        except exceptions.NotFound as e:
            self.logger.error(f"Project {self.project_id} not found: {e}")
            raise
        except exceptions.PermissionDenied as e:
            self.logger.error(f"Permission denied for project {self.project_id}: {e}")
            raise
        except Exception as e:
            self.logger.error(f"An error occurred while retrieving IAM policy for project {self.project_id}: {e}")
            raise

        members_data = {}

        for binding in policy.bindings:
            role = binding.role

            for member_str in binding.members:
                if member_str not in members_data:
                    username, email_address, member_type = self._parse_member(member_str)
                    # Skip service accounts not matching the project_id
                    if member_type == "serviceAccount" and not self.is_project_service_account_email(email_address):
                        self.logger.debug(f"Skipping service account not matching project_id ({self.project_id}): {email_address}")
                        continue
                    if member_type == "unknown":
                        self.logger.warning(f"Skipping member {member_str} with no email address")
                        continue  # Skip if no email address is found, probably a malformed member
                    members_data[member_str] = {
                        "username": username,
                        "email": email_address,
                        "member_type": member_type,
                        "permissions": []
                    }

                # Skip permissions that have a condition
                if "withcond" in role:
                    continue

                permission_entry = {}
                permission_entry["role"] = role

                members_data[member_str]["permissions"].append(permission_entry)

        output_list = []
        for data in members_data.values():
            data["permissions"] = sorted(data["permissions"], key=lambda p: p["role"])
            output_list.append({
                "username": data["username"],
                "email": data["email"],
                "member_type": data["member_type"],
                "permissions": data["permissions"]
            })

        output_list.sort(key=lambda x: x["username"])
        return output_list

    def _read_project_iam_file(self) -> List[Dict]:
        """Reads the IAM policy from a YAML file.

        Returns:
            A list of dictionaries containing the IAM policy details.
        """
        try:
            with open(self.users_file, "r") as file:
                iam_policy = yaml.safe_load(file)


                self.logger.debug(f"Retrieved IAM policy from file for project {self.project_id}")
                return iam_policy
        except FileNotFoundError:
            self.logger.error(f"IAM policy file not found for project {self.project_id}")
            return []
        except Exception as e:
            self.logger.error(f"An error occurred while reading IAM policy file for project {self.project_id}: {e}")
            return []

    def _to_yaml_file(self, data: List[Dict], output_file: str, header_info: str = "") -> None:
        """
        Writes a list of dictionaries to a YAML file.
        Include the apache license header on the files

        Args:
            data: A list of dictionaries containing user permissions and details.
            output_file: The file path where the YAML output will be written.
            header_info: A string containing the header information to be included in the YAML file.
        """

        apache_license_header = """# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
    """

        # Prepare the header with the Apache license
        header = f"{apache_license_header}\n# {header_info}\n# Generated on {datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n"

        try:
            with open(output_file, "w") as file:
                file.write(header)
                yaml.dump(data, file, sort_keys=False, default_flow_style=False, indent=2)
            self.logger.info(f"Successfully wrote IAM policy data to {output_file}")
        except IOError as e:
            self.logger.error(f"Failed to write to {output_file}: {e}")
            raise
        
    def check_compliance(self) -> List[str]:
        """
        Checks the compliance of the IAM policy against the defined policies.

        Returns:
            A list of strings describing any compliance issues found.
        """

        current_users = {user['email']: user for user in self._export_project_iam() if self.is_project_service_account_email(user.get('email'))}
        existing_users = {user['email']: user for user in self._read_project_iam_file() if self.is_project_service_account_email(user.get('email'))}

        if not existing_users:
            error_msg = f"No IAM policy found in the {self.users_file}."
            self.logger.info(error_msg)
            raise RuntimeError(error_msg)

        differences = []        

        all_emails = set(current_users.keys()) | set(existing_users.keys())

        for email in sorted(list(all_emails)):
            current_user = current_users.get(email)
            existing_user = existing_users.get(email)

            if current_user and not existing_user:
                differences.append(f"User {email} not found in existing policy.")
            elif not current_user and existing_user:
                differences.append(f"User {email} found in policy file but not in GCP.")
            elif current_user and existing_user:
                if current_user.get("member_type") != existing_user.get("member_type"):
                    differences.append(f"User {email} has different member type. In GCP: {current_user.get('member_type')}, in file: {existing_user.get('member_type')}")
                if current_user["permissions"] != existing_user["permissions"]:
                    msg = f"\nPermissions for user {email} differ."
                    msg += f"\nIn GCP: {current_user['permissions']}"
                    msg += f"\nIn {self.users_file}: {existing_user['permissions']}"
                    self.logger.info(msg)
                    differences.append(msg)

        return differences

    def create_announcement(self, recipient: str) -> None:
        """
        Creates an announcement about compliance issues using the SendingClient.

        Args:
            recipient (str): The email address of the announcement recipient.
        """
        if not self.sending_client:
            raise ValueError("SendingClient is required for creating announcements")
            
        diff = self.check_compliance()

        if not diff:
            self.logger.info("No compliance issues found, no announcement will be created.")
            return

        title = f"IAM Policy Non-Compliance Detected"
        body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n"
        for issue in diff:
            body += f"- {issue}\n"

        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
        announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."

        self.sending_client.create_announcement(title, body, recipient, announcement)

    def print_announcement(self, recipient: str) -> None:
        """
        Prints announcement details instead of sending them (for testing purposes).
        
        Args:
            recipient (str): The email address of the announcement recipient.
        """
        if not self.sending_client:
            raise ValueError("SendingClient is required for printing announcements")
            
        diff = self.check_compliance()

        if not diff:
            self.logger.info("No compliance issues found, no announcement will be printed.")
            return

        title = f"IAM Policy Non-Compliance Detected"
        body = f"IAM policy for project {self.project_id} is not compliant with the defined policies on {self.users_file}\n\n"
        for issue in diff:
            body += f"- {issue}\n"

        announcement = f"Dear team,\n\nThis is an automated notification about compliance issues detected in the IAM policy for project {self.project_id}.\n\n"
        announcement += f"We found {len(diff)} compliance issue(s) that need your attention.\n"
        announcement += f"\nPlease check the GitHub issue for detailed information and take appropriate action to resolve these compliance violations."

        self.sending_client.print_announcement(title, body, recipient, announcement)
    
    def generate_compliance(self) -> None:
        """
        Modifies the users file to match the current IAM policy.
        If no changes are needed, no file will be written.
        """
        
        try:
            diff = self.check_compliance()
        except RuntimeError:
            self.logger.info("No existing IAM policy found.")
            diff = ["No existing policy found"]

        if not diff or (len(diff) == 1 and "No existing policy found" not in diff[0]):
            self.logger.info("No compliance issues found, no changes will be made.")
            return

        current_policy = self._export_project_iam()
        header_info = f"IAM policy for project {self.project_id}"
        
        self._to_yaml_file(current_policy, self.users_file, header_info)
        self.logger.info(f"Generated new compliance file: {self.users_file}")

def config_process() -> Dict[str, str]:
    with open(CONFIG_FILE, "r") as file:
        config = yaml.safe_load(file)

    if not config:
        raise ValueError("Configuration file is empty or invalid.")
    
    config_res = dict()

    config_res["project_id"] = config.get("project_id", "apache-beam-testing")
    config_res["logging_level"] = config.get("logging", {}).get("level", "INFO")
    config_res["logging_format"] = config.get("logging", {}).get("format", "[%(asctime)s] %(levelname)s: %(message)s")
    config_res["users_file"] = config.get("users_file", "../iam/users.yml")
    config_res["action"] = config.get("action", "check")

    # SendingClient configuration
    config_res["github_token"] = os.getenv("GITHUB_TOKEN", "")
    config_res["github_repo"] = os.getenv("GITHUB_REPOSITORY", "apache/beam")
    config_res["smtp_server"] = os.getenv("SMTP_SERVER", "")
    config_res["smtp_port"] = os.getenv("SMTP_PORT", 587)
    config_res["email"] = os.getenv("EMAIL_ADDRESS", "")
    config_res["password"] = os.getenv("EMAIL_PASSWORD", "")
    config_res["recipient"] = os.getenv("EMAIL_RECIPIENT", "")

    return config_res

def main():
    # Parse command line arguments
    parser = argparse.ArgumentParser(description="IAM Policy Compliance Checker")
    parser.add_argument("--action", choices=["check", "announce", "print", "generate"], 
                       help="Action to perform: check compliance, create announcement, print announcement, or generate new compliance")
    args = parser.parse_args()

    config = config_process()

    # Command line argument takes precedence over config file
    action = args.action if args.action else config.get("action", "check")

    logging.basicConfig(level=getattr(logging, config["logging_level"].upper(), logging.INFO),
                        format=config["logging_format"])
    logger = logging.getLogger("IAMPolicyComplianceChecker")

    # Create SendingClient if needed for announcement actions
    sending_client = None
    if action in ["announce", "print"]:
        try:
            # Provide default values for testing, especially for print action
            github_token = config["github_token"] or "dummy-token"
            github_repo = config["github_repo"] or "dummy/repo"
            smtp_server = config["smtp_server"] or "dummy-server"
            smtp_port = int(config["smtp_port"]) if config["smtp_port"] else 587
            email = config["email"] or "dummy@example.com"
            password = config["password"] or "dummy-password"
            
            sending_client = SendingClient(
                logger=logger,
                github_token=github_token,
                github_repo=github_repo,
                smtp_server=smtp_server,
                smtp_port=smtp_port,
                email=email,
                password=password
            )
        except Exception as e:
            logger.error(f"Failed to initialize SendingClient: {e}")
            return 1

    logger.info(f"Starting IAM policy compliance check with action: {action}")
    iam_checker = IAMPolicyComplianceChecker(config["project_id"], config["users_file"], logger, sending_client)

    try:
        if action == "check":
            compliance_issues = iam_checker.check_compliance()
            if compliance_issues:
                logger.warning("IAM policy compliance issues found:")
                for issue in compliance_issues:
                    logger.warning(issue)
            else:
                logger.info("IAM policy is compliant.")
        elif action == "announce":
            logger.info("Creating announcement for compliance violations...")
            recipient = config["recipient"] or "admin@example.com"
            iam_checker.create_announcement(recipient)
        elif action == "print":
            logger.info("Printing announcement for compliance violations...")
            recipient = config["recipient"] or "admin@example.com"
            iam_checker.print_announcement(recipient)
        elif action == "generate":
            logger.info("Generating new compliance based on current IAM policy...")
            iam_checker.generate_compliance()
        else:
            logger.error(f"Unknown action: {action}")
            return 1
    except Exception as e:
        logger.error(f"Error executing action '{action}': {e}")
        return 1

    return 0

if __name__ == "__main__":
    
    sys.exit(main())
