transcoder/message/ErrorWriter.py (65 lines of code) (raw):
#
# Copyright 2022 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import os
import time
from enum import Enum
from transcoder.message import ParsedMessage
class TranscodeStep(Enum):
"""Enum of steps at which an error may be encountered"""
UNKNOWN = 'UNKNOWN'
PARSE_MESSAGE = 'parse_message'
EXECUTE_HANDLERS = 'execute_handlers'
EXECUTE_HANDLER = 'execute_handler'
WRITE_OUTPUT_RECORD = 'write_output_record'
@classmethod
def _missing_(cls, value):
return TranscodeStep.UNKNOWN
def __str__(self):
return self.value
def __repr__(self):
return self.value
class ErrorWriter:
"""Persist data about errors to file"""
def __init__(self, prefix: str, output_path: str = None):
self.prefix: str = prefix
self.step: TranscodeStep = TranscodeStep.UNKNOWN
self.note: str = ''
if output_path is None:
rel_path = "errorOut"
main_script_dir = os.getcwd()
self.output_path = os.path.join(main_script_dir, rel_path)
else:
self.output_path = output_path
self.file = None
def __get_file_name(self, name, extension):
epoch_time = str(time.time())
return self.output_path + '/' + name + '-' + epoch_time + '.' + extension
def set_step(self, step: TranscodeStep, note: str = ''):
"""Sets step during which error is encountered"""
self.step = step
self.note = note
def write_error(self, raw_record, message: ParsedMessage, exception: Exception):
"""Write data about error to file"""
if self.file is None:
exists = os.path.exists(self.output_path)
if not exists:
os.makedirs(self.output_path)
# Only create error file if errors exist
self.file = open(self.__get_file_name(self.prefix, 'out'), # pylint: disable=consider-using-with
mode='w', encoding='utf-8')
self.file.write('time, message_type, message_name, failed_step, exception, data\n')
encoded = self.__encode_source_message(raw_record)
ex_str = str(exception).replace('\r', '').replace('\n', '').replace(',', ' ')
epoch_time = time.time()
current_step = f'{self.step}-{self.note}'
if message is not None:
out_str = f'{epoch_time}, {message.type}, {message.name}, {current_step}, {ex_str}, '
else:
out_str = f'{epoch_time},,, {self.step}, {ex_str}, '
self.file.write(out_str + encoded + '\n')
@staticmethod
def __encode_source_message(record):
if record is None:
return ''
if isinstance(record, bytes):
return base64.b64encode(record).decode('utf-8')
if isinstance(record, str):
return base64.b64encode(record.encode('utf-8')).decode('utf-8')
return base64.b64encode(record)
def __del__(self):
if self.file is not None:
self.file.close()