transcoder/main.py (144 lines of code) (raw):

#! /usr/bin/env python3
#
# Copyright 2023 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=invalid-name
# pylint: disable=too-many-statements

""" CLI entry point for the market data transcoding API """

import argparse
import logging
import os

from transcoder.message.factory import all_supported_factory_types
from transcoder.output import all_output_identifiers
from transcoder.source import all_source_identifiers
from transcoder import Transcoder, __version__

# Directory containing this script; kept for use by packaging / relative lookups.
script_dir = os.path.dirname(__file__)


def main():
    """Main entry point for Datacast Transcoder.

    Builds the CLI argument parser, configures logging from --log, expands
    user paths, constructs a Transcoder from the parsed arguments and runs it.
    """
    arg_parser = argparse.ArgumentParser(description='Datacast Transcoder process input arguments',
                                         allow_abbrev=False)

    # --- Input source arguments -------------------------------------------
    source_options_group = arg_parser.add_argument_group('Input source arguments')
    source_options_group.add_argument('--factory', choices=all_supported_factory_types(),
                                      help='Message factory for decoding')
    source_options_group.add_argument('--schema_file', type=str, help='Path to the schema file')
    source_options_group.add_argument('--source_file', type=str, help='Path to the source file')
    source_options_group.add_argument('--source_file_encoding', type=str, default='utf-8',
                                      help='The source file character encoding')
    source_options_group.add_argument('--source_file_format_type', required=True,
                                      choices=all_source_identifiers(),
                                      help='The source file format')

    # --base64 and --base64_urlsafe are mutually exclusive encodings of the
    # per-message payload.
    base64_group = source_options_group.add_mutually_exclusive_group()
    base64_group.add_argument('--base64', action='store_true',
                              help='Indicates if each individual message extracted from '
                                   'the source is base 64 encoded')
    base64_group.add_argument('--base64_urlsafe', action='store_true',
                              help='Indicates if each individual message extracted from '
                                   'the source is base 64 url safe encoded')

    source_options_group.add_argument('--fix_header_tags', type=str,
                                      help='Comma delimited list of fix header tags')
    source_options_group.add_argument('--fix_separator', type=int, default=1,
                                      help='The unicode int representing the '
                                           'fix message separator')
    source_options_group.add_argument('--message_handlers', type=str,
                                      help='Comma delimited list of message '
                                           'handlers in priority order')
    source_options_group.add_argument('--message_skip_bytes', type=int, default=0,
                                      help='Number of bytes to skip before processing individual '
                                           'messages within a repeated '
                                           'length delimited file message source')
    source_options_group.add_argument('--prefix_length', type=int, default=2,
                                      help='How many bytes to use for the length prefix of '
                                           'length-delimited binary sources')

    # Inclusion and exclusion filters cannot both be given.
    message_filter_group = source_options_group.add_mutually_exclusive_group()
    message_filter_group.add_argument('--message_type_exclusions', type=str,
                                      help='Comma-delimited list of message types to exclude '
                                           'when processing')
    message_filter_group.add_argument('--message_type_inclusions', type=str,
                                      help='Comma-delimited list of message types to include '
                                           'when processing')

    source_options_group.add_argument('--sampling_count', type=int, default=None,
                                      help='Halt processing after reaching this number of '
                                           'messages. Applied after all Handlers are executed '
                                           'per message')
    source_options_group.add_argument('--skip_bytes', type=int, default=0,
                                      help='Number of bytes to skip before processing the file. '
                                           'Useful for skipping file-level headers')
    source_options_group.add_argument('--skip_lines', type=int, default=0,
                                      help='Number of lines to skip before processing the file')
    source_options_group.add_argument('--source_file_endian', choices=['big', 'little'],
                                      default='big', help='Source file endianness')

    # --- Output arguments -------------------------------------------------
    output_options_group = arg_parser.add_argument_group('Output arguments')
    output_options_group.add_argument('--output_path',
                                      help='Output file path. Defaults to avroOut')
    output_options_group.add_argument('--output_type', choices=all_output_identifiers(),
                                      default='diag', help='Output format type')
    output_options_group.add_argument('--error_output_path',
                                      help='Error output file path if --continue_on_error flag '
                                           'enabled. Defaults to errorOut')
    output_options_group.add_argument('--lazy_create_resources', action='store_true',
                                      help='Flag indicating that output resources for message '
                                           'types should be only created as messages of each '
                                           'type are encountered in the source data. Default '
                                           'behavior is to create resources for each message '
                                           'type before messages are processed. Particularly '
                                           'useful when working with FIX but only processing a '
                                           'limited set of message types in the source data')
    output_options_group.add_argument('--frame_only', action='store_true',
                                      help='Flag indicating that transcoder should only frame '
                                           'messages to an output source')
    output_options_group.add_argument('--stats_only', action='store_true',
                                      help='Flag indicating that transcoder should only report '
                                           'on message type counts without parsing messages '
                                           'further')
    output_options_group.add_argument('--create_schemas_only', action='store_true',
                                      help='Flag indicating that transcoder should only create '
                                           'output resource schemas and not output message data')

    # --- Google Cloud arguments -------------------------------------------
    gcp_options_group = arg_parser.add_argument_group('Google Cloud arguments')
    gcp_options_group.add_argument('--destination_project_id',
                                   help='The Google Cloud project ID for the destination '
                                        'resource')

    bigquery_options_group = arg_parser.add_argument_group('BigQuery arguments')
    bigquery_options_group.add_argument('--destination_dataset_id',
                                        help='The BigQuery dataset for the destination. '
                                             'If it does not exist, it will be created')

    pubsub_options_group = arg_parser.add_argument_group('Pub/Sub arguments')
    pubsub_options_group.add_argument('--output_encoding', default='binary',
                                      choices=['binary', 'json'],
                                      help='The encoding of the output')
    # NOTE: `type=bool` was removed here — BooleanOptionalAction ignores it
    # (the flag consumes no value) and passing `type` to this action is
    # deprecated as of Python 3.12. Behavior is unchanged: the parser still
    # accepts --create_schema_enforcing_topics / --no-create_schema_enforcing_topics.
    pubsub_options_group.add_argument('--create_schema_enforcing_topics', default=True,
                                      action=argparse.BooleanOptionalAction,
                                      help='Indicates if Pub/Sub schemas should be created and '
                                           'used to validate messages sent to a topic')

    # --- Top-level arguments ----------------------------------------------
    arg_parser.add_argument('--continue_on_error', action='store_true',
                            help='Indicates if an exception file should '
                                 'be created, and records continued to be '
                                 'processed upon message level '
                                 'exceptions')
    arg_parser.add_argument('--log',
                            choices=['notset', 'debug', 'info', 'warning', 'error', 'critical'],
                            default='info', help='The default logging level')
    arg_parser.add_argument('-q', '--quiet', action='store_true',
                            help='Suppress message output to console')
    arg_parser.add_argument('-v', '--version', action='version',
                            version=f'Datacast Transcoder {__version__}')

    args = arg_parser.parse_args()

    logging.basicConfig(level=args.log.upper())
    logging.debug(args)

    # Expand '~' in user-supplied paths. The schema/source paths use a falsy
    # guard while the output paths use `is not None` — preserved deliberately
    # to match the Transcoder's existing contract for empty-string arguments.
    schema_file_path = os.path.expanduser(args.schema_file) if args.schema_file else None
    source_file_path = os.path.expanduser(args.source_file) if args.source_file else None
    output_path = os.path.expanduser(args.output_path) if args.output_path is not None else None
    error_output_path = os.path.expanduser(
        args.error_output_path) if args.error_output_path is not None else None

    # Positional argument order must match the Transcoder constructor exactly.
    txcode = Transcoder(args.factory, schema_file_path, source_file_path,
                        args.source_file_encoding, args.source_file_format_type,
                        args.source_file_endian, args.prefix_length, args.skip_lines,
                        args.skip_bytes, args.message_skip_bytes, args.quiet,
                        args.output_type, args.output_encoding, output_path,
                        error_output_path, args.destination_project_id,
                        args.destination_dataset_id, args.message_handlers,
                        args.lazy_create_resources, args.frame_only, args.stats_only,
                        args.create_schemas_only, args.continue_on_error,
                        args.create_schema_enforcing_topics, args.sampling_count,
                        args.message_type_inclusions, args.message_type_exclusions,
                        args.fix_header_tags, args.fix_separator, args.base64,
                        args.base64_urlsafe)
    txcode.transcode()


if __name__ == "__main__":
    main()