in providers/google/src/airflow/providers/google/cloud/operators/dataproc.py [0:0]
def execute(self, context: Context):
    """
    Submit the configured Dataproc batch and report on it according to the execution mode.

    Execution modes:

    * ``asynchronous`` -- after creation, fetch the batch once with ``get_batch`` and
      return it immediately.
    * ``deferrable`` -- after creation, call ``self.defer`` with a ``DataprocBatchTrigger``
      and ``method_name="execute_complete"``. In Airflow, ``defer`` suspends the task by
      raising ``TaskDeferred``, so the code after it is not expected to run in this mode.
    * default -- after creation, block on ``wait_for_batch``. While the batch's state
      message is flagged by ``check_error_for_resource_is_not_ready_msg``, call
      ``retry_batch_creation`` up to ``num_retries_if_resource_is_not_ready`` times.
      Finally pass the resulting state to ``handle_batch_status``.

    If creation fails with ``AlreadyExists``, the failure is logged and the operator
    continues with the requested ``batch_id`` instead of raising.

    :param context: Airflow task context; forwarded to ``DataprocBatchLink.persist``,
        the OpenLineage injection helper and ``handle_batch_status``.
    :return: The batch serialized with ``Batch.to_dict``.
    :raises AirflowException: If both ``asynchronous`` and ``deferrable`` are set, or if
        the create-batch operation carries no metadata to read the batch ID from.
    """
    if self.asynchronous and self.deferrable:
        raise AirflowException(
            "Both asynchronous and deferrable parameters were passed. Please, provide only one."
        )

    # With an explicit ID the link can be published before creation; without one the ID
    # is only known once the create-batch operation returns (see the ``else`` below).
    batch_id: str = self.batch_id or ""
    if not batch_id:
        self.log.info("Starting batch. The batch ID will be generated since it was not provided.")
    else:
        self.log.info("Starting batch %s", batch_id)
        # Persist the link earlier so users can observe the progress
        DataprocBatchLink.persist(
            context=context,
            operator=self,
            project_id=self.project_id,
            region=self.region,
            batch_id=self.batch_id,
        )

    if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
        self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
        self._inject_openlineage_properties_into_dataproc_batch(context)

    self.__update_batch_labels()

    # Call options shared by the create/get/wait hook calls below.
    call_options = {"retry": self.retry, "timeout": self.timeout, "metadata": self.metadata}

    try:
        self.operation = self.hook.create_batch(
            region=self.region,
            project_id=self.project_id,
            batch=self.batch,
            batch_id=self.batch_id,
            request_id=self.request_id,
            **call_options,
        )
    except AlreadyExists:
        # Attach to the existing batch instead of failing the task.
        self.log.info("Batch with given id already exists.")
        self.log.info("Attaching to the job %s if it is still running.", batch_id)
    else:
        operation = self.operation
        if not (operation and operation.metadata):
            raise AirflowException("Operation metadata is not available.")
        # The metadata refers to the batch by a "/"-separated name; the last segment is the ID.
        batch_id = operation.metadata.batch.split("/")[-1]
        self.log.info("The batch %s was created.", batch_id)

    DataprocBatchLink.persist(
        context=context,
        operator=self,
        project_id=self.project_id,
        region=self.region,
        batch_id=batch_id,
    )

    if self.asynchronous:
        current = self.hook.get_batch(
            batch_id=batch_id,
            region=self.region,
            project_id=self.project_id,
            **call_options,
        )
        self.log.info("The batch %s was created asynchronously. Exiting.", batch_id)
        return Batch.to_dict(current)

    if self.deferrable:
        trigger = DataprocBatchTrigger(
            batch_id=batch_id,
            project_id=self.project_id,
            region=self.region,
            gcp_conn_id=self.gcp_conn_id,
            impersonation_chain=self.impersonation_chain,
            polling_interval_seconds=self.polling_interval_seconds,
        )
        self.defer(trigger=trigger, method_name="execute_complete")

    self.log.info("Waiting for the completion of batch job %s", batch_id)
    batch = self.hook.wait_for_batch(
        batch_id=batch_id,
        region=self.region,
        project_id=self.project_id,
        **call_options,
    )

    # Only consult the "resource not ready" check when retries are configured at all.
    retries_left = self.num_retries_if_resource_is_not_ready
    if retries_left and self.hook.check_error_for_resource_is_not_ready_msg(batch.state_message):
        while retries_left > 0:
            retries_left -= 1
            batch, batch_id = self.retry_batch_creation(batch_id)
            if not self.hook.check_error_for_resource_is_not_ready_msg(batch.state_message):
                break

    self.handle_batch_status(context, batch.state, batch_id, batch.state_message)
    return Batch.to_dict(batch)