datasets/imdb/pipelines/interfaces/pipeline.yaml (479 lines of code) (raw):

# Copyright 2021 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. --- resources: - type: bigquery_table table_id: name_basics description: "It consists details about unique identifier of the name/person." - type: bigquery_table table_id: title_akas description: "It consists details about unique identifier of the title_id." - type: bigquery_table table_id: title_basics description: "It consists additional details about unique identifier of the title_id." - type: bigquery_table table_id: title_crew description: "Contains the director and writer information for all the titles in IMDb." - type: bigquery_table table_id: title_episode description: "Contains the tv episode information." - type: bigquery_table table_id: title_principals description: "Contains the principal cast/crew for titles." - type: bigquery_table table_id: title_ratings description: "Contains the IMDb rating and votes information for titles." dag: airflow_version: 2 initialize: dag_id: interfaces default_args: owner: "Google" depends_on_past: False start_date: "2021-03-01" max_active_runs: 1 schedule_interval: "@weekly" catchup: False default_view: graph tasks: - operator: "GKECreateClusterOperator" args: task_id: "create_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" body: name: pdp-imdb-interfaces initial_node_count: 1 network: "{{ var.value.vpc_network }}" node_config: machine_type: e2-standard-16 oauth_scopes: - https://www.googleapis.com/auth/devstorage.read_write - https://www.googleapis.com/auth/cloud-platform - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "name_basics_transform_csv" startup_timeout_seconds: 600 name: "name_basics" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-imdb-interfaces image_pull_policy: "Always" image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: >- {"url": "https://datasets.imdbws.com/name.basics.tsv.gz"} SOURCE_FILE: >- {"url_data": "./files/name_basics.tsv.gz"} TARGET_CSV_FILE: "./files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/imdb/interfaces/name_basics_data_output.csv" TABLE_NAME: "name_basics" PIPELINE_NAME: "interfaces" CSV_HEADERS: >- ["nconst", "primary_name", "birth_year", "death_year", "primary_profession", "known_for_titles"] RENAME_MAPPINGS: >- {"nconst": "nconst", "primaryName": "primary_name", "birthYear": "birth_year", "deathYear": "death_year", "primaryProfession": "primary_profession", "knownForTitles": "known_for_titles"} - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_name_basics_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/imdb/interfaces/name_basics_data_output.csv"] source_format: "CSV" destination_project_dataset_table: "imdb.name_basics" skip_leading_rows: 1 write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "nconst" type: "string" description: "Alphanumeric unique identifier of the name/person." mode: "nullable" - name: "primary_name" type: "string" description: "Name by which the person is most often credited." mode: "nullable" - name: "birth_year" type: "integer" description: "Birth year in YYYY format." mode: "nullable" - name: "death_year" type: "integer" description: "Death year in YYYY format if applicable." mode: "nullable" - name: "primary_profession" type: "string" description: "The top-3 professions of the person." mode: "nullable" - name: "known_for_titles" type: "string" description: "Titles the person is known for." mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "title_akas_transform_csv" startup_timeout_seconds: 900 name: "title_akas" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-imdb-interfaces image_pull_policy: "Always" image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: >- {"url": "https://datasets.imdbws.com/title.akas.tsv.gz"} SOURCE_FILE: >- {"url_data": "./files/title_akas.tsv.gz"} CHUNK_SIZE: "300000" TARGET_CSV_FILE: "./files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/imdb/interfaces/title_akas_data_output.csv" TABLE_NAME: "title_akas" PIPELINE_NAME: "interfaces" CSV_HEADERS: >- ["title_id", "ordering", "title", "region", "language", "types", "attributes", "is_original_title"] RENAME_MAPPINGS: >- {"titleId": "title_id", "ordering": "ordering", "title": "title", "region": "region", "language": "language", "types": "types", "attributes": "attributes", "isOriginalTitle": "is_original_title"} - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_title_akas_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/imdb/interfaces/title_akas_data_output.csv"] source_format: "CSV" destination_project_dataset_table: "imdb.title_akas" skip_leading_rows: 1 write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "title_id" type: "string" description: "A tconst, an alphanumeric unique identifier of the title." mode: "nullable" - name: "ordering" type: "integer" description: "A number to uniquely identify rows for a given title_id." mode: "nullable" - name: "title" type: "string" description: "The localized title." mode: "nullable" - name: "region" type: "string" description: "The region for this version of the title." mode: "nullable" - name: "language" type: "string" description: "The language of the title." mode: "nullable" - name: "types" type: "string" description: "Enumerated set of attributes for this alternative title. One or more of the following: 'alternative', 'dvd', 'festival', 'tv', 'video', 'working', 'original', 'imdbDisplay'. New values may be added in the future without warning." mode: "nullable" - name: "attributes" type: "string" description: "Additional terms to describe this alternative title, not enumerated" mode: "nullable" - name: "is_original_title" type: "boolean" description: "False: not original title; True: original title." mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "title_basics_transform_csv" startup_timeout_seconds: 600 name: "title_basics" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-imdb-interfaces image_pull_policy: "Always" image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: >- {"url": "https://datasets.imdbws.com/title.basics.tsv.gz"} SOURCE_FILE: >- {"url_data": "./files/title_basics.tsv.gz"} TARGET_CSV_FILE: "./files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/imdb/interfaces/title_basics_data_output.csv" TABLE_NAME: "title_basics" PIPELINE_NAME: "interfaces" CSV_HEADERS: >- ["tconst", "title_type", "primary_title", "original_title", "is_adult", "start_year", "end_year", "runtime_minutes", "genres"] RENAME_MAPPINGS: >- {"tconst": "tconst", "titleType": "title_type", "primaryTitle": "primary_title", "originalTitle": "original_title", "isAdult": "is_adult", "startYear": "start_year", "endYear": "end_year", "runtimeMinutes": "runtime_minutes", "genres": "genres"} - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_title_basics_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/imdb/interfaces/title_basics_data_output.csv"] source_format: "CSV" destination_project_dataset_table: "imdb.title_basics" skip_leading_rows: 1 write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "tconst" type: "string" description: "Alphanumeric unique identifier of the title." mode: "nullable" - name: "title_type" type: "string" description: "The type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc)." mode: "nullable" - name: "primary_title" type: "string" description: "The more popular title / the title used by the filmmakers on promotional materials at the point of release." mode: "nullable" - name: "original_title" type: "string" description: "Original title, in the original language." mode: "nullable" - name: "is_adult" type: "integer" description: "0: non-adult title; 1: adult title." mode: "nullable" - name: "start_year" type: "integer" description: "Represents the release year of a title. In the case of TV Series, it is the series start year." mode: "nullable" - name: "end_year" type: "integer" description: "TV Series end year." mode: "nullable" - name: "runtime_minutes" type: "integer" description: "Primary runtime of the title, in minutes." mode: "nullable" - name: "genres" type: "string" description: "Includes up to three genres associated with the title." mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "title_crew_transform_csv" startup_timeout_seconds: 600 name: "title_crew" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-imdb-interfaces image_pull_policy: "Always" image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: >- {"url": "https://datasets.imdbws.com/title.crew.tsv.gz"} SOURCE_FILE: >- {"url_data": "./files/title_crew.tsv.gz"} TARGET_CSV_FILE: "./files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/imdb/interfaces/title_crew_data_output.csv" TABLE_NAME: "title_crew" PIPELINE_NAME: "interfaces" CSV_HEADERS: >- ["tconst", "directors", "writers"] RENAME_MAPPINGS: >- {"tconst": "tconst", "directors": "directors", "writers": "writers"} - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_title_crew_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/imdb/interfaces/title_crew_data_output.csv"] source_format: "CSV" destination_project_dataset_table: "imdb.title_crew" skip_leading_rows: 1 write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "tconst" type: "string" description: "Alphanumeric unique identifier of the title." mode: "nullable" - name: "directors" type: "string" description: "Strinng of nconsts - director(s) of the given title." mode: "nullable" - name: "writers" type: "string" description: "String of nconsts - writer(s) of the given title." mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "title_episode_transform_csv" startup_timeout_seconds: 600 name: "title_episode" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-imdb-interfaces image_pull_policy: "Always" image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: >- {"url": "https://datasets.imdbws.com/title.episode.tsv.gz"} SOURCE_FILE: >- {"url_data": "./files/title_episode.tsv.gz"} TARGET_CSV_FILE: "./files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/imdb/interfaces/title_episode_data_output.csv" TABLE_NAME: "title_episode" PIPELINE_NAME: "interfaces" CSV_HEADERS: >- ["tconst", "parent_tconst", "season_number", "episode_number"] RENAME_MAPPINGS: >- {"tconst": "tconst", "parentTconst": "parent_tconst", "seasonNumber": "season_number", "episodeNumber": "episode_number"} - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_title_episode_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/imdb/interfaces/title_episode_data_output.csv"] source_format: "CSV" destination_project_dataset_table: "imdb.title_episode" skip_leading_rows: 1 write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "tconst" type: "string" description: "Alphanumeric identifier of episode." mode: "nullable" - name: "parent_tconst" type: "string" description: "Alphanumeric identifier of the parent TV Series." mode: "nullable" - name: "season_number" type: "integer" description: "Season number the episode belongs to." mode: "nullable" - name: "episode_number" type: "integer" description: "Episode number of the tconst in the TV series." mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "title_principals_transform_csv" startup_timeout_seconds: 900 name: "title_principals" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-imdb-interfaces image_pull_policy: "Always" image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: >- {"url": "https://datasets.imdbws.com/title.principals.tsv.gz"} SOURCE_FILE: >- {"url_data": "./files/title_principals.tsv.gz"} CHUNK_SIZE: "300000" TARGET_CSV_FILE: "./files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/imdb/interfaces/title_principals_data_output.csv" TABLE_NAME: "title_principals" PIPELINE_NAME: "interfaces" CSV_HEADERS: >- ["tconst", "ordering", "nconst", "category", "job", "characters"] RENAME_MAPPINGS: >- {"tconst": "tconst", "ordering": "ordering", "nconst": "nconst", "category": "category", "job": "job", "characters": "characters"} - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_title_principals_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/imdb/interfaces/title_principals_data_output.csv"] source_format: "CSV" destination_project_dataset_table: "imdb.title_principals" skip_leading_rows: 1 write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "tconst" type: "string" description: "Alphanumeric unique identifier of the title." mode: "nullable" - name: "ordering" type: "integer" description: "a number to uniquely identify rows for a given title_id." mode: "nullable" - name: "nconst" type: "string" description: "Alphanumeric unique identifier of the name/person." mode: "nullable" - name: "category" type: "string" description: "The category of job that person was in." mode: "nullable" - name: "job" type: "string" description: "The specific job title if applicable." mode: "nullable" - name: "characters" type: "string" description: "The name of the character played if applicable." mode: "nullable" - operator: "GKEStartPodOperator" description: "Run CSV transform within kubernetes pod" args: task_id: "title_ratings_transform_csv" startup_timeout_seconds: 600 name: "title_ratings" namespace: "default" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" cluster_name: pdp-imdb-interfaces image_pull_policy: "Always" image: "{{ var.json.imdb.container_registry.run_csv_transform_kub }}" env_vars: SOURCE_URL: >- {"url": "https://datasets.imdbws.com/title.ratings.tsv.gz"} SOURCE_FILE: >- {"url_data": "./files/title_ratings.tsv.gz"} TARGET_CSV_FILE: "./files/data_output.csv" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/imdb/interfaces/title_ratings_data_output.csv" TABLE_NAME: "title_ratings" PIPELINE_NAME: "interfaces" CSV_HEADERS: >- ["tconst", "average_rating", "num_votes"] RENAME_MAPPINGS: >- {"tconst": "tconst", "averageRating": "average_rating", "numVotes": "num_votes"} - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" args: task_id: "load_title_ratings_to_bq" bucket: "{{ var.value.composer_bucket }}" source_objects: ["data/imdb/interfaces/title_ratings_data_output.csv"] source_format: "CSV" destination_project_dataset_table: "imdb.title_ratings" skip_leading_rows: 1 write_disposition: "WRITE_TRUNCATE" schema_fields: - name: "tconst" type: "string" description: "Alphanumeric unique identifier for title." mode: "nullable" - name: "average_rating" type: "float" description: "Weighted average of all the individual user ratings." mode: "nullable" - name: "num_votes" type: "integer" description: "Number of votes the title has received." mode: "nullable" - operator: "GKEDeleteClusterOperator" args: task_id: "delete_cluster" project_id: "{{ var.value.gcp_project }}" location: "us-central1-c" name: pdp-imdb-interfaces graph_paths: - "create_cluster >> [name_basics_transform_csv, title_akas_transform_csv, title_basics_transform_csv, title_crew_transform_csv, title_episode_transform_csv, title_principals_transform_csv, title_ratings_transform_csv] >> delete_cluster >> [load_name_basics_to_bq, load_title_akas_to_bq, load_title_basics_to_bq, load_title_crew_to_bq, load_title_episode_to_bq, load_title_principals_to_bq, load_title_ratings_to_bq]"