in python/dataproc_templates/pubsublite/pubsublite_to_gcs.py [0:0]
def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """Parse command-line options for the Pub/Sub Lite to Cloud Storage template.

    Registers the template's own options, then the output Spark options
    returned by ``constants.get_csv_output_spark_options`` (prefix
    ``"pubsublite.to.gcs.output."``) via ``add_spark_options``.

    Arguments the parser does not recognise are not rejected:
    ``parse_known_args`` is used and its leftover list is discarded.

    Args:
        args: Argument strings to parse. When ``None``, argparse falls back
            to ``sys.argv[1:]``.

    Returns:
        A dict mapping each option's ``dest`` name to its parsed value.
    """
    arg_parser: argparse.ArgumentParser = argparse.ArgumentParser()

    def register(option_name: str, **settings: Any) -> None:
        # Each option constant doubles as the flag name and the dest key,
        # so the returned dict is keyed by the same constants.
        arg_parser.add_argument(f'--{option_name}', dest=option_name, **settings)

    register(
        constants.PUBSUBLITE_TO_GCS_INPUT_SUBSCRIPTION_URL,
        required=True,
        help='Pub/Sub Lite Input subscription url',
    )

    write_modes = [
        constants.OUTPUT_MODE_APPEND,
        constants.OUTPUT_MODE_UPDATE,
        constants.OUTPUT_MODE_COMPLETE,
    ]
    register(
        constants.PUBSUBLITE_TO_GCS_WRITE_MODE,
        required=False,
        default=constants.OUTPUT_MODE_APPEND,
        choices=write_modes,
        help=(
            'Output write mode '
            '(one of: append, update, complete) '
            '(Defaults to append)'
        ),
    )

    register(
        constants.PUBSUBLITE_TO_GCS_OUTPUT_LOCATION,
        required=True,
        help='Cloud Storage output Bucket URL',
    )

    register(
        constants.PUBSUBLITE_TO_GCS_CHECKPOINT_LOCATION,
        required=True,
        help='Temporary folder for checkpoint location',
    )

    output_formats = [
        constants.FORMAT_AVRO,
        constants.FORMAT_CSV,
        constants.FORMAT_JSON,
        constants.FORMAT_PRQT,
    ]
    register(
        constants.PUBSUBLITE_TO_GCS_OUTPUT_FORMAT,
        required=False,
        default=constants.FORMAT_JSON,
        choices=output_formats,
        help=(
            'Output Format to Cloud Storage '
            '(one of: json, csv, avro, parquet) '
            '(Defaults to json)'
        ),
    )

    # Parsed as an int; the unit is not stated here.
    register(
        constants.PUBSUBLITE_TO_GCS_TIMEOUT,
        required=True,
        type=int,
        help='Time for which subscriptions will be read',
    )

    # Kept as a raw string (no type conversion).
    register(
        constants.PUBSUBLITE_TO_GCS_PROCESSING_TIME,
        required=True,
        help='Time interval at which the query will be triggered to process input data',
    )

    add_spark_options(
        arg_parser,
        constants.get_csv_output_spark_options("pubsublite.to.gcs.output."),
        read_options=False,
    )

    parsed: argparse.Namespace
    parsed, _unrecognised = arg_parser.parse_known_args(args)
    return vars(parsed)