in python/dataproc_templates/kafka/kafka_to_bq.py [0:0]
import argparse
from typing import Any, Dict, Optional, Sequence

import dataproc_templates.util.template_constants as constants  # flag-name constants used below


def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """Parse command-line arguments for the Kafka to BigQuery template."""
    parser: argparse.ArgumentParser = argparse.ArgumentParser()
    parser.add_argument(
        f'--{constants.KAFKA_BQ_CHECKPOINT_LOCATION}',
        dest=constants.KAFKA_BQ_CHECKPOINT_LOCATION,
        required=True,
        help='GCS location of the checkpoint folder'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BOOTSTRAP_SERVERS}',
        dest=constants.KAFKA_BOOTSTRAP_SERVERS,
        required=True,
        help='Comma-separated list of Kafka bootstrap servers from which data is read'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BQ_TOPIC}',
        dest=constants.KAFKA_BQ_TOPIC,
        required=True,
        help='Kafka topic name'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BQ_STARTING_OFFSET}',
        dest=constants.KAFKA_BQ_STARTING_OFFSET,
        required=True,
        help='Offset to start reading from. Accepted values: "earliest", "latest", or a JSON string of per-partition offsets'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BQ_DATASET}',
        dest=constants.KAFKA_BQ_DATASET,
        required=True,
        help='BigQuery dataset name'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BQ_TABLE_NAME}',
        dest=constants.KAFKA_BQ_TABLE_NAME,
        required=True,
        help='BigQuery table name'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BQ_OUTPUT_MODE}',
        dest=constants.KAFKA_BQ_OUTPUT_MODE,
        required=True,
        help='BigQuery output mode (append, complete, update)'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BQ_TEMP_BUCKET_NAME}',
        dest=constants.KAFKA_BQ_TEMP_BUCKET_NAME,
        required=True,
        help='GCS temp bucket name'
    )
    parser.add_argument(
        f'--{constants.KAFKA_BQ_TERMINATION_TIMEOUT}',
        dest=constants.KAFKA_BQ_TERMINATION_TIMEOUT,
        required=True,
        help='Timeout for termination of the Kafka subscription'
    )
    known_args: argparse.Namespace
    known_args, _ = parser.parse_known_args(args)
    return vars(known_args)
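
A minimal usage sketch, not part of the template itself: it builds a sample argv and passes it to parse_args. All values (bucket, brokers, topic, dataset, table, timeout) are placeholders, and the actual flag strings are whatever the constants module defines.

sample_args = [
    f'--{constants.KAFKA_BQ_CHECKPOINT_LOCATION}=gs://my-bucket/checkpoints/',
    f'--{constants.KAFKA_BOOTSTRAP_SERVERS}=broker-1:9092,broker-2:9092',
    f'--{constants.KAFKA_BQ_TOPIC}=events',
    f'--{constants.KAFKA_BQ_STARTING_OFFSET}=earliest',
    f'--{constants.KAFKA_BQ_DATASET}=my_dataset',
    f'--{constants.KAFKA_BQ_TABLE_NAME}=my_table',
    f'--{constants.KAFKA_BQ_OUTPUT_MODE}=append',
    f'--{constants.KAFKA_BQ_TEMP_BUCKET_NAME}=my-temp-bucket',
    f'--{constants.KAFKA_BQ_TERMINATION_TIMEOUT}=1200',
]
options = parse_args(sample_args)
# options is a plain dict keyed by the constant names, e.g.
# options[constants.KAFKA_BQ_OUTPUT_MODE] == 'append'

Because parse_known_args is used, unrecognized flags are ignored rather than raising an error, while any missing required flag still aborts with a usage message.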