in python/dataproc_templates/pubsublite/pubsublite_to_bigtable.py [0:0]
def parse_args(args: Optional[Sequence[str]] = None) -> Dict[str, Any]:
    """Parse the Pub/Sub Lite to Bigtable options from a command line.

    Args:
        args: Argument strings to parse. When ``None``, argparse reads
            ``sys.argv[1:]`` instead.

    Returns:
        A dictionary keyed by each option's ``dest`` (the corresponding
        ``constants`` value) holding the parsed value, or the declared
        default when the option was not supplied.

    Note:
        Parsing goes through ``parse_known_args`` and the list of
        unrecognized arguments is discarded, so unknown options are
        ignored rather than reported as errors.
    """
    # Each entry: (option name, help text, extra ``add_argument`` keywords).
    # The order here is the order in which the options are registered.
    option_specs = (
        (
            constants.PUBSUBLITE_BIGTABLE_SUBSCRIPTION_PATH,
            "Pub/Sub Lite subscription path",
            {"required": True},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_STREAMING_TIMEOUT,
            "Time duration after which the streaming query will be stopped (in seconds)",
            {"type": int, "default": 60, "required": False},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_STREAMING_TRIGGER,
            "Time interval at which the streaming query runs to process incoming data",
            {"default": "0 seconds", "required": False},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_STREAMING_CHECKPOINT_PATH,
            "Temporary folder path to store checkpoint information",
            {"required": False},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_OUTPUT_PROJECT,
            "GCP project containing the Bigtable instance",
            {"required": True},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_OUTPUT_INSTANCE,
            "Bigtable instance ID, containing the output table",
            {"required": True},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_OUTPUT_TABLE,
            "Table ID in Bigtable, to store the output",
            {"required": True},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_OUTPUT_COLUMN_FAMILIES,
            "List of Column Family names to create a new table",
            {"required": False},
        ),
        (
            constants.PUBSUBLITE_BIGTABLE_OUTPUT_MAX_VERSIONS,
            "Maximum number of versions of cells in the new table (Garbage Collection Policy)",
            {"default": 1, "type": int, "required": False},
        ),
    )

    cli: argparse.ArgumentParser = argparse.ArgumentParser()
    for option, description, extras in option_specs:
        # The flag is the option name prefixed with "--"; dest reuses the
        # same name so the returned dictionary is keyed by the constant.
        cli.add_argument(f"--{option}", dest=option, help=description, **extras)

    # Only the recognized options are kept; leftovers are dropped.
    namespace, _unrecognized = cli.parse_known_args(args)
    return vars(namespace)