# python/main.py
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Any, Type
import logging
import sys
from pyspark.sql import SparkSession
from dataproc_templates import BaseTemplate, TemplateName
from dataproc_templates.gcs.gcs_to_jdbc import GCSToJDBCTemplate
from dataproc_templates.mongo.mongo_to_gcs import MongoToGCSTemplate
from dataproc_templates.mongo.mongo_to_bq import MongoToBigQueryTemplate
from dataproc_templates.util import get_template_name, get_log_level, track_template_invocation
from dataproc_templates.gcs.gcs_to_bigquery import GCSToBigQueryTemplate
from dataproc_templates.gcs.gcs_to_gcs import GCSToGCSTemplate
from dataproc_templates.gcs.gcs_to_mongo import GCSToMONGOTemplate
from dataproc_templates.gcs.gcs_to_bigtable import GCSToBigTableTemplate
from dataproc_templates.bigquery.bigquery_to_gcs import BigQueryToGCSTemplate
from dataproc_templates.bigquery.bigquery_to_memorystore import BigQueryToMemorystoreTemplate
from dataproc_templates.hive.hive_to_bigquery import HiveToBigQueryTemplate
from dataproc_templates.hive.hive_to_gcs import HiveToGCSTemplate
from dataproc_templates.gcs.text_to_bigquery import TextToBigQueryTemplate
from dataproc_templates.hbase.hbase_to_gcs import HbaseToGCSTemplate
from dataproc_templates.jdbc.jdbc_to_jdbc import JDBCToJDBCTemplate
from dataproc_templates.jdbc.jdbc_to_gcs import JDBCToGCSTemplate
from dataproc_templates.jdbc.jdbc_to_bigquery import JDBCToBigQueryTemplate
from dataproc_templates.snowflake.snowflake_to_gcs import SnowflakeToGCSTemplate
from dataproc_templates.redshift.redshift_to_gcs import RedshiftToGCSTemplate
from dataproc_templates.cassandra.cassandra_to_bigquery import CassandraToBQTemplate
from dataproc_templates.hive.util.hive_ddl_extractor import HiveDDLExtractorTemplate
from dataproc_templates.kafka.kafka_to_gcs import KafkaToGCSTemplate
from dataproc_templates.kafka.kafka_to_bq import KafkaToBigQueryTemplate
from dataproc_templates.s3.s3_to_bigquery import S3ToBigQueryTemplate
from dataproc_templates.cassandra.cassandra_to_gcs import CassandraToGCSTemplate
from dataproc_templates.pubsublite.pubsublite_to_gcs import PubSubLiteToGCSTemplate
from dataproc_templates.azure.azure_blob_storage_to_bigquery import AzureBlobStorageToBigQueryTemplate
from dataproc_templates.pubsublite.pubsublite_to_bigtable import PubSubLiteToBigtableTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_gcs import ElasticsearchToGCSTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_bq import ElasticsearchToBQTemplate
from dataproc_templates.elasticsearch.elasticsearch_to_bigtable import ElasticsearchToBigTableTemplate
# Module-level logger shared by this entry point; handlers/level are
# configured by the runtime environment, not here.
LOGGER: logging.Logger = logging.getLogger('dataproc_templates')
# Maps each TemplateName to its corresponding implementation
# of BaseTemplate. run_template() looks templates up here, so every
# TemplateName enum member that users can select must have an entry.
TEMPLATE_IMPLS: Dict[TemplateName, Type[BaseTemplate]] = {
    TemplateName.GCSTOBIGQUERY: GCSToBigQueryTemplate,
    TemplateName.GCSTOGCS: GCSToGCSTemplate,
    TemplateName.GCSTOBIGTABLE: GCSToBigTableTemplate,
    TemplateName.BIGQUERYTOGCS: BigQueryToGCSTemplate,
    TemplateName.BIGQUERYTOMEMORYSTORE: BigQueryToMemorystoreTemplate,
    TemplateName.HIVETOBIGQUERY: HiveToBigQueryTemplate,
    TemplateName.HIVETOGCS: HiveToGCSTemplate,
    TemplateName.TEXTTOBIGQUERY: TextToBigQueryTemplate,
    TemplateName.GCSTOJDBC: GCSToJDBCTemplate,
    TemplateName.GCSTOMONGO: GCSToMONGOTemplate,
    TemplateName.HBASETOGCS: HbaseToGCSTemplate,
    TemplateName.JDBCTOJDBC: JDBCToJDBCTemplate,
    TemplateName.JDBCTOGCS: JDBCToGCSTemplate,
    TemplateName.JDBCTOBIGQUERY: JDBCToBigQueryTemplate,
    TemplateName.MONGOTOGCS: MongoToGCSTemplate,
    TemplateName.MONGOTOBIGQUERY: MongoToBigQueryTemplate,
    TemplateName.SNOWFLAKETOGCS: SnowflakeToGCSTemplate,
    TemplateName.REDSHIFTTOGCS: RedshiftToGCSTemplate,
    TemplateName.CASSANDRATOBQ: CassandraToBQTemplate,
    TemplateName.AZUREBLOBSTORAGETOBQ: AzureBlobStorageToBigQueryTemplate,
    TemplateName.CASSANDRATOGCS: CassandraToGCSTemplate,
    TemplateName.HIVEDDLEXTRACTOR: HiveDDLExtractorTemplate,
    TemplateName.KAFKATOGCS: KafkaToGCSTemplate,
    TemplateName.KAFKATOBQ: KafkaToBigQueryTemplate,
    TemplateName.S3TOBIGQUERY: S3ToBigQueryTemplate,
    TemplateName.PUBSUBLITETOGCS: PubSubLiteToGCSTemplate,
    TemplateName.PUBSUBLITETOBIGTABLE: PubSubLiteToBigtableTemplate,
    TemplateName.ELASTICSEARCHTOGCS: ElasticsearchToGCSTemplate,
    TemplateName.ELASTICSEARCHTOBQ: ElasticsearchToBQTemplate,
    TemplateName.ELASTICSEARCHTOBIGTABLE: ElasticsearchToBigTableTemplate
}
def create_spark_session(template_name: TemplateName) -> SparkSession:
    """
    Creates the SparkSession object.

    Also applies the log level returned by get_log_level() to both
    the JVM-side log4j loggers and the PySpark context, so driver
    and executor logs stay consistent.

    Args:
        template_name (TemplateName): The name of the template being
            run. Its value is used to set the Spark app name.

    Returns:
        pyspark.sql.SparkSession: The set up SparkSession.
    """
    spark = SparkSession.builder \
        .appName(template_name.value) \
        .enableHiveSupport() \
        .getOrCreate()

    # Resolve the desired level once and reuse it for every logger.
    log_level: str = get_log_level()

    # Reach into the JVM to adjust log4j directly; setLogLevel() alone
    # does not cover the root logger or the org.apache.spark logger.
    log4j = spark.sparkContext._jvm.org.apache.log4j
    log4j_level = log4j.Level.toLevel(log_level)
    log4j.LogManager.getRootLogger().setLevel(log4j_level)
    log4j.LogManager.getLogger("org.apache.spark").setLevel(log4j_level)
    spark.sparkContext.setLogLevel(log_level)

    return spark
def run_template(template_name: TemplateName) -> None:
    """
    Run the template registered under the given template name.

    Looks the implementation up in TEMPLATE_IMPLS, builds it, parses
    its arguments, creates a SparkSession and executes it. Any failure
    is logged and the process exits with status 1.

    Args:
        template_name (TemplateName): The TemplateName of the template
            that should be run.

    Returns:
        None
    """
    # pylint: disable=broad-except
    # Broad catch is intentional: this is the process boundary, and we
    # want every failure logged before exiting non-zero.
    implementation: Type[BaseTemplate] = TEMPLATE_IMPLS[template_name]
    template: BaseTemplate = implementation.build()

    try:
        parsed_args: Dict[str, Any] = template.parse_args()
        session: SparkSession = create_spark_session(template_name=template_name)
        track_template_invocation(spark=session, template_name=template_name)
        template.run(spark=session, args=parsed_args)
    except Exception:
        LOGGER.exception(
            'An error occurred while running %s template',
            template_name
        )
        sys.exit(1)
if __name__ == '__main__':
    # Script entry point: resolve the requested template from the
    # command line and execute it.
    run_template(template_name=get_template_name())