src/lambda_codebase/initial_commit/bootstrap_repository/adf-bootstrap/deployment/lambda_codebase/initial_commit/initial_commit.py [44:203]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    TargetRegions: Optional[List[str]] = None
    NotificationEndpoint: Optional[str] = None
    NotificationEndpointType: Optional[str] = None
    ProtectedOUs: Optional[List[str]] = None

    def __post_init__(self):
        """Derive ``NotificationEndpointType`` from ``NotificationEndpoint``.

        An endpoint containing "@" is classified as "email"; any other
        non-empty endpoint is classified as "slack". When the endpoint is
        unset or empty, ``NotificationEndpointType`` is left untouched.
        """
        endpoint = self.NotificationEndpoint
        if not endpoint:
            return
        if "@" in endpoint:  # pylint: disable=unsupported-membership-test
            self.NotificationEndpointType = "email"
        else:
            self.NotificationEndpointType = "slack"


@dataclass
class Event:
    """Typed view of the event mapping handed to the resource handlers.

    Instances are built by unpacking the raw event mapping into keyword
    arguments (``create_`` does ``CreateEvent(**event)``), so every field
    name must match a key of that mapping. The field names look like those
    of a CloudFormation custom resource request.
    """
    RequestType: str
    ServiceToken: str
    ResponseURL: str
    StackId: str
    RequestId: str
    ResourceType: str
    LogicalResourceId: str
    ResourceProperties: CustomResourceProperties

    def __post_init__(self):
        """Replace the raw ``ResourceProperties`` mapping with a typed object."""
        # The constructor receives a plain mapping here; unpack it into the
        # properties dataclass so attribute access works downstream.
        raw_properties = self.ResourceProperties
        self.ResourceProperties = CustomResourceProperties(
            **raw_properties  # pylint: disable=not-a-mapping
        )


class FileMode(Enum):
    """File modes that can be attached to a ``FileToCommit``.

    Each member's value is the string that ``FileToCommit.as_dict`` emits
    under the ``"fileMode"`` key.
    """
    EXECUTABLE = "EXECUTABLE"
    NORMAL = "NORMAL"
    SYMLINK = "SYMLINK"


@dataclass
class FileToCommit:
    """A file (path, mode and raw content) to be put into a commit.

    ``create_`` passes the ``as_dict`` form of these objects as the ``puts``
    of ``generate_commit_input``.
    """
    filePath: str
    fileMode: FileMode
    fileContent: bytes

    def as_dict(self) -> Dict[str, Union[str, bytes]]:
        """Return the entry as a plain dict, with the mode as its string value."""
        return dict(
            filePath=self.filePath,
            fileMode=self.fileMode.value,
            fileContent=self.fileContent,
        )

@dataclass
class FileToDelete:
    """A file to delete, identified only by its path.

    NOTE(review): presumably the ``as_dict`` form feeds the ``deletes`` of
    ``generate_commit_input`` -- confirm against the callers.
    """
    filePath: str

    def as_dict(self) -> Dict[str, Union[str, bytes]]:
        """Return the entry as a plain dict holding only ``filePath``."""
        return dict(filePath=self.filePath)

@dataclass
class CreateEvent(Event):
    """Event built by the create handler; adds no fields beyond ``Event``."""

@dataclass
class UpdateEvent(Event):
    """Event variant that also carries the previous state of the resource.

    Adds ``PhysicalResourceId`` and ``OldResourceProperties`` to the fields
    inherited from ``Event``.
    """
    PhysicalResourceId: str
    OldResourceProperties: CustomResourceProperties

    def __post_init__(self):
        """Convert both raw property mappings into ``CustomResourceProperties``."""
        # ``Event.__post_init__`` converts ``ResourceProperties``; only the
        # previous properties need handling here.
        super().__post_init__()
        previous_raw = self.OldResourceProperties
        self.OldResourceProperties = CustomResourceProperties(
            **previous_raw  # pylint: disable=not-a-mapping
        )

def generate_create_branch_input(event, repo_name, commit_id):
    """Build the keyword arguments for creating the version branch.

    The branch is named after ``event.ResourceProperties.Version`` and is
    created in ``repo_name`` pointing at ``commit_id``.
    """
    branch_name = event.ResourceProperties.Version
    return dict(
        repositoryName=repo_name,
        branchName=branch_name,
        commitId=commit_id,
    )

