in providers/google/src/airflow/providers/google/cloud/operators/gcs.py
def execute(self, context: Context) -> list[str]:
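# Overall flow: resolve the run's data interval, interpolate the source and
# destination prefixes, list matching source objects, download them, run the
# transform script, then upload its output and return the uploaded object names.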
# Define intervals and prefixes.
orig_start = context["data_interval_start"]
orig_end = context["data_interval_end"]
if orig_start is None or orig_end is None:
raise RuntimeError("`data_interval_start` & `data_interval_end` must not be None")
if not isinstance(orig_start, pendulum.DateTime):
orig_start = pendulum.instance(orig_start)
if not isinstance(orig_end, pendulum.DateTime):
orig_end = pendulum.instance(orig_end)
timespan_start = orig_start
if orig_start >= orig_end:  # Airflow 2.2 sets start == end for non-periodic schedules.
self.log.warning("DAG schedule not periodic, replacing timespan end %s with datetime.max", orig_end)
timespan_end = pendulum.instance(datetime.datetime.max)
else:
timespan_end = orig_end
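# Work in UTC so prefix interpolation and the timespan listing are unambiguous.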
timespan_start = timespan_start.in_timezone(timezone.utc)
timespan_end = timespan_end.in_timezone(timezone.utc)
self._source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
self.source_prefix,
timespan_start,
)
self._destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
self.destination_prefix,
timespan_start,
)
source_hook = GCSHook(
gcp_conn_id=self.source_gcp_conn_id,
impersonation_chain=self.source_impersonation_chain,
)
destination_hook = GCSHook(
gcp_conn_id=self.destination_gcp_conn_id,
impersonation_chain=self.destination_impersonation_chain,
)
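# Persist the destination bucket URI so the task exposes a storage link in the UI.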
StorageLink.persist(
context=context,
task_instance=self,
uri=self.destination_bucket,
project_id=destination_hook.project_id,
)
# Fetch list of files.
blobs_to_transform = source_hook.list_by_timespan(
bucket_name=self.source_bucket,
prefix=self._source_prefix_interp,
timespan_start=timespan_start,
timespan_end=timespan_end,
)
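# Stage downloads and transform output in temporary directories that are removed
# automatically when the block exits.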
with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as temp_output_dir:
temp_input_dir_path = Path(temp_input_dir)
temp_output_dir_path = Path(temp_output_dir)
# TODO: download in parallel.
for blob_to_transform in blobs_to_transform:
destination_file = temp_input_dir_path / blob_to_transform
destination_file.parent.mkdir(parents=True, exist_ok=True)
try:
source_hook.download(
bucket_name=self.source_bucket,
object_name=blob_to_transform,
filename=str(destination_file),
chunk_size=self.chunk_size,
num_max_attempts=self.download_num_attempts,
)
except GoogleCloudError:
if not self.download_continue_on_fail:
raise
self.log.info("Starting the transformation")
cmd = [self.transform_script] if isinstance(self.transform_script, str) else self.transform_script
cmd += [
str(temp_input_dir_path),
str(temp_output_dir_path),
timespan_start.replace(microsecond=0).isoformat(),
timespan_end.replace(microsecond=0).isoformat(),
]
with subprocess.Popen(
args=cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True
) as process:
self.log.info("Process output:")
if process.stdout:
for line in iter(process.stdout.readline, b""):
self.log.info(line.decode(self.output_encoding).rstrip())
process.wait()
if process.returncode:
raise AirflowException(f"Transform script failed: {process.returncode}")
self.log.info("Transformation succeeded. Output temporarily located at %s", temp_output_dir_path)
files_uploaded = []
# TODO: upload in parallel.
for upload_file in temp_output_dir_path.glob("**/*"):
if upload_file.is_dir():
continue
upload_file_name = str(upload_file.relative_to(temp_output_dir_path))
if self._destination_prefix_interp is not None:
upload_file_name = f"{self._destination_prefix_interp.rstrip('/')}/{upload_file_name}"
self.log.info("Uploading file %s to %s", upload_file, upload_file_name)
try:
destination_hook.upload(
bucket_name=self.destination_bucket,
object_name=upload_file_name,
filename=str(upload_file),
chunk_size=self.chunk_size,
num_max_attempts=self.upload_num_attempts,
)
files_uploaded.append(str(upload_file_name))
except GoogleCloudError:
if not self.upload_continue_on_fail:
raise
return files_uploaded
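# --- Illustrative sketches (commented out; not part of the operator) ----------
# A minimal transform script matching the argv contract built above: it is passed
# the input directory, the output directory, and the timespan start/end as
# ISO-8601 strings. The uppercasing step and file layout are assumptions made
# purely for illustration.
#
#   import sys
#   from pathlib import Path
#
#   input_dir, output_dir, timespan_start, timespan_end = sys.argv[1:5]
#   for src in Path(input_dir).glob("**/*"):
#       if src.is_dir():
#           continue
#       dst = Path(output_dir) / src.relative_to(input_dir)
#       dst.parent.mkdir(parents=True, exist_ok=True)
#       dst.write_text(src.read_text().upper())
#
# And a hedged sketch of instantiating the operator in a DAG. The parameter names
# mirror the attributes used in execute() (source_bucket, source_prefix,
# source_gcp_conn_id, destination_bucket, destination_prefix,
# destination_gcp_conn_id, transform_script); the bucket names, prefixes, and the
# "/opt/scripts/transform.py" path are hypothetical.
#
#   transform = GCSTimeSpanFileTransformOperator(
#       task_id="transform_window",
#       source_bucket="example-src-bucket",
#       source_prefix="incoming/%Y/%m/%d/",
#       source_gcp_conn_id="google_cloud_default",
#       destination_bucket="example-dst-bucket",
#       destination_prefix="processed/%Y/%m/%d/",
#       destination_gcp_conn_id="google_cloud_default",
#       transform_script=["python", "/opt/scripts/transform.py"],
#   )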