extensions/python/pythonprocessors/nifiapi/recordtransform.py (114 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import traceback
import json
from abc import abstractmethod
from minifi_native import ProcessContext, ProcessSession, Processor
from .processorbase import ProcessorBase
from .properties import FlowFile as FlowFileProxy
from .properties import ProcessContext as ProcessContextProxy
from .properties import PropertyDescriptor


class __RecordTransformResult__:
    """Internal pairing of a transform result with its JSON-serialized record.

    ``processor_result`` is the object returned by ``RecordTransform.transform``
    and ``recordJson`` is the ``json.dumps`` output of its record, or ``None``
    when the transform produced no record.
    """

    def __init__(self, processor_result, recordJson):
        self.processor_result = processor_result
        self.recordJson = recordJson

    def getRecordJson(self):
        """Return the serialized record, or ``None`` if there is none."""
        return self.recordJson

    def getSchema(self):
        """Return the ``schema`` attribute of the wrapped result."""
        return self.processor_result.schema

    def getRelationship(self):
        """Return the ``relationship`` attribute of the wrapped result."""
        return self.processor_result.relationship

    def getPartition(self):
        """Return the ``partition`` attribute of the wrapped result."""
        return self.processor_result.partition


class RecordTransformResult:
    """Value returned by ``RecordTransform.transform`` for a single record.

    Args:
        record: The transformed record; ``None`` means no record is written.
        schema: Schema associated with the record (stored, not used by
            ``RecordTransform.onTrigger``).
        relationship: Name of the relationship the record is routed to.
            Defaults to ``"success"``.
        partition: Grouping value; records sharing a partition and a
            relationship are written to the same output flow file.
    """

    def __init__(self, record=None, schema=None, relationship="success", partition=None):
        self.record = record
        self.schema = schema
        self.relationship = relationship
        self.partition = partition

    def getRecord(self):
        """Return the transformed record."""
        return self.record

    def getSchema(self):
        """Return the schema passed to the constructor."""
        return self.schema

    def getRelationship(self):
        """Return the target relationship name."""
        return self.relationship

    def getPartition(self):
        """Return the partition value."""
        return self.partition


def _group_by_destination(results):
    """Group serialized records by their (relationship, partition) destination.

    Results whose serialized record is ``None`` are skipped. Groups are
    returned in the order their destination was first encountered, and the
    records inside each group keep their input order.

    Args:
        results: Iterable of ``__RecordTransformResult__`` objects.

    Returns:
        A list of ``(relationship, record_json_list)`` tuples, one per
        distinct (relationship, partition) pair.
    """
    # Destinations are matched by equality with a linear search instead of a
    # dict lookup, because a partition value is not guaranteed to be hashable
    # (NOTE(review): presumably partitions may be dicts - confirm).
    destinations = []
    grouped_records = []
    for result in results:
        record_json = result.getRecordJson()
        if record_json is None:
            continue
        destination = (result.getRelationship(), result.getPartition())
        try:
            index = destinations.index(destination)
        except ValueError:
            destinations.append(destination)
            grouped_records.append([record_json])
        else:
            grouped_records[index].append(record_json)
    return [
        (relationship, records)
        for (relationship, _partition), records in zip(destinations, grouped_records)
    ]


class RecordTransform(ProcessorBase):
    """Base class for processors that transform a flow file record by record.

    Subclasses implement ``transform``. ``onTrigger`` reads the records of the
    incoming flow file with the configured Record Reader, calls ``transform``
    for each of them, and writes the results with the configured Record Writer.
    """

    RECORD_READER = PropertyDescriptor(
        name='Record Reader',
        display_name='Record Reader',
        description='''Specifies the Controller Service to use for reading incoming data''',
        required=True,
        controller_service_definition='RecordSetReader'
    )
    RECORD_WRITER = PropertyDescriptor(
        name='Record Writer',
        display_name='Record Writer',
        description='''Specifies the Controller Service to use for writing out the records''',
        required=True,
        controller_service_definition='RecordSetWriter',
    )

    def onInitialize(self, processor: Processor):
        """Run the base initialization and register the reader/writer properties."""
        super(RecordTransform, self).onInitialize(processor)
        processor.addProperty(self.RECORD_READER.name, self.RECORD_READER.description, None, self.RECORD_READER.required, False, False,
                              None, None, self.RECORD_READER.controllerServiceDefinition)
        processor.addProperty(self.RECORD_WRITER.name, self.RECORD_WRITER.description, None, self.RECORD_WRITER.required, False, False,
                              None, None, self.RECORD_WRITER.controllerServiceDefinition)

    def onTrigger(self, context: ProcessContext, session: ProcessSession):
        """Transform every record of one incoming flow file.

        The incoming flow file is transferred to ``REL_FAILURE`` when the
        reader or writer property does not resolve to a controller service,
        when reading fails or returns ``None``, or when parsing or
        transforming any record raises. Otherwise one child flow file is
        created per distinct (relationship, partition) pair among the
        non-empty results, written with the Record Writer and transferred to
        that relationship, and the incoming flow file goes to ``REL_ORIGINAL``.
        """
        flow_file = session.get()
        if not flow_file:
            return

        context_proxy = ProcessContextProxy(context, self)

        record_reader = context_proxy.getProperty(self.RECORD_READER).asControllerService()
        if not record_reader:
            self.logger.error("Record Reader property is invalid")
            session.transfer(flow_file, self.REL_FAILURE)
            return

        record_writer = context_proxy.getProperty(self.RECORD_WRITER).asControllerService()
        if not record_writer:
            self.logger.error("Record Writer property is invalid")
            session.transfer(flow_file, self.REL_FAILURE)
            return

        try:
            record_list = record_reader.read(flow_file, session)
            if record_list is None:
                self.logger.error("Reading flow file records returned None")
                session.transfer(flow_file, self.REL_FAILURE)
                return
        except Exception:
            self.logger.error("Failed to read flow file records due to the following error:\n{}".format(traceback.format_exc()))
            session.transfer(flow_file, self.REL_FAILURE)
            return

        flow_file_proxy = FlowFileProxy(session, flow_file)

        # All records are transformed before any output flow file is created,
        # so a failure on any record leaves no partial output behind.
        results = []
        for record in record_list:
            try:
                # Parsing happens inside the try so that a malformed record
                # routes the flow file to failure instead of escaping onTrigger.
                record_json = json.loads(record)
                result = self.transform(context_proxy, record_json, None, flow_file_proxy)
                result_record = result.getRecord()
                result_json = None if result_record is None else json.dumps(result_record)
                results.append(__RecordTransformResult__(result, result_json))
            except Exception:
                self.logger.error("Failed to transform record due to the following error:\n{}".format(traceback.format_exc()))
                session.transfer(flow_file, self.REL_FAILURE)
                return

        # Each output flow file is routed using the relationship of the
        # records it actually contains.
        for relationship, record_jsons in _group_by_destination(results):
            partitioned_flow_file = session.create(flow_file)
            record_writer.write(record_jsons, partitioned_flow_file, session)
            if relationship == "success":
                session.transfer(partitioned_flow_file, self.REL_SUCCESS)
            else:
                session.transferToCustomRelationship(partitioned_flow_file, relationship)

        session.transfer(flow_file, self.REL_ORIGINAL)

    @abstractmethod
    def transform(self, context: ProcessContextProxy, record_json, schema, flowFile: FlowFileProxy) -> RecordTransformResult:
        """Transform a single record; implemented by subclasses.

        Args:
            context: Proxy around the process context of the current trigger.
            record_json: The record, already parsed with ``json.loads``.
            schema: Always ``None`` when called from ``onTrigger``.
            flowFile: Proxy around the incoming flow file.

        Returns:
            A ``RecordTransformResult`` describing the transformed record.
        """
        pass