def generate_delete_branch_input(event, repo_name):
    """Build the keyword arguments for deleting the version branch.

    The branch name is taken from ``event.ResourceProperties.Version``.
    """
    branch_name = event.ResourceProperties.Version
    return dict(
        repositoryName=repo_name,
        branchName=branch_name,
    )

def chunks(list_to_chunk, number_to_chunk_into):
    """Lazily split ``list_to_chunk`` into consecutive slices.

    Despite its name, ``number_to_chunk_into`` is the maximum length of each
    slice, not the number of slices. Values below 1 are treated as 1.

    Returns a generator yielding the slices in order; the final slice may be
    shorter than the others.
    """
    chunk_size = max(1, number_to_chunk_into)
    # The range is computed eagerly so an unsized argument fails at call
    # time rather than on first iteration.
    starts = range(0, len(list_to_chunk), chunk_size)
    return (list_to_chunk[start:start + chunk_size] for start in starts)

def generate_pull_request_input(event, repo_name):
    """Build the keyword arguments for the automated update pull request.

    The pull request proposes merging the branch named after
    ``event.ResourceProperties.Version`` into ``master`` of ``repo_name``.
    The description is ``PR_DESCRIPTION`` formatted with that version.
    """
    version = event.ResourceProperties.Version
    target = {
        'repositoryName': repo_name,
        'sourceReference': version,
        'destinationReference': 'master',
    }
    return {
        "title": f'ADF {version} Automated Update PR',
        "description": PR_DESCRIPTION.format(version),
        "targets": [target],
    }

def generate_commit_input(repo_name, index, branch="master", parent_commit_id=None, puts=None, deletes=None):
    """Build the keyword arguments for one automated commit.

    Args:
        repo_name: Repository the commit is made to.
        index: Part number embedded in the commit message.
        branch: Branch the commit is made on.
        parent_commit_id: Included as ``parentCommitId`` only when truthy.
        puts: File entries for ``putFiles``; a falsy value becomes ``[]``.
        deletes: File entries for ``deleteFiles``; a falsy value becomes ``[]``.

    Returns:
        A dict of commit arguments. The commit message says "Delete" when
        ``deletes`` is non-empty and "Create" otherwise.
    """
    if deletes:
        commit_action = "Delete"
    else:
        commit_action = "Create"
    commit_input = dict(
        repositoryName=repo_name,
        branchName=branch,
        authorName="AWS ADF Builders Team",
        email="adf-builders@amazon.com",
        commitMessage=f"Automated Commit - {commit_action} Part {index}",
        putFiles=puts or [],
        deleteFiles=deletes or [],
    )
    # The parent commit is omitted entirely when the caller gives none.
    if parent_commit_id:
        commit_input["parentCommitId"] = parent_commit_id
    return commit_input

@create()
def create_(event: Mapping[str, Any], _context: Any) -> Tuple[Union[None, PhysicalResourceId], Data]:
    create_event = CreateEvent(**event)
    repo_name = repo_arn_to_name(create_event.ResourceProperties.RepositoryArn)
    directory = create_event.ResourceProperties.DirectoryName
    try:
        commit_id = CC_CLIENT.get_branch(
            repositoryName=repo_name,
            branchName="master",
        )["branch"]["commitId"]
        CC_CLIENT.create_branch(
            repositoryName=repo_name,
            branchName=create_event.ResourceProperties.Version,
            commitId=commit_id
        )
        # CodeCommit only allows 100 files per commit, so we chunk them up here
        for index, files in enumerate(chunks([f.as_dict() for f in get_files_to_commit(directory)], 99)):
            if index == 0:
                commit_id = CC_CLIENT.create_commit(
                    **generate_commit_input(repo_name, index, puts=files)
                )["commitId"]
            else:
                commit_id = CC_CLIENT.create_commit(
                    **generate_commit_input(repo_name, index, puts=files, parent_commit_id=commit_id)
                )["commitId"]

        CC_CLIENT.create_pull_request(
            **generate_pull_request_input(create_event, repo_name)
        )
        return event.get("PhysicalResourceId"), {}

    except (CC_CLIENT.exceptions.FileEntryRequiredException, CC_CLIENT.exceptions.NoChangeException):
        CC_CLIENT.delete_branch(**generate_delete_branch_input(create_event, repo_name))
        return event.get("PhysicalResourceId"), {}

    except CC_CLIENT.exceptions.BranchDoesNotExistException:
        files_to_commit = get_files_to_commit(directory)
        if directory == "bootstrap_repository":
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



