def sync()

in samcli/lib/sync/flows/zip_function_sync_flow.py [0:0]


    def sync(self) -> None:
        if not self._zip_file:
            LOG.debug("%sSkipping Sync. ZIP file is None.", self.log_prefix)
            return

        zip_file_size = os.path.getsize(self._zip_file)
        if zip_file_size < MAXIMUM_FUNCTION_ZIP_SIZE:
            # Direct upload through Lambda API
            LOG.debug("%sUploading Function Directly", self.log_prefix)
            with open(self._zip_file, "rb") as zip_file:
                data = zip_file.read()

                with ExitStack() as exit_stack:
                    if self.has_locks():
                        exit_stack.enter_context(self._get_lock_chain())

                    self._lambda_client.update_function_code(
                        FunctionName=self.get_physical_id(self._function_identifier), ZipFile=data
                    )

                    # We need to wait for the cloud side update to finish
                    # Otherwise even if the call is finished and lockchain is released
                    # It is still possible that we have a race condition on cloud updating the same function
                    wait_for_function_update_complete(
                        self._lambda_client, self.get_physical_id(self._function_identifier)
                    )

        else:
            # Upload to S3 first for oversized ZIPs
            LOG.debug("%sUploading Function Through S3", self.log_prefix)
            uploader = S3Uploader(
                s3_client=self._s3_client,
                bucket_name=self._deploy_context.s3_bucket,
                prefix=self._deploy_context.s3_prefix,
                kms_key_id=self._deploy_context.kms_key_id,
                force_upload=True,
                no_progressbar=True,
            )
            s3_url = uploader.upload_with_dedup(self._zip_file)
            s3_key = s3_url[5:].split("/", 1)[1]

            with ExitStack() as exit_stack:
                if self.has_locks():
                    exit_stack.enter_context(self._get_lock_chain())

                self._lambda_client.update_function_code(
                    FunctionName=self.get_physical_id(self._function_identifier),
                    S3Bucket=self._deploy_context.s3_bucket,
                    S3Key=s3_key,
                )

                # We need to wait for the cloud side update to finish
                # Otherwise even if the call is finished and lockchain is released
                # It is still possible that we have a race condition on cloud updating the same function
                wait_for_function_update_complete(self._lambda_client, self.get_physical_id(self._function_identifier))

        if os.path.exists(self._zip_file):
            os.remove(self._zip_file)