def execute()

in providers/sftp/src/airflow/providers/sftp/operators/sftp.py [0:0]


    def execute(self, context: Any) -> str | list[str] | None:
        """
        Run the configured SFTP operation (get, put or delete) for every configured path.

        ``local_filepath`` and ``remote_filepath`` may each be a single string or a list of
        strings; both are normalized to lists first. For get/put the two lists are paired
        positionally and therefore must have the same length. A path that is a directory
        (checked with ``sftp_hook.isdir`` on the remote side for get/delete and with
        ``os.path.isdir`` on the local side for put) is handled with the hook's directory
        methods, using the ``*_concurrently`` variants when ``concurrency`` is greater than 1.

        :param context: Task execution context; not used by this method.
        :return: ``self.local_filepath`` exactly as it was configured.
        :raises ValueError: If get/put is requested with a different number of local and
            remote paths, if ``local_filepath`` is given for a delete operation, or if
            ``concurrency`` is lower than 1.
        :raises TypeError: If ``operation`` is not one of the supported operations.
        :raises AirflowException: If neither ``sftp_hook`` nor ``ssh_conn_id`` is usable, or
            if anything fails while preparing the hook or processing a path. The original
            error is attached as ``__cause__``.
        """
        # Normalize both path arguments to lists so the rest of the method handles one shape.
        if self.local_filepath is None:
            local_filepath_array = []
        elif isinstance(self.local_filepath, str):
            local_filepath_array = [self.local_filepath]
        else:
            local_filepath_array = self.local_filepath

        if isinstance(self.remote_filepath, str):
            remote_filepath_array = [self.remote_filepath]
        else:
            remote_filepath_array = self.remote_filepath

        # The operation is compared case-insensitively; lower-case it once instead of on
        # every comparison (including once per transferred path).
        operation = self.operation.lower()
        is_transfer = operation in (SFTPOperation.GET, SFTPOperation.PUT)

        if is_transfer and len(local_filepath_array) != len(remote_filepath_array):
            raise ValueError(
                f"{len(local_filepath_array)} paths in local_filepath "
                f"!= {len(remote_filepath_array)} paths in remote_filepath"
            )

        if operation == SFTPOperation.DELETE and local_filepath_array:
            raise ValueError("local_filepath should not be provided for delete operation")

        if not is_transfer and operation != SFTPOperation.DELETE:
            raise TypeError(
                f"Unsupported operation value {self.operation}, "
                f"expected {SFTPOperation.GET} or {SFTPOperation.PUT} or {SFTPOperation.DELETE}."
            )

        if self.concurrency < 1:
            raise ValueError(f"concurrency should be greater than 0, got {self.concurrency}")

        # Describes the path currently being processed; stays None until the first path is
        # reached so the wrapped error message shows how far execution got.
        file_msg = None
        try:
            if self.ssh_conn_id:
                if self.sftp_hook and isinstance(self.sftp_hook, SFTPHook):
                    self.log.info("ssh_conn_id is ignored when sftp_hook is provided.")
                else:
                    self.log.info("sftp_hook not provided or invalid. Trying ssh_conn_id to create SFTPHook.")
                    self.sftp_hook = SFTPHook(ssh_conn_id=self.ssh_conn_id)

            if not self.sftp_hook:
                raise AirflowException("Cannot operate without sftp_hook or ssh_conn_id.")

            if self.remote_host is not None:
                self.log.info(
                    "remote_host is provided explicitly. "
                    "It will replace the remote_host which was defined "
                    "in sftp_hook or predefined in connection of ssh_conn_id."
                )
                self.sftp_hook.remote_host = self.remote_host

            if operation == SFTPOperation.GET:
                for _local_filepath, _remote_filepath in zip(local_filepath_array, remote_filepath_array):
                    if self.create_intermediate_dirs:
                        local_folder = os.path.dirname(_local_filepath)
                        Path(local_folder).mkdir(parents=True, exist_ok=True)
                    file_msg = f"from {_remote_filepath} to {_local_filepath}"
                    self.log.info("Starting to transfer %s", file_msg)
                    if not self.sftp_hook.isdir(_remote_filepath):
                        self.sftp_hook.retrieve_file(_remote_filepath, _local_filepath)
                    elif self.concurrency > 1:
                        self.sftp_hook.retrieve_directory_concurrently(
                            _remote_filepath,
                            _local_filepath,
                            workers=self.concurrency,
                        )
                    else:
                        # concurrency < 1 was rejected above, so this is the single-worker case.
                        self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
            elif operation == SFTPOperation.PUT:
                for _local_filepath, _remote_filepath in zip(local_filepath_array, remote_filepath_array):
                    if self.create_intermediate_dirs:
                        remote_folder = os.path.dirname(_remote_filepath)
                        self.sftp_hook.create_directory(remote_folder)
                    file_msg = f"from {_local_filepath} to {_remote_filepath}"
                    self.log.info("Starting to transfer file %s", file_msg)
                    if not os.path.isdir(_local_filepath):
                        self.sftp_hook.store_file(_remote_filepath, _local_filepath, confirm=self.confirm)
                    elif self.concurrency > 1:
                        self.sftp_hook.store_directory_concurrently(
                            _remote_filepath,
                            _local_filepath,
                            confirm=self.confirm,
                            workers=self.concurrency,
                        )
                    else:
                        # concurrency < 1 was rejected above, so this is the single-worker case.
                        self.sftp_hook.store_directory(
                            _remote_filepath, _local_filepath, confirm=self.confirm
                        )
            else:
                # Only DELETE can reach this branch: unsupported operations raised TypeError above.
                for _remote_filepath in remote_filepath_array:
                    file_msg = f"{_remote_filepath}"
                    self.log.info("Starting to delete %s", file_msg)
                    if self.sftp_hook.isdir(_remote_filepath):
                        self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
                    else:
                        self.sftp_hook.delete_file(_remote_filepath)

        except Exception as e:
            # Chain explicitly so the traceback marks the original error as the direct cause.
            raise AirflowException(
                f"Error while processing {self.operation.upper()} operation {file_msg}, error: {e}"
            ) from e

        return self.local_filepath