src/lambda_codebase/initial_commit/initial_commit.py [48:207]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    TargetRegions: Optional[List[str]] = None
    NotificationEndpoint: Optional[str] = None
    NotificationEndpointType: Optional[str] = None
    ProtectedOUs: Optional[List[str]] = None

    def __post_init__(self):
        """Derive ``NotificationEndpointType`` from ``NotificationEndpoint``.

        An endpoint containing "@" is classified as "email"; any other
        non-empty endpoint is classified as "slack". When the endpoint is
        unset or empty, ``NotificationEndpointType`` is left untouched.
        """
        endpoint = self.NotificationEndpoint
        if not endpoint:
            return
        if "@" in endpoint:  # pylint: disable=unsupported-membership-test
            self.NotificationEndpointType = "email"
        else:
            self.NotificationEndpointType = "slack"


@dataclass
class Event:
    """Typed view of the event mapping handed to the resource handlers.

    Instances are built by unpacking the raw event mapping into keyword
    arguments (``create_`` does ``CreateEvent(**event)``), so every field
    name must match a key of that mapping. The field names look like those
    of a CloudFormation custom resource request.
    """
    RequestType: str
    ServiceToken: str
    ResponseURL: str
    StackId: str
    RequestId: str
    ResourceType: str
    LogicalResourceId: str
    ResourceProperties: CustomResourceProperties

    def __post_init__(self):
        """Replace the raw ``ResourceProperties`` mapping with a typed object."""
        # The constructor receives a plain mapping here; unpack it into the
        # properties dataclass so attribute access works downstream.
        raw_properties = self.ResourceProperties
        self.ResourceProperties = CustomResourceProperties(
            **raw_properties  # pylint: disable=not-a-mapping
        )


class FileMode(Enum):
    """File modes that can be attached to a ``FileToCommit``.

    Each member's value is the string that ``FileToCommit.as_dict`` emits
    under the ``"fileMode"`` key.
    """
    EXECUTABLE = "EXECUTABLE"
    NORMAL = "NORMAL"
    SYMLINK = "SYMLINK"


@dataclass
class FileToCommit:
    """A file (path, mode and raw content) to be put into a commit.

    ``create_`` passes the ``as_dict`` form of these objects as the ``puts``
    of ``generate_commit_input``.
    """
    filePath: str
    fileMode: FileMode
    fileContent: bytes

    def as_dict(self) -> Dict[str, Union[str, bytes]]:
        """Return the entry as a plain dict, with the mode as its string value."""
        return dict(
            filePath=self.filePath,
            fileMode=self.fileMode.value,
            fileContent=self.fileContent,
        )

@dataclass
class FileToDelete:
    """A file to delete, identified only by its path.

    NOTE(review): presumably the ``as_dict`` form feeds the ``deletes`` of
    ``generate_commit_input`` -- confirm against the callers.
    """
    filePath: str

    def as_dict(self) -> Dict[str, Union[str, bytes]]:
        """Return the entry as a plain dict holding only ``filePath``."""
        return dict(filePath=self.filePath)

@dataclass
class CreateEvent(Event):
    """Event built by the create handler; adds no fields beyond ``Event``."""

@dataclass
class UpdateEvent(Event):
    """Event variant that also carries the previous state of the resource.

    Adds ``PhysicalResourceId`` and ``OldResourceProperties`` to the fields
    inherited from ``Event``.
    """
    PhysicalResourceId: str
    OldResourceProperties: CustomResourceProperties

    def __post_init__(self):
        """Convert both raw property mappings into ``CustomResourceProperties``."""
        # ``Event.__post_init__`` converts ``ResourceProperties``; only the
        # previous properties need handling here.
        super().__post_init__()
        previous_raw = self.OldResourceProperties
        self.OldResourceProperties = CustomResourceProperties(
            **previous_raw  # pylint: disable=not-a-mapping
        )

def generate_create_branch_input(event, repo_name, commit_id):
    """Build the keyword arguments for creating the version branch.

    The branch is named after ``event.ResourceProperties.Version`` and is
    created in ``repo_name`` pointing at ``commit_id``.
    """
    branch_name = event.ResourceProperties.Version
    return dict(
        repositoryName=repo_name,
        branchName=branch_name,
        commitId=commit_id,
    )

