def execute()

in providers/google/src/airflow/providers/google/cloud/operators/gcs.py

GCSTimeSpanFileTransformOperator.execute determines the task's data interval, downloads the GCS objects updated within it, runs a user-supplied transform script over them, and uploads the results to the destination bucket.

    def execute(self, context: Context) -> list[str]:
        # Define intervals and prefixes.
        orig_start = context["data_interval_start"]
        orig_end = context["data_interval_end"]

        if orig_start is None or orig_end is None:
            raise RuntimeError("`data_interval_start` and `data_interval_end` must not be None")

        if not isinstance(orig_start, pendulum.DateTime):
            orig_start = pendulum.instance(orig_start)

        if not isinstance(orig_end, pendulum.DateTime):
            orig_end = pendulum.instance(orig_end)

        timespan_start = orig_start
        if orig_start >= orig_end:  # Airflow 2.2 sets start == end for non-periodic schedules.
            self.log.warning("DAG schedule not periodic, setting timespan end to max (interval end was %s)", orig_end)
            timespan_end = pendulum.instance(datetime.datetime.max)
        else:
            timespan_end = orig_end

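        # Normalize both bounds to UTC so the timespan query against GCS and
        # the ISO timestamps passed to the transform script are consistent.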
        timespan_start = timespan_start.in_timezone(timezone.utc)
        timespan_end = timespan_end.in_timezone(timezone.utc)

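        # Interpolate date/time placeholders in the optional source and
        # destination prefixes using the timespan start.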
        self._source_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
            self.source_prefix,
            timespan_start,
        )
        self._destination_prefix_interp = GCSTimeSpanFileTransformOperator.interpolate_prefix(
            self.destination_prefix,
            timespan_start,
        )

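        # Separate hooks so the source and destination buckets can use
        # different connections and impersonation chains.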
        source_hook = GCSHook(
            gcp_conn_id=self.source_gcp_conn_id,
            impersonation_chain=self.source_impersonation_chain,
        )
        destination_hook = GCSHook(
            gcp_conn_id=self.destination_gcp_conn_id,
            impersonation_chain=self.destination_impersonation_chain,
        )
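        # Persist the destination bucket link so it is reachable from the
        # task instance in the Airflow UI.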
        StorageLink.persist(
            context=context,
            task_instance=self,
            uri=self.destination_bucket,
            project_id=destination_hook.project_id,
        )

        # Fetch list of files.
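        # list_by_timespan returns the objects under the prefix whose update
        # time falls within [timespan_start, timespan_end).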
        blobs_to_transform = source_hook.list_by_timespan(
            bucket_name=self.source_bucket,
            prefix=self._source_prefix_interp,
            timespan_start=timespan_start,
            timespan_end=timespan_end,
        )

        with TemporaryDirectory() as temp_input_dir, TemporaryDirectory() as temp_output_dir:
            temp_input_dir_path = Path(temp_input_dir)
            temp_output_dir_path = Path(temp_output_dir)

            # TODO: download in parallel.
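            # Mirror each object's path under the temporary input directory so
            # the transform script sees the same relative layout as the bucket.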
            for blob_to_transform in blobs_to_transform:
                destination_file = temp_input_dir_path / blob_to_transform
                destination_file.parent.mkdir(parents=True, exist_ok=True)
                try:
                    source_hook.download(
                        bucket_name=self.source_bucket,
                        object_name=blob_to_transform,
                        filename=str(destination_file),
                        chunk_size=self.chunk_size,
                        num_max_attempts=self.download_num_attempts,
                    )
                except GoogleCloudError:
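                    # Tolerate individual download failures only when
                    # download_continue_on_fail is set; otherwise re-raise.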
                    if not self.download_continue_on_fail:
                        raise

            self.log.info("Starting the transformation")
            cmd = [self.transform_script] if isinstance(self.transform_script, str) else self.transform_script
            cmd += [
                str(temp_input_dir_path),
                str(temp_output_dir_path),
                timespan_start.replace(microsecond=0).isoformat(),
                timespan_end.replace(microsecond=0).isoformat(),
            ]
            with subprocess.Popen(
                args=cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True
            ) as process:
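                # Stream the script's combined stdout/stderr into the task log
                # as it runs, then fail the task on a non-zero exit code.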
                self.log.info("Process output:")
                if process.stdout:
                    for line in iter(process.stdout.readline, b""):
                        self.log.info(line.decode(self.output_encoding).rstrip())

                process.wait()
                if process.returncode:
                    raise AirflowException(f"Transform script failed: {process.returncode}")

            self.log.info("Transformation succeeded. Output temporarily located at %s", temp_output_dir_path)

            files_uploaded = []

            # TODO: upload in parallel.
            for upload_file in temp_output_dir_path.glob("**/*"):
                if upload_file.is_dir():
                    continue

                upload_file_name = str(upload_file.relative_to(temp_output_dir_path))

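                # Prepend the interpolated destination prefix (if any),
                # normalizing away a trailing slash to avoid double slashes.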
                if self._destination_prefix_interp is not None:
                    upload_file_name = f"{self._destination_prefix_interp.rstrip('/')}/{upload_file_name}"

                self.log.info("Uploading file %s to %s", upload_file, upload_file_name)

                try:
                    destination_hook.upload(
                        bucket_name=self.destination_bucket,
                        object_name=upload_file_name,
                        filename=str(upload_file),
                        chunk_size=self.chunk_size,
                        num_max_attempts=self.upload_num_attempts,
                    )
                    files_uploaded.append(upload_file_name)
                except GoogleCloudError:
                    if not self.upload_continue_on_fail:
                        raise

            return files_uploaded
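
For reference, a minimal sketch of a transform script that satisfies the invocation contract above: it receives the input directory, the output directory, and the two ISO-8601 timestamps as positional arguments, and must write its results under the output directory. The script below (hypothetically named transform.py) is illustrative only; the real script is user-supplied, and the uppercasing step is a placeholder transform.

    #!/usr/bin/env python
    # transform.py -- illustrative sketch; the real script is user-supplied.
    import sys
    from pathlib import Path

    input_dir, output_dir = Path(sys.argv[1]), Path(sys.argv[2])
    timespan_start, timespan_end = sys.argv[3], sys.argv[4]  # ISO 8601; unused here

    for src in input_dir.glob("**/*"):
        if src.is_dir():
            continue
        # Preserve the relative layout so the operator uploads the same structure.
        dst = output_dir / src.relative_to(input_dir)
        dst.parent.mkdir(parents=True, exist_ok=True)
        dst.write_bytes(src.read_bytes().upper())  # placeholder "transform"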