in python/dataproc_templates/kafka/kafka_to_gcs.py [0:0]
def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
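    """Parse command line arguments for the Kafka to GCS template and return them as a dict."""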
    parser: argparse.ArgumentParser = argparse.ArgumentParser()
    parser.add_argument(
        f'--{constants.KAFKA_GCS_CHECKPOINT_LOCATION}',
        dest=constants.KAFKA_GCS_CHECKPOINT_LOCATION,
        required=True,
        help='Checkpoint location for Kafka to GCS Template'
    )
    parser.add_argument(
        f'--{constants.KAFKA_GCS_OUTPUT_LOCATION}',
        dest=constants.KAFKA_GCS_OUTPUT_LOCATION,
        required=True,
        help='GCS location of the destination folder'
    )
    parser.add_argument(
        f'--{constants.KAFKA_GCS_BOOTSTRAP_SERVERS}',
        dest=constants.KAFKA_GCS_BOOTSTRAP_SERVERS,
        required=True,
        help='Kafka bootstrap server list from which data is read'
    )
    parser.add_argument(
        f'--{constants.KAFKA_TOPIC}',
        dest=constants.KAFKA_TOPIC,
        required=True,
        help='Kafka topic name'
    )
    parser.add_argument(
        f'--{constants.KAFKA_STARTING_OFFSET}',
        dest=constants.KAFKA_STARTING_OFFSET,
        required=True,
        help='Starting offset value (earliest, latest, json_string)'
    )
    parser.add_argument(
        f'--{constants.KAFKA_GCS_OUTPUT_FORMAT}',
        dest=constants.KAFKA_GCS_OUTPUT_FORMAT,
        required=True,
        help='Output format of the data (json, csv, avro, parquet)'
    )
    parser.add_argument(
        f'--{constants.KAFKA_GCS_OUPUT_MODE}',
        dest=constants.KAFKA_GCS_OUPUT_MODE,
        required=True,
        help="Output write mode (append, update, complete)",
        choices=[
            constants.OUTPUT_MODE_APPEND,
            constants.OUTPUT_MODE_UPDATE,
            constants.OUTPUT_MODE_COMPLETE
        ]
    )
    parser.add_argument(
        f'--{constants.KAFKA_GCS_TERMINATION_TIMEOUT}',
        dest=constants.KAFKA_GCS_TERMINATION_TIMEOUT,
        required=True,
        help="Timeout for termination of the Kafka subscription"
    )
    add_spark_options(
        parser,
        constants.get_csv_output_spark_options("kafka.gcs.output."),
        read_options=False
    )
    known_args: argparse.Namespace
    known_args, _ = parser.parse_known_args(args)
    return vars(known_args)
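
# A minimal usage sketch (the flag strings are resolved from the constants
# module at runtime; the bucket, broker, topic and timeout values below are
# illustrative only, not part of the template):
#
#   args = parse_args([
#       f'--{constants.KAFKA_GCS_CHECKPOINT_LOCATION}=gs://bucket/checkpoint/',
#       f'--{constants.KAFKA_GCS_OUTPUT_LOCATION}=gs://bucket/output/',
#       f'--{constants.KAFKA_GCS_BOOTSTRAP_SERVERS}=host1:9092,host2:9092',
#       f'--{constants.KAFKA_TOPIC}=events',
#       f'--{constants.KAFKA_STARTING_OFFSET}=earliest',
#       f'--{constants.KAFKA_GCS_OUTPUT_FORMAT}=parquet',
#       f'--{constants.KAFKA_GCS_OUPUT_MODE}={constants.OUTPUT_MODE_APPEND}',
#       f'--{constants.KAFKA_GCS_TERMINATION_TIMEOUT}=1200',
#   ])
#   # args is a plain dict keyed by the constant names used as dest above.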