in providers/sftp/src/airflow/providers/sftp/operators/sftp.py [0:0]
def execute(self, context: Any) -> str | list[str] | None:
    """
    Run the configured SFTP operation (get, put or delete) for every configured path.

    ``local_filepath`` and ``remote_filepath`` may each be a single string or a
    list; both are normalised to lists and, for get/put, processed pairwise.

    :param context: Task execution context (not read by this method).
    :return: ``self.local_filepath`` exactly as it was configured.
    :raises ValueError: If get/put is given a different number of local and
        remote paths, if ``local_filepath`` is set for a delete operation, or
        if ``concurrency`` is lower than 1.
    :raises TypeError: If ``operation`` is not one of the supported values.
    :raises AirflowException: If anything fails once hook setup / transfer has
        started; the original exception is attached as ``__cause__``.
    """
    # Normalise both path arguments to lists so the rest of the method can
    # treat the single-path and multi-path cases uniformly.
    if self.local_filepath is None:
        local_filepath_array = []
    elif isinstance(self.local_filepath, str):
        local_filepath_array = [self.local_filepath]
    else:
        local_filepath_array = self.local_filepath

    if isinstance(self.remote_filepath, str):
        remote_filepath_array = [self.remote_filepath]
    else:
        remote_filepath_array = self.remote_filepath

    # The operation is compared case-insensitively; compute the lowered form once.
    operation = self.operation.lower()
    is_transfer = operation in (SFTPOperation.GET, SFTPOperation.PUT)

    # Validation happens before the try block on purpose: these errors are
    # raised with their own types and are not wrapped in AirflowException.
    if is_transfer and len(local_filepath_array) != len(remote_filepath_array):
        raise ValueError(
            f"{len(local_filepath_array)} paths in local_filepath "
            f"!= {len(remote_filepath_array)} paths in remote_filepath"
        )

    if operation == SFTPOperation.DELETE and local_filepath_array:
        raise ValueError("local_filepath should not be provided for delete operation")

    if operation not in (SFTPOperation.GET, SFTPOperation.PUT, SFTPOperation.DELETE):
        raise TypeError(
            f"Unsupported operation value {self.operation}, "
            f"expected {SFTPOperation.GET} or {SFTPOperation.PUT} or {SFTPOperation.DELETE}."
        )

    if self.concurrency < 1:
        raise ValueError(f"concurrency should be greater than 0, got {self.concurrency}")

    # Description of the path (pair) currently being processed; included in the
    # error message if something fails. Stays None if the failure happens
    # before the first path is reached.
    file_msg = None
    try:
        if self.ssh_conn_id:
            if self.sftp_hook and isinstance(self.sftp_hook, SFTPHook):
                self.log.info("ssh_conn_id is ignored when sftp_hook is provided.")
            else:
                self.log.info("sftp_hook not provided or invalid. Trying ssh_conn_id to create SFTPHook.")
                self.sftp_hook = SFTPHook(ssh_conn_id=self.ssh_conn_id)

        if not self.sftp_hook:
            raise AirflowException("Cannot operate without sftp_hook or ssh_conn_id.")

        if self.remote_host is not None:
            self.log.info(
                "remote_host is provided explicitly. "
                "It will replace the remote_host which was defined "
                "in sftp_hook or predefined in connection of ssh_conn_id."
            )
            self.sftp_hook.remote_host = self.remote_host

        if is_transfer:
            for _local_filepath, _remote_filepath in zip(local_filepath_array, remote_filepath_array):
                if operation == SFTPOperation.GET:
                    local_folder = os.path.dirname(_local_filepath)
                    if self.create_intermediate_dirs:
                        Path(local_folder).mkdir(parents=True, exist_ok=True)
                    file_msg = f"from {_remote_filepath} to {_local_filepath}"
                    self.log.info("Starting to transfer %s", file_msg)
                    # Directories are fetched as a whole, optionally with several workers.
                    if self.sftp_hook.isdir(_remote_filepath):
                        if self.concurrency > 1:
                            self.sftp_hook.retrieve_directory_concurrently(
                                _remote_filepath,
                                _local_filepath,
                                workers=self.concurrency,
                            )
                        elif self.concurrency == 1:
                            self.sftp_hook.retrieve_directory(_remote_filepath, _local_filepath)
                    else:
                        self.sftp_hook.retrieve_file(_remote_filepath, _local_filepath)
                elif operation == SFTPOperation.PUT:
                    remote_folder = os.path.dirname(_remote_filepath)
                    if self.create_intermediate_dirs:
                        self.sftp_hook.create_directory(remote_folder)
                    file_msg = f"from {_local_filepath} to {_remote_filepath}"
                    self.log.info("Starting to transfer file %s", file_msg)
                    # Directories are uploaded as a whole, optionally with several workers.
                    if os.path.isdir(_local_filepath):
                        if self.concurrency > 1:
                            self.sftp_hook.store_directory_concurrently(
                                _remote_filepath,
                                _local_filepath,
                                confirm=self.confirm,
                                workers=self.concurrency,
                            )
                        elif self.concurrency == 1:
                            self.sftp_hook.store_directory(
                                _remote_filepath, _local_filepath, confirm=self.confirm
                            )
                    else:
                        self.sftp_hook.store_file(_remote_filepath, _local_filepath, confirm=self.confirm)
        elif operation == SFTPOperation.DELETE:
            for _remote_filepath in remote_filepath_array:
                file_msg = f"{_remote_filepath}"
                self.log.info("Starting to delete %s", file_msg)
                if self.sftp_hook.isdir(_remote_filepath):
                    self.sftp_hook.delete_directory(_remote_filepath, include_files=True)
                else:
                    self.sftp_hook.delete_file(_remote_filepath)
    except Exception as e:
        # Chain explicitly so the original traceback is reported as the direct
        # cause rather than as an error "during handling" of another one.
        raise AirflowException(
            f"Error while processing {self.operation.upper()} operation {file_msg}, error: {e}"
        ) from e

    return self.local_filepath