transcoder/message/DatacastParser.py (74 lines of code) (raw):
#
# Copyright 2023 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from transcoder.message import DatacastSchema, ParsedMessage
from transcoder.message.exception import ParserFunctionNotDefinedError
class DatacastParser:
"""Class encapsulating message parsing and processing functionality """
@staticmethod
def supported_factory_types():
"""Static method for retrieving list of provider-specific factory classes"""
raise ParserFunctionNotDefinedError
def __init__(self, stats_only: bool = False,
message_type_inclusions: str = None, message_type_exclusions: str = None):
self.stats_only = stats_only
self.message_type_inclusions = message_type_inclusions.split(
',') if message_type_inclusions is not None else None
self.message_type_exclusions = message_type_exclusions.split(
',') if message_type_exclusions is not None else None
self.use_message_type_filtering = message_type_inclusions is not None or message_type_exclusions is not None
self.record_count = 0
self.summary_count = {}
self.total_schema_count = 0
self.error_summary_count = {}
@property
def record_type_count(self):
"""Method returning count of record types in a given source"""
return self.summary_count
@property
def total_record_count(self):
"""Method returning count of all records in a given source"""
return self.record_count
@property
def error_record_type_count(self):
"""Method returning count of all errored records by type"""
return self.error_summary_count
def process_schema(self) -> [DatacastSchema]:
"""Gets message names from schema file, filters messages to include, sets count dict"""
schema_list = self._process_schema()
filtered_list = list(filter(lambda x: self.__include_message_type(x.name), schema_list))
self.total_schema_count = len(filtered_list)
for name in list(map(lambda x: x.name, filtered_list)):
self.summary_count[name] = 0
return filtered_list
def _process_schema(self) -> [DatacastSchema]:
"""Function for sub-class to implement for processing schema"""
raise ParserFunctionNotDefinedError
def process_message(self, raw_msg) -> ParsedMessage:
"""Wraps _process_message with count and inclusion behavior"""
message = self._process_message(raw_msg)
if message is None:
return None
if self.__include_message_type(message.name) is False:
message.ignored = True
return message
self.increment_summary_count(message.name)
if self.stats_only is True:
message.ignored = True
return message
message = self._parse_message(message)
return message
def __include_message_type(self, msg_type):
if self.use_message_type_filtering is True:
msg_type_str = str(msg_type)
if self.message_type_inclusions is not None and msg_type_str not in self.message_type_inclusions:
return False
if self.message_type_exclusions is not None and msg_type_str in self.message_type_exclusions:
return False
return True
def _process_message(self, raw_msg) -> ParsedMessage:
raise ParserFunctionNotDefinedError
def _parse_message(self, message: ParsedMessage) -> ParsedMessage:
raise ParserFunctionNotDefinedError
def _process_fields(self, message: ParsedMessage):
raise ParserFunctionNotDefinedError
def increment_summary_count(self, message_name: str):
"""Increments message count by type"""
self.record_count += 1
if message_name not in self.summary_count:
self.summary_count[message_name] = 0
self.summary_count[message_name] += 1
def increment_error_summary_count(self, message_name: str = 'UNKNOWN'):
"""Increments error count by message type"""
if message_name not in self.error_summary_count:
self.error_summary_count[message_name] = 0
self.error_summary_count[message_name] += 1
def get_summary_count(self, message_name: str):
"""Returns summary count by message type"""
return self.summary_count.get(message_name, 0)