in python/dataproc_templates/elasticsearch/elasticsearch_to_bq.py [0:0]
def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """Parse the command-line options of the Elasticsearch to BigQuery template.

    Args:
        args: Argument strings to parse. They are handed to
            ``ArgumentParser.parse_known_args`` unchanged, so ``None`` makes
            argparse read the process arguments.

    Returns:
        A dict mapping every registered option's ``dest`` to its parsed
        value. Arguments the parser does not recognise are discarded.

    Raises:
        SystemExit: If neither an API key nor a complete user/password pair
            is supplied, or if an API key is combined with a user or a
            password.
    """
    arg_parser = argparse.ArgumentParser()

    # Elasticsearch source options: (option name, required?, help text).
    # Registration order is kept stable because it drives the --help layout.
    source_options = (
        (constants.ES_BQ_INPUT_NODE, True, 'Elasticsearch Node Uri'),
        (constants.ES_BQ_INPUT_INDEX, True, 'Elasticsearch Index Name'),
        (constants.ES_BQ_NODE_USER, False, 'Elasticsearch Node User'),
        (constants.ES_BQ_NODE_PASSWORD, False, 'Elasticsearch Node Password'),
        (constants.ES_BQ_NODE_API_KEY, False, 'Elasticsearch Node API Key'),
    )
    for option, mandatory, description in source_options:
        arg_parser.add_argument(
            f'--{option}',
            dest=option,
            required=mandatory,
            help=description,
        )

    add_es_spark_connector_options(
        arg_parser,
        constants.get_es_spark_connector_input_options("es.bq.input."),
    )

    # Boolean switches controlling how nested fields are flattened.
    flatten_array_help = (
        'Flatten the n-D array fields to 1-D array fields,'
        f' it needs {constants.ES_BQ_FLATTEN_STRUCT} to be true'
    )
    flatten_switches = (
        (constants.ES_BQ_FLATTEN_STRUCT, 'Flatten the struct fields'),
        (constants.ES_BQ_FLATTEN_ARRAY, flatten_array_help),
    )
    for option, description in flatten_switches:
        arg_parser.add_argument(
            f'--{option}',
            dest=option,
            action='store_true',
            required=False,
            help=description,
        )

    # BigQuery destination options; both are mandatory.
    destination_options = (
        (constants.ES_BQ_OUTPUT_DATASET, 'BigQuery Output Dataset Name'),
        (constants.ES_BQ_OUTPUT_TABLE, 'BigQuery Output Table Name'),
    )
    for option, description in destination_options:
        arg_parser.add_argument(
            f'--{option}',
            dest=option,
            required=True,
            help=description,
        )

    write_modes = [
        constants.OUTPUT_MODE_OVERWRITE,
        constants.OUTPUT_MODE_APPEND,
        constants.OUTPUT_MODE_IGNORE,
        constants.OUTPUT_MODE_ERRORIFEXISTS,
    ]
    arg_parser.add_argument(
        f'--{constants.ES_BQ_OUTPUT_MODE}',
        dest=constants.ES_BQ_OUTPUT_MODE,
        required=False,
        default=constants.OUTPUT_MODE_APPEND,
        help=(
            'BigQuery Output write mode '
            '(one of: append,overwrite,ignore,errorifexists) '
            '(Defaults to append)'
        ),
        choices=write_modes,
    )

    add_spark_options(
        arg_parser,
        constants.get_bq_output_spark_options("es.bq.output."),
    )

    # Only the recognised arguments are kept; the leftovers are ignored.
    parsed, _ = arg_parser.parse_known_args(args)

    api_key = getattr(parsed, constants.ES_BQ_NODE_API_KEY)
    user = getattr(parsed, constants.ES_BQ_NODE_USER)
    password = getattr(parsed, constants.ES_BQ_NODE_PASSWORD)

    if api_key:
        # API-key auth must not be mixed with any basic-auth credential.
        if user or password:
            sys.exit(
                "ArgumentParser Error: Both es.bq.input.user and es.bq.input.password "
                "AND es.bq.input.api.key cannot be provided as arguments at the same time."
            )
    elif not (user and password):
        # Without an API key, both the user and the password are needed.
        sys.exit(
            "ArgumentParser Error: Either of es.bq.input.user and es.bq.input.password "
            "OR es.bq.input.api.key needs to be provided as argument to read data from Elasticsearch"
        )

    return vars(parsed)