def generate_delete_branch_input(event, repo_name):
    """Build the keyword arguments for deleting the version branch.

    The branch name is taken from ``event.ResourceProperties.Version``.
    """
    branch_name = event.ResourceProperties.Version
    return dict(
        repositoryName=repo_name,
        branchName=branch_name,
    )

def chunks(list_to_chunk, number_to_chunk_into):
    """Lazily split ``list_to_chunk`` into consecutive slices.

    Despite its name, ``number_to_chunk_into`` is the maximum length of each
    slice, not the number of slices. Values below 1 are treated as 1.

    Returns a generator yielding the slices in order; the final slice may be
    shorter than the others.
    """
    chunk_size = max(1, number_to_chunk_into)
    # The range is computed eagerly so an unsized argument fails at call
    # time rather than on first iteration.
    starts = range(0, len(list_to_chunk), chunk_size)
    return (list_to_chunk[start:start + chunk_size] for start in starts)

def generate_pull_request_input(event, repo_name):
    """Build the keyword arguments for the automated update pull request.

    The pull request proposes merging the branch named after
    ``event.ResourceProperties.Version`` into ``master`` of ``repo_name``.
    The description is ``PR_DESCRIPTION`` formatted with that version.
    """
    version = event.ResourceProperties.Version
    target = {
        'repositoryName': repo_name,
        'sourceReference': version,
        'destinationReference': 'master',
    }
    return {
        "title": f'ADF {version} Automated Update PR',
        "description": PR_DESCRIPTION.format(version),
        "targets": [target],
    }

def generate_commit_input(repo_name, index, branch="master", parent_commit_id=None, puts=None, deletes=None):
    """Build the keyword arguments for one automated commit.

    Args:
        repo_name: Repository the commit is made to.
        index: Part number embedded in the commit message.
        branch: Branch the commit is made on.
        parent_commit_id: Included as ``parentCommitId`` only when truthy.
        puts: File entries for ``putFiles``; a falsy value becomes ``[]``.
        deletes: File entries for ``deleteFiles``; a falsy value becomes ``[]``.

    Returns:
        A dict of commit arguments. The commit message says "Delete" when
        ``deletes`` is non-empty and "Create" otherwise.
    """
    if deletes:
        commit_action = "Delete"
    else:
        commit_action = "Create"
    commit_input = dict(
        repositoryName=repo_name,
        branchName=branch,
        authorName="AWS ADF Builders Team",
        email="adf-builders@amazon.com",
        commitMessage=f"Automated Commit - {commit_action} Part {index}",
        putFiles=puts or [],
        deleteFiles=deletes or [],
    )
    # The parent commit is omitted entirely when the caller gives none.
    if parent_commit_id:
        commit_input["parentCommitId"] = parent_commit_id
    return commit_input

@create()
def create_(event: Mapping[str, Any], _context: Any) -> Tuple[Union[None, PhysicalResourceId], Data]:
    create_event = CreateEvent(**event)
    repo_name = repo_arn_to_name(create_event.ResourceProperties.RepositoryArn)
    directory = create_event.ResourceProperties.DirectoryName
    try:
        commit_id = CC_CLIENT.get_branch(
            repositoryName=repo_name,
            branchName="master",
        )["branch"]["commitId"]
        CC_CLIENT.create_branch(
            repositoryName=repo_name,
            branchName=create_event.ResourceProperties.Version,
            commitId=commit_id
        )
        # CodeCommit only allows 100 files per commit, so we chunk them up here
        for index, files in enumerate(chunks([f.as_dict() for f in get_files_to_commit(directory)], 99)):
            if index == 0:
                commit_id = CC_CLIENT.create_commit(
                    **generate_commit_input(repo_name, index, puts=files)
                )["commitId"]
            else:
                commit_id = CC_CLIENT.create_commit(
                    **generate_commit_input(repo_name, index, puts=files, parent_commit_id=commit_id)
                )["commitId"]

        CC_CLIENT.create_pull_request(
            **generate_pull_request_input(create_event, repo_name)
        )
        return event.get("PhysicalResourceId"), {}

    except (CC_CLIENT.exceptions.FileEntryRequiredException, CC_CLIENT.exceptions.NoChangeException):
        CC_CLIENT.delete_branch(**generate_delete_branch_input(create_event, repo_name))
        return event.get("PhysicalResourceId"), {}

    except CC_CLIENT.exceptions.BranchDoesNotExistException:
        files_to_commit = get_files_to_commit(directory)
        if directory == "bootstrap_repository":
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



