datasets/libraries_io/pipelines/versions/versions_dag.py (139 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from airflow import DAG from airflow.operators import bash from airflow.providers.google.cloud.operators import kubernetes_engine from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { "owner": "Google", "depends_on_past": False, "start_date": "2022-11-15", } with DAG( dag_id="libraries_io.versions", default_args=default_args, max_active_runs=1, schedule_interval="@daily", catchup=False, default_view="graph", ) as dag: # Fetch data gcs - gcs bash_gcs_to_gcs = bash.BashOperator( task_id="bash_gcs_to_gcs", bash_command="if test -f /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz;\nthen\n mkdir /home/airflow/gcs/data/libraries_io/versions/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/versions-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/versions/versions.csv\nelse\n mkdir /home/airflow/gcs/data/libraries_io/\n curl -o /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -L https://zenodo.org/record/2536573/files/Libraries.io-open-data-1.4.0.tar.gz\n tar -xf /home/airflow/gcs/data/libraries_io/lib-1.6.0.tar.gz -C /home/airflow/gcs/data/libraries_io/\n mkdir /home/airflow/gcs/data/libraries_io/versions/\n cp /home/airflow/gcs/data/libraries_io/libraries-1.4.0-2018-12-22/versions-1.4.0-2018-12-22.csv /home/airflow/gcs/data/libraries_io/versions/versions.csv\nfi\n", ) create_cluster = kubernetes_engine.GKECreateClusterOperator( task_id="create_cluster", project_id="{{ var.value.gcp_project }}", location="us-central1-c", body={ "name": "pdp-libraries-io-versions", "initial_node_count": 1, "network": "{{ var.value.vpc_network }}", "node_config": { "machine_type": "e2-standard-16", "oauth_scopes": [ "https://www.googleapis.com/auth/devstorage.read_write", "https://www.googleapis.com/auth/cloud-platform", ], }, }, ) # Run CSV transform within kubernetes pod transform_versions = kubernetes_engine.GKEStartPodOperator( task_id="transform_versions", startup_timeout_seconds=600, name="versions", namespace="default", project_id="{{ var.value.gcp_project }}", location="us-central1-c", cluster_name="pdp-libraries-io-versions", image_pull_policy="Always", image="{{ var.json.libraries_io.container_registry.run_csv_transform_kub }}", env_vars={ "SOURCE_GCS_BUCKET": "{{ var.value.composer_bucket }}", "SOURCE_GCS_OBJECT": "data/libraries_io/versions/versions.csv", "SOURCE_FILE": "files/versions.csv", "TARGET_FILE": "files/data_versions.csv", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/libraries_io/versions/data_versions.csv", "CHUNKSIZE": "100000", "PIPELINE_NAME": "versions", "RENAME_MAPPINGS": '{"ID":"id","Platform":"platform","Project Name":"project_name","Project ID":"project_id","Number":"number", "Published Timestamp":"published_timestamp","Created Timestamp":"created_timestamp","Updated Timestamp":"updated_timestamp"}', "CSV_HEADERS": '["id","platform","project_name","project_id","number","published_timestamp","created_timestamp","updated_timestamp"]', }, container_resources={ "memory": {"request": "16Gi"}, "cpu": {"request": "1"}, "ephemeral-storage": {"request": "10Gi"}, }, ) # Task to load CSV data to a BigQuery table load_versions_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_versions_to_bq", bucket="{{ var.value.composer_bucket }}", source_objects=["data/libraries_io/versions/data_versions.csv"], source_format="CSV", destination_project_dataset_table="libraries_io.versions", skip_leading_rows=2, allow_quoted_newlines=True, write_disposition="WRITE_TRUNCATE", schema_fields=[ { "name": "id", "type": "integer", "description": "The unique primary key of the version in the Libraries.io database.", "mode": "nullable", }, { "name": "platform", "type": "string", "description": "The name of the Package manager the version is available on.", "mode": "nullable", }, { "name": "project_name", "type": "string", "description": "The name of the project the version belongs to.", "mode": "nullable", }, { "name": "project_id", "type": "integer", "description": "The unique primary key of the project for this version in the Libraries.io database.", "mode": "nullable", }, { "name": "number", "type": "string", "description": "The number of the release often confirms to semantic versioning.", "mode": "nullable", }, { "name": "published_timestamp", "type": "timestamp", "description": "The timestamp of when the version was published.", "mode": "nullable", }, { "name": "created_timestamp", "type": "timestamp", "description": "The timestamp of when the version was first detected by Libraries.io.", "mode": "nullable", }, { "name": "updated_timestamp", "type": "timestamp", "description": "The timestamp of when the version was last saved by Libraries.io.", "mode": "nullable", }, ], ) delete_cluster = kubernetes_engine.GKEDeleteClusterOperator( task_id="delete_cluster", project_id="{{ var.value.gcp_project }}", location="us-central1-c", name="pdp-libraries-io-versions", ) ( bash_gcs_to_gcs >> create_cluster >> transform_versions >> delete_cluster >> load_versions_to_bq )