# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Entry point for executing a Dataproc template.

Resolves the template name supplied on the command line to its
``BaseTemplate`` implementation, creates a SparkSession, and runs the
template, exiting non-zero on failure.
"""

from typing import Any, Dict, Type
import logging
import sys

from pyspark.sql import SparkSession

from dataproc_templates import BaseTemplate, TemplateName
from dataproc_templates.util import (
    get_template_name,
    get_log_level,
    track_template_invocation,
)
from dataproc_templates.azure.azure_blob_storage_to_bigquery import AzureBlobStorageToBigQueryTemplate
from dataproc_templates.bigquery.bigquery_to_gcs import BigQueryToGCSTemplate
from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate
from dataproc_templates.cassandra.cassandra_to_bigquery import CassandraToBQTemplate
from dataproc_templates.cassandra.cassandra_to_gcs import CassandraToGCSTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_bigtable import ElasticsearchToBigTableTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_bq import ElasticsearchToBQTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_gcs import ElasticsearchToGCSTemplate
from dataproc_templates.gcs.gcs_to_bigquery import GCSToBigQueryTemplate
from dataproc_templates.gcs.gcs_to_bigtable import GCSToBigTableTemplate
from dataproc_templates.gcs.gcs_to_gcs import GCSToGCSTemplate
from dataproc_templates.gcs.gcs_to_jdbc import GCSToJDBCTemplate
from dataproc_templates.gcs.gcs_to_mongo import GCSToMONGOTemplate
from dataproc_templates.gcs.text_to_bigquery import TextToBigQueryTemplate
from dataproc_templates.hbase.hbase_to_gcs import HbaseToGCSTemplate
from dataproc_templates.hive.hive_to_bigquery import HiveToBigQueryTemplate
from dataproc_templates.hive.hive_to_gcs import HiveToGCSTemplate
from dataproc_templates.hive.util.hive_ddl_extractor import HiveDDLExtractorTemplate
from dataproc_templates.jdbc.jdbc_to_bigquery import JDBCToBigQueryTemplate
from dataproc_templates.jdbc.jdbc_to_gcs import JDBCToGCSTemplate
from dataproc_templates.jdbc.jdbc_to_jdbc import JDBCToJDBCTemplate
from dataproc_templates.kafka.kafka_to_bq import KafkaToBigQueryTemplate
from dataproc_templates.kafka.kafka_to_gcs import KafkaToGCSTemplate
from dataproc_templates.mongo.mongo_to_bq import MongoToBigQueryTemplate
from dataproc_templates.mongo.mongo_to_gcs import MongoToGCSTemplate
from dataproc_templates.pubsublite.pubsublite_to_bigtable import PubSubLiteToBigtableTemplate
from dataproc_templates.pubsublite.pubsublite_to_gcs import PubSubLiteToGCSTemplate
from dataproc_templates.redshift.redshift_to_gcs import RedshiftToGCSTemplate
from dataproc_templates.s3.s3_to_bigquery import S3ToBigQueryTemplate
from dataproc_templates.snowflake.snowflake_to_gcs import SnowflakeToGCSTemplate

LOGGER: logging.Logger = logging.getLogger('dataproc_templates')

# Maps each TemplateName to its corresponding implementation
# of BaseTemplate
TEMPLATE_IMPLS: Dict[TemplateName, Type[BaseTemplate]] = {
    TemplateName.GCSTOBIGQUERY: GCSToBigQueryTemplate,
    TemplateName.GCSTOGCS: GCSToGCSTemplate,
    TemplateName.GCSTOBIGTABLE: GCSToBigTableTemplate,
    TemplateName.BIGQUERYTOGCS: BigQueryToGCSTemplate,
    TemplateName.BIGQUERYTOMEMORYSTORE: BigQueryToMemorystoreTemplate,
    TemplateName.HIVETOBIGQUERY: HiveToBigQueryTemplate,
    TemplateName.HIVETOGCS: HiveToGCSTemplate,
    TemplateName.TEXTTOBIGQUERY: TextToBigQueryTemplate,
    TemplateName.GCSTOJDBC: GCSToJDBCTemplate,
    TemplateName.GCSTOMONGO: GCSToMONGOTemplate,
    TemplateName.HBASETOGCS: HbaseToGCSTemplate,
    TemplateName.JDBCTOJDBC: JDBCToJDBCTemplate,
    TemplateName.JDBCTOGCS: JDBCToGCSTemplate,
    TemplateName.JDBCTOBIGQUERY: JDBCToBigQueryTemplate,
    TemplateName.MONGOTOGCS: MongoToGCSTemplate,
    TemplateName.MONGOTOBIGQUERY: MongoToBigQueryTemplate,
    TemplateName.SNOWFLAKETOGCS: SnowflakeToGCSTemplate,
    TemplateName.REDSHIFTTOGCS: RedshiftToGCSTemplate,
    TemplateName.CASSANDRATOBQ: CassandraToBQTemplate,
    TemplateName.AZUREBLOBSTORAGETOBQ: AzureBlobStorageToBigQueryTemplate,
    TemplateName.CASSANDRATOGCS: CassandraToGCSTemplate,
    TemplateName.HIVEDDLEXTRACTOR: HiveDDLExtractorTemplate,
    TemplateName.KAFKATOGCS: KafkaToGCSTemplate,
    TemplateName.KAFKATOBQ: KafkaToBigQueryTemplate,
    TemplateName.S3TOBIGQUERY: S3ToBigQueryTemplate,
    TemplateName.PUBSUBLITETOGCS: PubSubLiteToGCSTemplate,
    TemplateName.PUBSUBLITETOBIGTABLE: PubSubLiteToBigtableTemplate,
    TemplateName.ELASTICSEARCHTOGCS: ElasticsearchToGCSTemplate,
    TemplateName.ELASTICSEARCHTOBQ: ElasticsearchToBQTemplate,
    TemplateName.ELASTICSEARCHTOBIGTABLE: ElasticsearchToBigTableTemplate
}


def create_spark_session(template_name: TemplateName) -> SparkSession:
    """
    Creates the SparkSession object.

    It also sets the Spark logging level to the level returned by
    get_log_level(), on both the JVM (log4j) side and the PySpark
    context. We could consider parametrizing the log level in the
    future.

    Args:
        template_name (TemplateName): The name of the template being
            run. Its value is used as the Spark app name.

    Returns:
        pyspark.sql.SparkSession: The set up SparkSession.
    """
    spark = SparkSession.builder \
        .appName(template_name.value) \
        .enableHiveSupport() \
        .getOrCreate()

    # Resolve the configured level once and apply it consistently to the
    # JVM-side log4j loggers and the PySpark context.
    log_level: str = get_log_level()
    log4j = spark.sparkContext._jvm.org.apache.log4j
    log4j_level = log4j.Level.toLevel(log_level)
    log4j.LogManager.getRootLogger().setLevel(log4j_level)
    log4j.LogManager.getLogger("org.apache.spark").setLevel(log4j_level)
    spark.sparkContext.setLogLevel(log_level)

    return spark


def run_template(template_name: TemplateName) -> None:
    """
    Executes a template given its template name.

    Any exception raised while parsing arguments or running the template
    is logged (with traceback) and turns into a non-zero process exit,
    since this is the top-level boundary of the job.

    Args:
        template_name (TemplateName): The TemplateName of the template
            that should be run.

    Returns:
        None
    """
    # pylint: disable=broad-except

    template_impl: Type[BaseTemplate] = TEMPLATE_IMPLS[template_name]
    template_instance: BaseTemplate = template_impl.build()

    try:
        args: Dict[str, Any] = template_instance.parse_args()
        spark: SparkSession = create_spark_session(template_name=template_name)
        track_template_invocation(spark=spark, template_name=template_name)
        template_instance.run(spark=spark, args=args)
    except Exception:
        LOGGER.exception(
            'An error occurred while running %s template',
            template_name
        )
        sys.exit(1)


if __name__ == '__main__':
    run_template(
        template_name=get_template_name()
    )