datasets/imdb/pipelines/interfaces/interfaces_dag.py (528 lines of code) (raw):
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from airflow.providers.google.cloud.operators import kubernetes_engine
from airflow.providers.google.cloud.transfers import gcs_to_bigquery
default_args = {
"owner": "Google",
"depends_on_past": False,
"start_date": "2021-03-01",
}
with DAG(
dag_id="imdb.interfaces",
default_args=default_args,
max_active_runs=1,
schedule_interval="@weekly",
catchup=False,
default_view="graph",
) as dag:
create_cluster = kubernetes_engine.GKECreateClusterOperator(
task_id="create_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
body={
"name": "pdp-imdb-interfaces",
"initial_node_count": 1,
"network": "{{ var.value.vpc_network }}",
"node_config": {
"machine_type": "e2-standard-16",
"oauth_scopes": [
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/cloud-platform",
],
},
},
)
# Run CSV transform within kubernetes pod
name_basics_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="name_basics_transform_csv",
startup_timeout_seconds=600,
name="name_basics",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-imdb-interfaces",
image_pull_policy="Always",
image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": '{"url": "https://datasets.imdbws.com/name.basics.tsv.gz"}',
"SOURCE_FILE": '{"url_data": "./files/name_basics.tsv.gz"}',
"TARGET_CSV_FILE": "./files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/imdb/interfaces/name_basics_data_output.csv",
"TABLE_NAME": "name_basics",
"PIPELINE_NAME": "interfaces",
"CSV_HEADERS": '["nconst", "primary_name", "birth_year", "death_year", "primary_profession", "known_for_titles"]',
"RENAME_MAPPINGS": '{"nconst": "nconst", "primaryName": "primary_name", "birthYear": "birth_year", "deathYear": "death_year",\n "primaryProfession": "primary_profession", "knownForTitles": "known_for_titles"}',
},
)
# Task to load CSV data to a BigQuery table
load_name_basics_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_name_basics_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/imdb/interfaces/name_basics_data_output.csv"],
source_format="CSV",
destination_project_dataset_table="imdb.name_basics",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "nconst",
"type": "string",
"description": "Alphanumeric unique identifier of the name/person.",
"mode": "nullable",
},
{
"name": "primary_name",
"type": "string",
"description": "Name by which the person is most often credited.",
"mode": "nullable",
},
{
"name": "birth_year",
"type": "integer",
"description": "Birth year in YYYY format.",
"mode": "nullable",
},
{
"name": "death_year",
"type": "integer",
"description": "Death year in YYYY format if applicable.",
"mode": "nullable",
},
{
"name": "primary_profession",
"type": "string",
"description": "The top-3 professions of the person.",
"mode": "nullable",
},
{
"name": "known_for_titles",
"type": "string",
"description": "Titles the person is known for.",
"mode": "nullable",
},
],
)
# Run CSV transform within kubernetes pod
title_akas_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="title_akas_transform_csv",
startup_timeout_seconds=900,
name="title_akas",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-imdb-interfaces",
image_pull_policy="Always",
image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.akas.tsv.gz"}',
"SOURCE_FILE": '{"url_data": "./files/title_akas.tsv.gz"}',
"CHUNK_SIZE": "300000",
"TARGET_CSV_FILE": "./files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/imdb/interfaces/title_akas_data_output.csv",
"TABLE_NAME": "title_akas",
"PIPELINE_NAME": "interfaces",
"CSV_HEADERS": '["title_id", "ordering", "title", "region", "language", "types", "attributes", "is_original_title"]',
"RENAME_MAPPINGS": '{"titleId": "title_id", "ordering": "ordering", "title": "title", "region": "region", "language": "language", "types": "types", "attributes": "attributes", "isOriginalTitle": "is_original_title"}',
},
)
# Task to load CSV data to a BigQuery table
load_title_akas_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_title_akas_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/imdb/interfaces/title_akas_data_output.csv"],
source_format="CSV",
destination_project_dataset_table="imdb.title_akas",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "title_id",
"type": "string",
"description": "A tconst, an alphanumeric unique identifier of the title.",
"mode": "nullable",
},
{
"name": "ordering",
"type": "integer",
"description": "A number to uniquely identify rows for a given title_id.",
"mode": "nullable",
},
{
"name": "title",
"type": "string",
"description": "The localized title.",
"mode": "nullable",
},
{
"name": "region",
"type": "string",
"description": "The region for this version of the title.",
"mode": "nullable",
},
{
"name": "language",
"type": "string",
"description": "The language of the title.",
"mode": "nullable",
},
{
"name": "types",
"type": "string",
"description": "Enumerated set of attributes for this alternative title. One or more of the following: 'alternative', 'dvd', 'festival', 'tv', 'video', 'working', 'original', 'imdbDisplay'. New values may be added in the future without warning.",
"mode": "nullable",
},
{
"name": "attributes",
"type": "string",
"description": "Additional terms to describe this alternative title, not enumerated",
"mode": "nullable",
},
{
"name": "is_original_title",
"type": "boolean",
"description": "False: not original title; True: original title.",
"mode": "nullable",
},
],
)
# Run CSV transform within kubernetes pod
title_basics_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="title_basics_transform_csv",
startup_timeout_seconds=600,
name="title_basics",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-imdb-interfaces",
image_pull_policy="Always",
image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.basics.tsv.gz"}',
"SOURCE_FILE": '{"url_data": "./files/title_basics.tsv.gz"}',
"TARGET_CSV_FILE": "./files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/imdb/interfaces/title_basics_data_output.csv",
"TABLE_NAME": "title_basics",
"PIPELINE_NAME": "interfaces",
"CSV_HEADERS": '["tconst", "title_type", "primary_title", "original_title", "is_adult", "start_year", "end_year", "runtime_minutes", "genres"]',
"RENAME_MAPPINGS": '{"tconst": "tconst", "titleType": "title_type", "primaryTitle": "primary_title", "originalTitle": "original_title",\n "isAdult": "is_adult", "startYear": "start_year", "endYear": "end_year", "runtimeMinutes": "runtime_minutes", "genres": "genres"}',
},
)
# Task to load CSV data to a BigQuery table
load_title_basics_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_title_basics_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/imdb/interfaces/title_basics_data_output.csv"],
source_format="CSV",
destination_project_dataset_table="imdb.title_basics",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "tconst",
"type": "string",
"description": "Alphanumeric unique identifier of the title.",
"mode": "nullable",
},
{
"name": "title_type",
"type": "string",
"description": "The type/format of the title (e.g. movie, short, tvseries, tvepisode, video, etc).",
"mode": "nullable",
},
{
"name": "primary_title",
"type": "string",
"description": "The more popular title / the title used by the filmmakers on promotional materials at the point of release.",
"mode": "nullable",
},
{
"name": "original_title",
"type": "string",
"description": "Original title, in the original language.",
"mode": "nullable",
},
{
"name": "is_adult",
"type": "integer",
"description": "0: non-adult title; 1: adult title.",
"mode": "nullable",
},
{
"name": "start_year",
"type": "integer",
"description": "Represents the release year of a title. In the case of TV Series, it is the series start year.",
"mode": "nullable",
},
{
"name": "end_year",
"type": "integer",
"description": "TV Series end year.",
"mode": "nullable",
},
{
"name": "runtime_minutes",
"type": "integer",
"description": "Primary runtime of the title, in minutes.",
"mode": "nullable",
},
{
"name": "genres",
"type": "string",
"description": "Includes up to three genres associated with the title.",
"mode": "nullable",
},
],
)
# Run CSV transform within kubernetes pod
title_crew_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="title_crew_transform_csv",
startup_timeout_seconds=600,
name="title_crew",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-imdb-interfaces",
image_pull_policy="Always",
image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.crew.tsv.gz"}',
"SOURCE_FILE": '{"url_data": "./files/title_crew.tsv.gz"}',
"TARGET_CSV_FILE": "./files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/imdb/interfaces/title_crew_data_output.csv",
"TABLE_NAME": "title_crew",
"PIPELINE_NAME": "interfaces",
"CSV_HEADERS": '["tconst", "directors", "writers"]',
"RENAME_MAPPINGS": '{"tconst": "tconst", "directors": "directors", "writers": "writers"}',
},
)
# Task to load CSV data to a BigQuery table
load_title_crew_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_title_crew_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/imdb/interfaces/title_crew_data_output.csv"],
source_format="CSV",
destination_project_dataset_table="imdb.title_crew",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "tconst",
"type": "string",
"description": "Alphanumeric unique identifier of the title.",
"mode": "nullable",
},
{
"name": "directors",
"type": "string",
"description": "Strinng of nconsts - director(s) of the given title.",
"mode": "nullable",
},
{
"name": "writers",
"type": "string",
"description": "String of nconsts - writer(s) of the given title.",
"mode": "nullable",
},
],
)
# Run CSV transform within kubernetes pod
title_episode_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="title_episode_transform_csv",
startup_timeout_seconds=600,
name="title_episode",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-imdb-interfaces",
image_pull_policy="Always",
image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.episode.tsv.gz"}',
"SOURCE_FILE": '{"url_data": "./files/title_episode.tsv.gz"}',
"TARGET_CSV_FILE": "./files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/imdb/interfaces/title_episode_data_output.csv",
"TABLE_NAME": "title_episode",
"PIPELINE_NAME": "interfaces",
"CSV_HEADERS": '["tconst", "parent_tconst", "season_number", "episode_number"]',
"RENAME_MAPPINGS": '{"tconst": "tconst", "parentTconst": "parent_tconst", "seasonNumber": "season_number", "episodeNumber": "episode_number"}',
},
)
# Task to load CSV data to a BigQuery table
load_title_episode_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_title_episode_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/imdb/interfaces/title_episode_data_output.csv"],
source_format="CSV",
destination_project_dataset_table="imdb.title_episode",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "tconst",
"type": "string",
"description": "Alphanumeric identifier of episode.",
"mode": "nullable",
},
{
"name": "parent_tconst",
"type": "string",
"description": "Alphanumeric identifier of the parent TV Series.",
"mode": "nullable",
},
{
"name": "season_number",
"type": "integer",
"description": "Season number the episode belongs to.",
"mode": "nullable",
},
{
"name": "episode_number",
"type": "integer",
"description": "Episode number of the tconst in the TV series.",
"mode": "nullable",
},
],
)
# Run CSV transform within kubernetes pod
title_principals_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="title_principals_transform_csv",
startup_timeout_seconds=900,
name="title_principals",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-imdb-interfaces",
image_pull_policy="Always",
image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.principals.tsv.gz"}',
"SOURCE_FILE": '{"url_data": "./files/title_principals.tsv.gz"}',
"CHUNK_SIZE": "300000",
"TARGET_CSV_FILE": "./files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/imdb/interfaces/title_principals_data_output.csv",
"TABLE_NAME": "title_principals",
"PIPELINE_NAME": "interfaces",
"CSV_HEADERS": '["tconst", "ordering", "nconst", "category", "job", "characters"]',
"RENAME_MAPPINGS": '{"tconst": "tconst", "ordering": "ordering", "nconst": "nconst", "category": "category",\n "job": "job", "characters": "characters"}',
},
)
# Task to load CSV data to a BigQuery table
load_title_principals_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_title_principals_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/imdb/interfaces/title_principals_data_output.csv"],
source_format="CSV",
destination_project_dataset_table="imdb.title_principals",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "tconst",
"type": "string",
"description": "Alphanumeric unique identifier of the title.",
"mode": "nullable",
},
{
"name": "ordering",
"type": "integer",
"description": "a number to uniquely identify rows for a given title_id.",
"mode": "nullable",
},
{
"name": "nconst",
"type": "string",
"description": "Alphanumeric unique identifier of the name/person.",
"mode": "nullable",
},
{
"name": "category",
"type": "string",
"description": "The category of job that person was in.",
"mode": "nullable",
},
{
"name": "job",
"type": "string",
"description": "The specific job title if applicable.",
"mode": "nullable",
},
{
"name": "characters",
"type": "string",
"description": "The name of the character played if applicable.",
"mode": "nullable",
},
],
)
# Run CSV transform within kubernetes pod
title_ratings_transform_csv = kubernetes_engine.GKEStartPodOperator(
task_id="title_ratings_transform_csv",
startup_timeout_seconds=600,
name="title_ratings",
namespace="default",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
cluster_name="pdp-imdb-interfaces",
image_pull_policy="Always",
image="{{ var.json.imdb.container_registry.run_csv_transform_kub }}",
env_vars={
"SOURCE_URL": '{"url": "https://datasets.imdbws.com/title.ratings.tsv.gz"}',
"SOURCE_FILE": '{"url_data": "./files/title_ratings.tsv.gz"}',
"TARGET_CSV_FILE": "./files/data_output.csv",
"TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
"TARGET_GCS_PATH": "data/imdb/interfaces/title_ratings_data_output.csv",
"TABLE_NAME": "title_ratings",
"PIPELINE_NAME": "interfaces",
"CSV_HEADERS": '["tconst", "average_rating", "num_votes"]',
"RENAME_MAPPINGS": '{"tconst": "tconst", "averageRating": "average_rating", "numVotes": "num_votes"}',
},
)
# Task to load CSV data to a BigQuery table
load_title_ratings_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
task_id="load_title_ratings_to_bq",
bucket="{{ var.value.composer_bucket }}",
source_objects=["data/imdb/interfaces/title_ratings_data_output.csv"],
source_format="CSV",
destination_project_dataset_table="imdb.title_ratings",
skip_leading_rows=1,
write_disposition="WRITE_TRUNCATE",
schema_fields=[
{
"name": "tconst",
"type": "string",
"description": "Alphanumeric unique identifier for title.",
"mode": "nullable",
},
{
"name": "average_rating",
"type": "float",
"description": "Weighted average of all the individual user ratings.",
"mode": "nullable",
},
{
"name": "num_votes",
"type": "integer",
"description": "Number of votes the title has received.",
"mode": "nullable",
},
],
)
delete_cluster = kubernetes_engine.GKEDeleteClusterOperator(
task_id="delete_cluster",
project_id="{{ var.value.gcp_project }}",
location="us-central1-c",
name="pdp-imdb-interfaces",
)
(
create_cluster
>> [
name_basics_transform_csv,
title_akas_transform_csv,
title_basics_transform_csv,
title_crew_transform_csv,
title_episode_transform_csv,
title_principals_transform_csv,
title_ratings_transform_csv,
]
>> delete_cluster
>> [
load_name_basics_to_bq,
load_title_akas_to_bq,
load_title_basics_to_bq,
load_title_crew_to_bq,
load_title_episode_to_bq,
load_title_principals_to_bq,
load_title_ratings_to_bq,
]
)