transcoder/message/MessageUtil.py (33 lines of code) (raw):

# # Copyright 2023 Google LLC # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from third_party.pyfixmsg.parser import FixParser from third_party.sbedecoder import SBEParser from transcoder.message import DatacastParser from transcoder.message.exception import MessageParserNotDefinedError from transcoder.message.factory.MessageFactory import get_message_factory def get_message_parser(factory: str, schema_file_path: str, # pylint: disable=too-many-arguments stats_only: bool = False, message_type_inclusions: str = None, message_type_exclusions: str = None, fix_header_tags: str = None, fix_separator: int = 1) -> DatacastParser: """Returns a DatacastParser instance based on the supplied factory name""" message_parser: DatacastParser = None if factory in SBEParser.supported_factory_types(): message_factory = get_message_factory(factory, schema_file_path) message_parser = SBEParser(message_factory, message_type_inclusions=message_type_inclusions, message_type_exclusions=message_type_exclusions, stats_only=stats_only) elif factory in FixParser.supported_factory_types(): message_parser = FixParser(schema_file_path=schema_file_path, message_type_inclusions=message_type_inclusions, message_type_exclusions=message_type_exclusions, fix_header_tags=fix_header_tags, fix_separator=fix_separator, stats_only=stats_only) else: raise MessageParserNotDefinedError return message_parser def parse_handler_config(handler_config_string: str) -> dict: """ Extracts the configuration parameters attached to the CLI handler option, in the format: FirstHandler:<param>=<value>,SecondHandler:<param>=<value> For example: --message_handlers SequencerHandler,FilterHandler:field=value would run a SequencerHandler without a config, then pass a single-element dict of field: value to FilterHandler via the config object. This routine manufactures the configuration for the Handler from the CLI string """ if handler_config_string.find(':') != -1: config = {} params = handler_config_string.split(':')[1] keyval = params.split('=') config[keyval[0]] = keyval[1] return config return None