# tools/cloud-composer-stress-testing/cloud-composer-workload-simulator/dags/sample/dag_0.py
# -------------------------------------------------
# Base Taskflow Imports
# -------------------------------------------------
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
    BranchPythonOperator,
    PythonOperator,
)
# -------------------------------------------------
# Google Cloud Taskflow Imports
# -------------------------------------------------
from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator,
    BigQueryDeleteDatasetOperator,
    BigQueryInsertJobOperator,
)
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
)
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
)
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
# -------------------------------------------------
# Begin DAG
# -------------------------------------------------
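# The DAG triggers at minute 50 of every hour; per-task defaults give one
# retry, a 30-minute execution timeout, and a 25-minute SLA.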
with DAG(
    dag_id="sample_dag_0",
    description="This DAG was auto-generated for experimentation purposes.",
    schedule="50 * * * *",
    default_args={
        "retries": 1,
        "retry_delay": timedelta(minutes=2),
        "execution_timeout": timedelta(minutes=30),
        "sla": timedelta(minutes=25),
        "deferrable": False,
    },
    start_date=datetime.strptime("9/20/2024", "%m/%d/%Y"),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
    is_paused_upon_creation=False,
    tags=["generated_workload", "sample"],
) as dag:
    # -------------------------------------------------
    # Default DataprocBatchOperator Taskflow
    # -------------------------------------------------
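    # Dataproc Serverless batch IDs allow only lowercase letters, digits, and
    # hyphens, so any underscores from the DAG ID are replaced below.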
    batch_id = "sample_dag_0-0-batch".replace("_", "-")
    batch_config = {
        "spark_batch": {
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "main_class": "org.apache.spark.examples.SparkPi",
        }
    }
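    # "your-project" and "your-region" are placeholders; substitute real values
    # before deploying this DAG.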
    task_0 = DataprocCreateBatchOperator(
        task_id="create_batch_0",
        project_id="your-project",
        region="your-region",
        batch_id=batch_id,
        batch=batch_config,
    )
    # -------------------------------------------------
    # Default EmptyOperator Taskflow
    # -------------------------------------------------
    task_1 = EmptyOperator(
        task_id="empty_task_1",
    )
    # -------------------------------------------------
    # Default EmptyOperator Taskflow
    # -------------------------------------------------
    task_2 = EmptyOperator(
        task_id="empty_task_2",
    )
    # -------------------------------------------------
    # Default EmptyOperator Taskflow
    # -------------------------------------------------
    task_3 = EmptyOperator(
        task_id="empty_task_3",
    )
    # -------------------------------------------------
    # Default PythonOperator Taskflow
    # -------------------------------------------------
    task_4 = PythonOperator(
        task_id="python_4",
        python_callable=lambda: print("Hello World from DAG: sample_dag_0, Task: 4"),
    )
    # -------------------------------------------------
    # Default PythonOperator Taskflow
    # -------------------------------------------------
    task_5 = PythonOperator(
        task_id="python_5",
        python_callable=lambda: print("Hello World from DAG: sample_dag_0, Task: 5"),
    )
    # -------------------------------------------------
    # Default BigQueryInsertJobOperator Taskflow
    # -------------------------------------------------
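    # Submits a trivial standard-SQL query ("SELECT 1") as a BigQuery job to
    # exercise job scheduling without touching any real dataset.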
    task_6 = BigQueryInsertJobOperator(
        task_id="bigquery_insert_job_6",
        configuration={
            "query": {
                "query": "SELECT 1",
                "useLegacySql": False,
            }
        },
        location="your-region",
    )
    # -------------------------------------------------
    # Default BigQueryInsertJobOperator Taskflow
    # -------------------------------------------------
    task_7 = BigQueryInsertJobOperator(
        task_id="bigquery_insert_job_7",
        configuration={
            "query": {
                "query": "SELECT 1",
                "useLegacySql": False,
            }
        },
        location="your-region",
    )
    # -------------------------------------------------
    # Default DataprocBatchOperator Taskflow
    # -------------------------------------------------
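    # Rebinding batch_id/batch_config here is safe: task_0 already captured
    # references to the earlier objects when it was constructed.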
    batch_id = "sample_dag_0-8-batch".replace("_", "-")
    batch_config = {
        "spark_batch": {
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "main_class": "org.apache.spark.examples.SparkPi",
        }
    }
    task_8 = DataprocCreateBatchOperator(
        task_id="create_batch_8",
        project_id="your-project",
        region="your-region",
        batch_id=batch_id,
        batch=batch_config,
    )
    # -------------------------------------------------
    # Default EmptyOperator Taskflow
    # -------------------------------------------------
    task_9 = EmptyOperator(
        task_id="empty_task_9",
    )
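    # Task dependencies: run all generated tasks as a single linear chain.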
    (
        task_0
        >> task_1
        >> task_2
        >> task_3
        >> task_4
        >> task_5
        >> task_6
        >> task_7
        >> task_8
        >> task_9
    )