transcoder/Transcoder.py (184 lines of code) (raw):
#
# Copyright 2023 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=broad-except
import importlib
import logging
import os
import signal
import sys
from datetime import datetime
from transcoder.message import DatacastParser, NoParser
from transcoder.message.ErrorWriter import ErrorWriter, TranscodeStep
from transcoder.message.MessageUtil import get_message_parser, parse_handler_config
from transcoder.output import get_output_manager
from transcoder.source import get_message_source
# pylint: disable=invalid-name
class Transcoder:  # pylint: disable=too-many-instance-attributes
    """ Main entry point for transcoding sessions, bounded by a schema, source and parser """

    def __init__(self,  # pylint: disable=too-many-arguments,too-many-locals
                 factory: str, schema_file_path: str, source_file_path: str, source_file_encoding: str,
                 source_file_format_type: str, source_file_endian: str, prefix_length: int, skip_lines: int,
                 skip_bytes: int, message_skip_bytes: int, quiet: bool, output_type: str, output_encoding: str,
                 output_path: str, error_output_path: str, destination_project_id: str, destination_dataset_id: str,
                 message_handlers: str, lazy_create_resources: bool, frame_only: bool, stats_only: bool,
                 create_schemas_only: bool, continue_on_error: bool, create_schema_enforcing_topics: bool,
                 sampling_count: int, message_type_inclusions: str, message_type_exclusions: str, fix_header_tags: str,
                 fix_separator: int, base64: bool, base64_urlsafe: bool):
        # Trap Ctrl-C so interrupted runs exit with an interim summary instead
        # of a noisy stack trace (see trap()).
        signal.signal(signal.SIGINT, self.trap)
        self.message_handler_spec = message_handlers  # raw comma-delimited handler spec string
        self.message_handlers = {}  # message type -> list of handler instances for that type
        self.all_message_type_handlers = []  # handlers that apply to every message type
        self.all_handlers = []  # every instantiated handler, in spec order
        self.handlers_enabled = False
        self.continue_on_error = continue_on_error
        self.error_output_path = error_output_path
        self.output_encoding = output_encoding
        self.frame_only = frame_only
        self.create_schemas_only = create_schemas_only
        self.lazy_create_resources = lazy_create_resources
        self.prefix_length = prefix_length
        self.quiet = quiet
        self.start_time = None  # set when transcode() begins
        self.stats_only = stats_only
        self.sampling_count = sampling_count
        self.transcoded_count = 0
        # Derive the output prefix from the source file's base name, or use
        # 'stdin' when no file path was supplied (piped input).
        self.output_prefix = os.path.basename(
            os.path.splitext(source_file_path)[0]) if source_file_path else 'stdin'
        self.error_writer = ErrorWriter(prefix=self.output_prefix,
                                        output_path=self.error_output_path)
        self.output_manager = get_output_manager(output_type, self.output_prefix, output_path,
                                                 output_encoding, self.prefix_length, destination_project_id,
                                                 destination_dataset_id, lazy_create_resources,
                                                 create_schema_enforcing_topics)

        # TODO: think about this abstraction some more
        # An output manager that cannot persist data records forces schema-only mode.
        if self.output_manager.supports_data_writing() is False:
            self.create_schemas_only = True

        # The source must be created unconditionally: transcode() enters it as a
        # context manager and print_summary() reads source.record_count even in
        # schema-only runs. (Previously this assignment was skipped when the
        # output manager could not write data, leaving self.source unset and
        # raising AttributeError later.)
        self.source = get_message_source(source_file_path, source_file_encoding,
                                         source_file_format_type, source_file_endian,
                                         skip_bytes, skip_lines, message_skip_bytes,
                                         prefix_length, base64, base64_urlsafe)

        # frame_only bypasses schema-aware parsing entirely; NoParser only counts frames.
        self.message_parser: DatacastParser = NoParser() if self.frame_only else get_message_parser(
            factory,
            schema_file_path,
            stats_only,
            message_type_inclusions,
            message_type_exclusions,
            fix_header_tags,
            fix_separator
        )
        self.setup_handlers()

    def transcode(self):
        """Entry point for transcoding session"""
        self.start_time = datetime.now()
        if self.frame_only is False:
            self.process_schemas()
        with self.source:
            for raw_msg in self.source.get_message_iterator():
                if self.frame_only:  # don't parse message
                    # Count the frame via NoParser and pass the raw bytes straight through.
                    self.message_parser.process_message(raw_msg)
                    self.output_manager.write_record(None, raw_msg)
                else:  # parse message
                    if self.stats_only is False:  # output message
                        self.transcode_message(raw_msg)
                    # Stop early once the requested sample size has been emitted
                    # (sampling_count is None when sampling is disabled).
                    if self.transcoded_count == self.sampling_count:
                        break
        self.print_summary()

    def transcode_message(self, raw):
        """ Transcoding steps executed on each source message """
        self.error_writer.set_step(TranscodeStep.PARSE_MESSAGE)
        msg = None
        try:
            msg = self.message_parser.process_message(raw)
            # The parser may attach a non-fatal exception to the message;
            # record it but keep going if the message is otherwise usable.
            if msg.exception is not None:
                self.handle_exception(raw, msg, msg.exception)
            if msg.ignored is False:  # passed inclusions / exclusions
                self.execute_handlers(msg)
            if msg.ignored is False:  # passed filters
                self.error_writer.set_step(TranscodeStep.WRITE_OUTPUT_RECORD)
                self.output_manager.write_record(msg.name, msg.dictionary)
                self.transcoded_count += 1
        except Exception as ex:
            # handle_exception re-raises unless continue_on_error was requested.
            self.handle_exception(raw, msg, ex)

    def execute_handlers(self, message):
        """ Executes in sequence the message handlers specified for this transcoding instance """
        if self.handlers_enabled is True:  # execute handlers
            self.error_writer.set_step(TranscodeStep.EXECUTE_HANDLERS)
            # Universal handlers run first, then handlers registered for this message type.
            for handler in self.all_message_type_handlers + self.message_handlers.get(message.type, []):
                self.error_writer.set_step(TranscodeStep.EXECUTE_HANDLER, type(handler).__name__)
                handler.handle(message)

    def setup_handlers(self):
        """Initialize MessageHandler instances to employ at runtime"""
        if self.message_handler_spec is None or self.message_handler_spec == "":
            return
        self.handlers_enabled = True
        # Specs look like 'HandlerName' or 'HandlerName:key=value,...'.
        handler_strs = self.message_handler_spec.split(',')
        for handler_spec in handler_strs:
            cls_name = None
            config_dict = None
            if handler_spec.find(':') == -1:  # no handler params
                cls_name = handler_spec
            else:
                cls_name = handler_spec.split(':')[0]
                config_dict = parse_handler_config(handler_spec)
            # Handler classes are resolved by name from the handler package.
            module = importlib.import_module('transcoder.message.handler')
            class_ = getattr(module, cls_name)
            instance = class_(config_dict)
            self.all_handlers.append(instance)
            if instance.supports_all_message_types is True:
                self.all_message_type_handlers.append(instance)
                continue
            # Register the handler under each message type it supports,
            # avoiding duplicate registrations of the same instance.
            supported_msg_types = instance.supported_message_types
            for supported_type in supported_msg_types:
                if supported_type in self.message_handlers:
                    handler_list = self.message_handlers[supported_type]
                    if instance not in handler_list:
                        self.message_handlers[supported_type].append(instance)
                else:
                    self.message_handlers[supported_type] = [instance]

    def print_summary(self):
        """Print summary of the messages that were processed"""
        # All output goes through logging at INFO; skip the work entirely if
        # INFO is disabled.
        if logging.getLogger().isEnabledFor(logging.INFO):
            end_time = datetime.now()
            time_diff = end_time - self.start_time
            total_seconds = time_diff.total_seconds()
            if self.create_schemas_only is True:
                logging.info('Run in create_schemas_only mode')
            if self.output_manager.supports_data_writing() is False:
                logging.info('Output manager \'%s\' does not support message writes',
                             self.output_manager.output_type_identifier())
            if self.frame_only is False:
                if self.message_parser.stats_only is True:
                    logging.info('Run in stats_only mode')
                if self.sampling_count is not None:
                    logging.info('Sampled messages: %s', self.sampling_count)
                if self.message_parser.message_type_inclusions is not None:
                    logging.info('Message type inclusions: %s', self.message_parser.message_type_inclusions)
                elif self.message_parser.message_type_exclusions is not None:
                    logging.info('Message type exclusions: %s', self.message_parser.message_type_exclusions)
                if self.create_schemas_only is False:
                    logging.info('Source message count: %s', self.source.record_count)
                    logging.info('Processed message count: %s', self.message_parser.record_count)
                    logging.info('Transcoded message count: %s', self.transcoded_count)
                    logging.info('Processed schema count: %s', self.message_parser.total_schema_count)
                    logging.info('Summary of message counts: %s', self.message_parser.record_type_count)
                    logging.info('Summary of error message counts: %s', self.message_parser.error_record_type_count)
                    logging.info('Message rate: %s per second', round(self.source.record_count / total_seconds, 6))
            else:
                logging.info('Source record count: %s', self.source.record_count)
            logging.info('Total runtime in seconds: %s', round(total_seconds, 6))
            logging.info('Total runtime in minutes: %s', round(total_seconds / 60, 6))

    def process_schemas(self):
        """Process the schema specified at runtime"""
        spec_schemas = self.message_parser.process_schema()
        for schema in spec_schemas:
            # Some sinks (e.g. columnar outputs) cannot represent empty schemas.
            if self.output_manager.supports_zero_field_schemas() is False and len(schema.fields) == 0:
                logging.info('Schema "%s" contains no field definitions, skipping schema creation', schema.name)
                continue
            # Give handlers a chance to add their synthetic fields to the schema.
            for handler in self.all_handlers:
                if handler.supports_all_message_types is True \
                        or schema.message_id in handler.supported_message_types:
                    handler.append_manufactured_fields(schema)
            self.output_manager.enqueue_schema(schema)
        # Only need to wait if lazy create is off, and you want to force creation before data is read
        if self.lazy_create_resources is False:
            self.output_manager.wait_for_schema_creation()

    def handle_exception(self, raw_record, message, exception):
        """Process exceptions encountered in the message processing runtime"""
        if message is not None:
            self.message_parser.increment_error_summary_count(message.name)
        else:
            # Exception occurred before a message could be parsed; count it unnamed.
            self.message_parser.increment_error_summary_count()
        self.error_writer.write_error(raw_record, message, exception)
        if self.continue_on_error is False:
            raise exception

    def trap(self, _signum, _frame):
        """Trap SIGINT to suppress noisy stack traces and show interim summary"""
        print()
        self.print_summary()
        sys.exit(1)