data-mesh-banking-labs/setup/terraform/modules/composer/composer.tf (229 lines of code) (raw):

/** * Copyright 2022 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #################################################################################### # Variables #################################################################################### variable "project_id" {} variable "datastore_project_id" {} variable "project_number" {} variable "location" {} variable "network_id" {} variable "prefix" {} variable "dataplex_process_bucket_name" {} variable "date_partition" {} locals { _dataplex_process_bucket_name = format("%s_dataplex_process",var.project_id) } #################################################################################### # Composer 2 #################################################################################### # Cloud Composer v2 API Service Agent Extension # The below does not overwrite at the Org level like GCP docs: https://cloud.google.com/composer/docs/composer-2/create-environments#terraform resource "google_project_iam_member" "cloudcomposer_account_service_agent_v2_ext" { project = var.project_id role = "roles/composer.ServiceAgentV2Ext" member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com" } # Cloud Composer API Service Agent resource "google_project_iam_member" "cloudcomposer_account_service_agent" { project = var.project_id role = "roles/composer.serviceAgent" member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com" depends_on = [ google_project_iam_member.cloudcomposer_account_service_agent_v2_ext ] } resource "google_project_iam_member" "composer_service_account_worker_role" { project = var.project_id role = "roles/composer.worker" member = "serviceAccount:${google_service_account.composer_service_account.email}" depends_on = [ google_service_account.composer_service_account ] } resource "google_compute_subnetwork" "composer_subnet" { project = var.project_id name = "composer-subnet" ip_cidr_range = "10.2.0.0/16" region = var.location network = var.network_id } resource "google_service_account" "composer_service_account" { project = var.project_id account_id = "composer-service-account" display_name = "Service Account for Composer Environment" } # ActAs role resource "google_project_iam_member" "cloudcomposer_act_as" { project = var.project_id role = "roles/iam.serviceAccountUser" member = "serviceAccount:${google_service_account.composer_service_account.email}" depends_on = [ google_service_account.composer_service_account ] } # ActAs role resource "google_project_iam_member" "cloudcomposer_admin" { project = var.project_id role = "roles/composer.admin" member = "serviceAccount:${google_service_account.composer_service_account.email}" depends_on = [ google_project_iam_member.cloudcomposer_act_as ] } resource "google_project_iam_member" "cloudcomposer_editorrole" { project = var.project_id role = "roles/editor" member = "serviceAccount:${google_service_account.composer_service_account.email}" depends_on = [ google_project_iam_member.cloudcomposer_admin ] } resource "google_project_iam_member" "cloudcomposer_tokencreator" { project = var.project_id role = "roles/iam.serviceAccountTokenCreator" member = "serviceAccount:${google_service_account.composer_service_account.email}" depends_on = [ google_project_iam_member.cloudcomposer_editorrole ] } resource "google_composer_environment" "composer_env" { project = var.project_id name = format("%s-composer", var.project_id) region = var.location config { software_config { image_version = "composer-2.1.5-airflow-2.3.4" #"composer-2.0.7-airflow-2.2.3" pypi_packages = { #google-cloud-dataplex = ">=0.1.0" requests_oauth2 = "" # scipy = "==1.1.0" } #cloud_data_lineage_integration = { # enabled=true #} env_variables = { AIRFLOW_VAR_CUST_ENTITY_LIST_FILE_PATH = "/home/airflow/gcs/data/customer_data_products/entities.txt", AIRFLOW_VAR_CUSTOMER_DC_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/customer-source-configs", AIRFLOW_VAR_CUSTOMER_DC_INPUT_FILE = "data-product-classification-tag-auto.yaml", AIRFLOW_VAR_CUSTOMER_DP_INFO_INPUT_FILE = "data-product-info-tag-auto.yaml", AIRFLOW_VAR_CUSTOMER_DP_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/customer-source-configs", AIRFLOW_VAR_CUSTOMER_DPLX_LAKE_ID = "consumer-banking--customer--domain", AIRFLOW_VAR_CUSTOMER_DPLX_REGION = "${var.location}", AIRFLOW_VAR_CUSTOMER_DPLX_ZONE_ID = "customer-data-product-zone", AIRFLOW_VAR_CUSTOMER_DQ_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/customer-source-configs", AIRFLOW_VAR_CUSTOMER_DQ_INPUT_FILE = "data-product-quality-tag-auto.yaml", AIRFLOW_VAR_CUSTOMER_DQ_RAW_INPUT_YAML = "gs://${var.dataplex_process_bucket_name}/code/customer-source-configs/dq_customer_gcs_data.yaml", AIRFLOW_VAR_CUSTOMER_DX_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/customer-source-configs", AIRFLOW_VAR_CUSTOMER_DX_INPUT_FILE = "data-product-exchange-tag-manual.yaml", AIRFLOW_VAR_DATA_CLASSIFICATION_MAIN_CLASS = "com.google.cloud.dataplex.templates.dataclassification.DataProductClassification", AIRFLOW_VAR_DATA_EXCHANGE_MAIN_CLASS = "com.google.cloud.dataplex.templates.datapublication.DataProductPublicationInfo", AIRFLOW_VAR_DATA_QUALITY_MAIN_CLASS = "com.google.cloud.dataplex.templates.dataquality.DataProductQuality", AIRFLOW_VAR_DPLX_API_END_POINT = "https://dataplex.googleapis.com", AIRFLOW_VAR_DQ_BQ_REGION = "${var.location}", AIRFLOW_VAR_DQ_DATASET_ID = "central_dq_results", AIRFLOW_VAR_DQ_TARGET_SUMMARY_TABLE = "${var.project_id}.central_dq_results.dq_results", AIRFLOW_VAR_GCP_CUSTOMER_SA_ACCT = "customer-sa@${var.project_id}.iam.gserviceaccount.com", AIRFLOW_VAR_GCP_DG_PROJECT = "${var.project_id}", AIRFLOW_VAR_GCP_DG_NUMBER = "${var.project_number}", AIRFLOW_VAR_GCP_DW_PROJECT = "${var.datastore_project_id}", AIRFLOW_VAR_GCP_MERCHANTS_SA_ACCT = "merchant-sa@${var.project_id}.iam.gserviceaccount.com", AIRFLOW_VAR_GCP_PROJECT_REGION = "${var.location}", AIRFLOW_VAR_GCP_SUB_NET = "projects/${var.project_id}/regions/${var.location}/subnetworks/dataplex-default", AIRFLOW_VAR_GCP_TRANSACTIONS_CONSUMER_SA_ACCT = "cc-trans-consumer-sa@${var.project_id}.iam.gserviceaccount.com", AIRFLOW_VAR_GCP_TRANSACTIONS_SA_ACCT = "cc-trans-sa@${var.project_id}.iam.gserviceaccount.com", AIRFLOW_VAR_GCS_DEST_BUCKET = "test", AIRFLOW_VAR_GCS_SOURCE_BUCKET = "test", AIRFLOW_VAR_GDC_TAG_JAR = "gs://${var.dataplex_process_bucket_name}/common/tagmanager-1.0-SNAPSHOT.jar", AIRFLOW_VAR_INPUT_TBL_CC_CUST = "cc_customers_data", AIRFLOW_VAR_INPUT_TBL_CUST = "customers_data", AIRFLOW_VAR_MERCHANT_DC_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/merchant-source-configs/", AIRFLOW_VAR_MERCHANT_DC_INPUT_FILE = "data-product-classification-tag-auto.yaml", AIRFLOW_VAR_MERCHANT_DP_INFO_INPUT_FILE = "data-product-info-tag-auto.yaml", AIRFLOW_VAR_MERCHANT_DP_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/merchant-source-configs", AIRFLOW_VAR_MERCHANT_DPLX_LAKE_ID = "consumer-banking--merchant--domain", AIRFLOW_VAR_MERCHANT_DPLX_REGION = "${var.location}", AIRFLOW_VAR_MERCHANT_DPLX_ZONE_ID = "merchant-data-product-zone", AIRFLOW_VAR_MERCHANT_DQ_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/merchant-source-configs", AIRFLOW_VAR_MERCHANT_DQ_INPUT_FILE = "data-product-quality-tag-auto.yaml", AIRFLOW_VAR_MERCHANT_DQ_RAW_INPUT_YAML = "gs://${var.dataplex_process_bucket_name}/code/merchant-source-configs/dq_merchant_gcs_data.yaml", AIRFLOW_VAR_MERCHANT_DX_INPUT_FILE = "data-product-exchange-tag-manual.yaml", AIRFLOW_VAR_MERCHANT_DX_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/merchant-source-configs", AIRFLOW_VAR_MERCHANT_ENTITY_LIST_FILE_PATH = "/home/airflow/gcs/data/merchant_data_products/entities.txt", AIRFLOW_VAR_MERCHANT_PARTITION_DATE = "${var.date_partition}", AIRFLOW_VAR_PARTITION_DATE = "${var.date_partition}", AIRFLOW_VAR_TABLE_LIST_FILE_PATH = "/home/airflow/gcs/data/tablelist.txt", AIRFLOW_VAR_TAG_TEMPLATE_DATA_PRODUCT_CLASSIFICATION = "projects/${var.project_id}/locations/${var.location}/tagTemplates/data_product_classification", AIRFLOW_VAR_TAG_TEMPLATE_DATA_PRODUCT_EXCHANGE = "projects/${var.project_id}/locations/${var.location}/tagTemplates/data_product_exchange", AIRFLOW_VAR_TAG_TEMPLATE_DATA_PRODUCT_INFO = "projects/${var.project_id}/locations/${var.location}/tagTemplates/data_product_information", AIRFLOW_VAR_TAG_TEMPLATE_DATA_PRODUCT_QUALITY = "projects/${var.project_id}/locations/${var.location}/tagTemplates/data_product_quality", AIRFLOW_VAR_TRANSACTIONS_DC_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/transactions-source-configs", AIRFLOW_VAR_TRANSACTIONS_DC_INPUT_FILE = "data-product-classification-tag-auto.yaml", AIRFLOW_VAR_TRANSACTIONS_DP_INFO_INPUT_FILE = "data-product-info-tag-auto.yaml", AIRFLOW_VAR_TRANSACTIONS_DP_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/transactions-source-configs", AIRFLOW_VAR_TRANSACTIONS_DPLX_LAKE_ID = "consumer-banking--creditcards--transaction--domain", AIRFLOW_VAR_TRANSACTIONS_DPLX_REGION = "us-central1", AIRFLOW_VAR_TRANSACTIONS_DPLX_ZONE_ID = "authorizations-data-product-zone", AIRFLOW_VAR_TRANSACTIONS_DQ_INFO_INPUT_FILE = "data-product-quality-tag-auto.yaml", AIRFLOW_VAR_TRANSACTIONS_DQ_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/transactions-source-configs", AIRFLOW_VAR_TRANSACTIONS_DQ_INPUT_FILE = "data-product-quality-tag-auto.yaml", AIRFLOW_VAR_TRANSACTIONS_DQ_RAW_INPUT_YAML = "gs://${var.dataplex_process_bucket_name}/code/transactions-source-configs/dq_transactions_gcs_data.yaml", AIRFLOW_VAR_TRANSACTIONS_DX_INFO_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/transactions-source-configs", AIRFLOW_VAR_TRANSACTIONS_DX_INPUT_FILE = "data-product-exchange-tag-manual.yaml", AIRFLOW_VAR_TRANSACTIONS_DX_INPUT_PATH = "gs://${var.dataplex_process_bucket_name}/code/transactions-source-configs/", AIRFLOW_VAR_TRANSACTIONS_ENTITY_LIST_FILE_PATH = "/home/airflow/gcs/data/transactions_data_products/entities.txt", AIRFLOW_VAR_TRANSACTIONS_PARTITION_DATE = "${var.date_partition}", AIRFLOW_VAR_CUSTOMER_DQ_DP_INPUT_YAML="gs://${var.dataplex_process_bucket_name}/code/customer-source-configs/dq_customer_data_product.yaml", AIRFLOW_VAR_TOKCUSTOMER_DQ_DP_INPUT_YAML="gs://${var.dataplex_process_bucket_name}/code/customer-source-configs/dq_tokenized_customer_data_product.yaml", AIRFLOW_VAR_MERCHANT_DQ_DP_INPUT_YAML="gs://${var.dataplex_process_bucket_name}/code/merchant-source-configs/dq_merchant_data_product.yaml", AIRFLOW_VAR_TRANS_SRC_DQ_DP_INPUT_YAML="gs://${var.dataplex_process_bucket_name}/code/transactions-source-configs/dq_transactions_data_product.yaml", AIRFLOW_VAR_TRANS_CON_DQ_DP_INPUT_YAML="gs://${var.dataplex_process_bucket_name}/code/transactions-consumer-configs/dq_cc_analytics_data_product.yaml", } } # this is designed to be the smallest cheapest Composer for demo purposes workloads_config { scheduler { cpu = 4 memory_gb = 10 storage_gb = 10 count = 1 } web_server { cpu = 0.5 memory_gb = 1 storage_gb = 1 } worker { cpu = 2 memory_gb = 10 storage_gb = 10 min_count = 1 max_count = 4 } } environment_size = "ENVIRONMENT_SIZE_SMALL" node_config { network = var.network_id subnetwork = google_compute_subnetwork.composer_subnet.id service_account = google_service_account.composer_service_account.name } } depends_on = [ google_project_iam_member.cloudcomposer_account_service_agent_v2_ext, google_project_iam_member.cloudcomposer_account_service_agent, google_compute_subnetwork.composer_subnet, google_service_account.composer_service_account, google_project_iam_member.composer_service_account_worker_role, ## google_project_iam_member.composer_service_account_bq_admin_role ] timeouts { create = "90m" } } resource "null_resource" "dag_setup" { provisioner "local-exec" { command = <<-EOT export airflow_dag_folder=$(gcloud composer environments describe ${var.project_id}-composer --location="us-central1" | grep dagGcsPrefix | awk '{print $2}') export airflow_data_folder=$(gcloud composer environments describe ${var.project_id}-composer --location="us-central1" | grep dagGcsPrefix | awk '{print $2}' | sed -e 's/dags/data/') gsutil mv gs://${local._dataplex_process_bucket_name}/composer/dags/* $airflow_dag_folder gsutil mv gs://${local._dataplex_process_bucket_name}/composer/data/* $airflow_data_folder/ EOT } depends_on = [ google_composer_environment.composer_env] }