datasets/hacker_news/pipelines/full/full_dag.py (172 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from airflow import DAG from airflow.operators import bash from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { "owner": "Google", "depends_on_past": False, "start_date": "2022-10-31", } with DAG( dag_id="hacker_news.full", default_args=default_args, max_active_runs=1, schedule_interval="0 10 * * *", catchup=False, default_view="graph", ) as dag: # Fetch data gcs - gcs bash_gcs_to_gcs = bash.BashOperator( task_id="bash_gcs_to_gcs", bash_command="gsutil -m rm -a gs://{{ var.value.composer_bucket }}/data/hacker_news/batch/**\ngsutil cp `gsutil ls gs://hacker-news-backups/*_data.json |sort |tail -n 2 |head -n 1` gs://{{ var.value.composer_bucket }}/data/hacker_news/source_file.json\n", ) create_cluster = kubernetes_engine.GKECreateClusterOperator( task_id="create_cluster", project_id="{{ var.value.gcp_project }}", location="us-central1-c", body={ "name": "pdp-hacker-news", "initial_node_count": 2, "network": "{{ var.value.vpc_network }}", "node_config": { "machine_type": "e2-standard-16", "oauth_scopes": [ "https://www.googleapis.com/auth/devstorage.read_write", "https://www.googleapis.com/auth/cloud-platform", ], }, }, ) # Run CSV transform within kubernetes pod transform_csv = kubernetes_engine.GKEStartPodOperator( task_id="transform_csv", name="generate_output_files", namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", cluster_name="pdp-hacker-news", image_pull_policy="Always", image="{{ var.json.hacker_news.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_BUCKET": "{{ var.value.composer_bucket }}", "SOURCE_OBJECT": "data/hacker_news/source_file.json", "CHUNK_SIZE": "50000", "TARGET_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_LOCAL_DIR": "data/hacker_news/", "OUTPUT_CSV_HEADERS": '[ "title", "url", "text", "dead", "by",\n "score", "time", "timestamp", "type", "id",\n "parent", "descendants", "ranking", "deleted" ]', }, container_resources={ "memory": {"request": "48Gi"}, "cpu": {"request": "2"}, "ephemeral-storage": {"request": "10Gi"}, }, ) delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( task_id="delete_cluster", project_id="{{ var.value.gcp_project }}", location="us-central1-c", name="pdp-hacker-news", ) # Task to load CSV data to a BigQuery table load_full_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_full_to_bq", bucket="{{ var.value.composer_bucket }}", source_objects=["data/hacker_news/batch/hn_processed_*.csv"], source_format="CSV", field_delimiter="|", destination_project_dataset_table="hacker_news.full", skip_leading_rows=1, ignore_unknown_values=True, allow_quoted_newlines=True, write_disposition="WRITE_TRUNCATE", schema_fields=[ { "name": "title", "type": "string", "description": "Story title", "mode": "nullable", }, { "name": "url", "type": "string", "description": "Story url", "mode": "nullable", }, { "name": "text", "type": "string", "description": "Story or comment text", "mode": "nullable", }, { "name": "dead", "type": "boolean", "description": "Is dead?", "mode": "nullable", }, { "name": "by", "type": "string", "description": "The username of the item's author.", "mode": "nullable", }, { "name": "score", "type": "integer", "description": "Story score", "mode": "nullable", }, { "name": "time", "type": "integer", "description": "Unix time", "mode": "nullable", }, { "name": "timestamp", "type": "timestamp", "description": "Timestamp for the unix time", "mode": "nullable", }, { "name": "type", "type": "string", "description": "type of details (comment comment_ranking poll story job pollopt)", "mode": "nullable", }, { "name": "id", "type": "integer", "description": "The item's unique id.", "mode": "nullable", }, { "name": "parent", "type": "integer", "description": "Parent comment ID", "mode": "nullable", }, { "name": "descendants", "type": "integer", "description": "Number of story or poll descendants", "mode": "nullable", }, { "name": "ranking", "type": "integer", "description": "Comment ranking", "mode": "nullable", }, { "name": "deleted", "type": "boolean", "description": "Is deleted?", "mode": "nullable", }, ], ) ( bash_gcs_to_gcs >> create_cluster >> transform_csv >> delete_cluster >> load_full_to_bq )