extensions/python/pythonprocessors/nifiapi/properties.py (237 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from enum import Enum from typing import List, Dict from minifi_native import ProcessSession, timePeriodStringToMilliseconds, dataSizeStringToBytes from minifi_native import FlowFile as CppFlowFile from minifi_native import ProcessContext as CppProcessContext from .componentstate import StateManager # This is a mock for NiFi's StandardValidators class methods, that return the property type equivalent in MiNiFi C++ if exists class ValidatorGenerator: def createNonNegativeFloatingPointValidator(self, *args) -> int: return StandardValidators.ALWAYS_VALID def createDirectoryExistsValidator(self, *args) -> int: return StandardValidators.ALWAYS_VALID def createURLValidator(self, *args) -> int: return StandardValidators.ALWAYS_VALID def createListValidator(self, *args) -> int: return StandardValidators.ALWAYS_VALID def createTimePeriodValidator(self, *args) -> int: return StandardValidators.TIME_PERIOD_VALIDATOR def createAttributeExpressionLanguageValidator(self, *args) -> int: return StandardValidators.ALWAYS_VALID def createDataSizeBoundsValidator(self, *args) -> int: return StandardValidators.DATA_SIZE_VALIDATOR def createRegexMatchingValidator(self, *args) -> int: return StandardValidators.ALWAYS_VALID def createRegexValidator(self, *args) -> int: return StandardValidators.ALWAYS_VALID def createLongValidator(self, *args) -> int: return StandardValidators.LONG_VALIDATOR class StandardValidators: _standard_validators = ValidatorGenerator() ALWAYS_VALID = 0 NON_EMPTY_VALIDATOR = 1 INTEGER_VALIDATOR = 2 POSITIVE_INTEGER_VALIDATOR = 3 POSITIVE_LONG_VALIDATOR = 4 NON_NEGATIVE_INTEGER_VALIDATOR = 5 NUMBER_VALIDATOR = 6 LONG_VALIDATOR = 7 PORT_VALIDATOR = 8 NON_EMPTY_EL_VALIDATOR = 9 HOSTNAME_PORT_LIST_VALIDATOR = 10 BOOLEAN_VALIDATOR = 11 URL_VALIDATOR = 12 URI_VALIDATOR = 13 REGULAR_EXPRESSION_VALIDATOR = 14 REGULAR_EXPRESSION_WITH_EL_VALIDATOR = 15 TIME_PERIOD_VALIDATOR = 16 DATA_SIZE_VALIDATOR = 17 FILE_EXISTS_VALIDATOR = 18 NON_NEGATIVE_FLOATING_POINT_VALIDATOR = 19 class MinifiPropertyTypes: INTEGER_TYPE = 0 LONG_TYPE = 1 BOOLEAN_TYPE = 2 DATA_SIZE_TYPE = 3 TIME_PERIOD_TYPE = 4 NON_BLANK_TYPE = 5 PORT_TYPE = 6 def translateStandardValidatorToMiNiFiPropertype(validators: List[int]) -> int: if validators is None or len(validators) == 0 or len(validators) > 1: return None validator = validators[0] if validator == StandardValidators.INTEGER_VALIDATOR: return MinifiPropertyTypes.INTEGER_TYPE if validator == StandardValidators.LONG_VALIDATOR: return MinifiPropertyTypes.LONG_TYPE if validator == StandardValidators.BOOLEAN_VALIDATOR: return MinifiPropertyTypes.BOOLEAN_TYPE if validator == StandardValidators.DATA_SIZE_VALIDATOR: return MinifiPropertyTypes.DATA_SIZE_TYPE if validator == StandardValidators.TIME_PERIOD_VALIDATOR: return MinifiPropertyTypes.TIME_PERIOD_TYPE if validator == StandardValidators.NON_EMPTY_VALIDATOR: return MinifiPropertyTypes.NON_BLANK_TYPE if validator == StandardValidators.PORT_VALIDATOR: return MinifiPropertyTypes.PORT_TYPE return None class PropertyDependency: def __init__(self, property_descriptor, *dependent_values): if dependent_values is None: dependent_values = [] self.property_descriptor = property_descriptor self.dependent_values = dependent_values class ResourceDefinition: def __init__(self, allow_multiple=False, allow_file=True, allow_url=False, allow_directory=False, allow_text=False): self.allow_multiple = allow_multiple self.allow_file = allow_file self.allow_url = allow_url self.allow_directory = allow_directory self.allow_text = allow_text class ExpressionLanguageScope(Enum): NONE = 1 ENVIRONMENT = 2 FLOWFILE_ATTRIBUTES = 3 class PropertyDescriptor: def __init__(self, name: str, description: str, required: bool = False, sensitive: bool = False, display_name: str = None, default_value: str = None, allowable_values: List[str] = None, dependencies: List[PropertyDependency] = None, expression_language_scope: ExpressionLanguageScope = ExpressionLanguageScope.NONE, dynamic: bool = False, validators: List[int] = None, resource_definition: ResourceDefinition = None, controller_service_definition: str = None): if validators is None: validators = [StandardValidators.ALWAYS_VALID] self.name = name self.description = description self.required = required self.sensitive = sensitive self.displayName = display_name self.defaultValue = default_value self.allowableValues = allowable_values self.dependencies = dependencies self.expressionLanguageScope = expression_language_scope self.dynamic = dynamic self.validators = validators self.resourceDefinition = resource_definition self.controllerServiceDefinition = controller_service_definition class TimeUnit(Enum): NANOSECONDS = "NANOSECONDS", MICROSECONDS = "MICROSECONDS", MILLISECONDS = "MILLISECONDS", SECONDS = "SECONDS", MINUTES = "MINUTES", HOURS = "HOURS", DAYS = "DAYS" class DataUnit(Enum): B = "B", KB = "KB", MB = "MB", GB = "GB", TB = "TB" class FlowFile: def __init__(self, session: ProcessSession, cpp_flow_file: CppFlowFile): self.session = session self.cpp_flow_file = cpp_flow_file def getContentsAsBytes(self): return self.session.getContentsAsBytes(self.cpp_flow_file) def getAttribute(self, name: str): return self.cpp_flow_file.getAttribute(name) def getSize(self): return self.cpp_flow_file.getSize() def getAttributes(self): return self.cpp_flow_file.getAttributes() class PythonPropertyValue: def __init__(self, cpp_context: CppProcessContext, name: str, string_value: str, el_supported: bool, controller_service_definition: str): self.cpp_context = cpp_context self.value = None self.name = name if string_value is not None: self.value = string_value self.el_supported = el_supported self.controller_service_definition = controller_service_definition def getValue(self) -> str: return self.value def isSet(self) -> bool: return self.value is not None def asInteger(self) -> int: if not self.value: return None return int(self.value) def asBoolean(self) -> bool: if not self.value: return None return self.value.lower() == 'true' def asFloat(self) -> float: if not self.value: return None return float(self.value) def asTimePeriod(self, time_unit: TimeUnit) -> int: if not self.value: return None milliseconds = timePeriodStringToMilliseconds(self.value) if time_unit == TimeUnit.NANOSECONDS: return milliseconds * 1000000 if time_unit == TimeUnit.MICROSECONDS: return milliseconds * 1000 if time_unit == TimeUnit.MILLISECONDS: return milliseconds if time_unit == TimeUnit.SECONDS: return int(round(milliseconds / 1000)) if time_unit == TimeUnit.MINUTES: return int(round(milliseconds / 1000 / 60)) if time_unit == TimeUnit.HOURS: return int(round(milliseconds / 1000 / 60 / 60)) if time_unit == TimeUnit.DAYS: return int(round(milliseconds / 1000 / 60 / 60 / 24)) return 0 def asDataSize(self, data_unit: DataUnit) -> float: if not self.value: return None bytes = dataSizeStringToBytes(self.value) if data_unit == DataUnit.B: return float(bytes) if data_unit == DataUnit.KB: return float(bytes / 1024) if data_unit == DataUnit.MB: return float(bytes / 1024 / 1024) if data_unit == DataUnit.GB: return float(bytes / 1024 / 1024 / 1024) if data_unit == DataUnit.TB: return float(bytes / 1024 / 1024 / 1024 / 1024) return 0 def evaluateAttributeExpressions(self, flow_file: FlowFile = None): if flow_file is None or not self.el_supported: return self # If Expression Language is supported and present, evaluate it and return a new PropertyValue. # Otherwise just return self, in order to avoid the cost of making the call to cpp for getProperty new_string_value = self.cpp_context.getProperty(self.name, flow_file.cpp_flow_file) return PythonPropertyValue(self.cpp_context, self.name, new_string_value, self.el_supported, self.controller_service_definition) def asControllerService(self): if not self.controller_service_definition: raise Exception("Controller Service definition is not set, getProperty must be called with a property descriptor instead of string value") return self.cpp_context.getControllerService(self.value, self.controller_service_definition) class ProcessContext: def __init__(self, cpp_context: CppProcessContext, processor): self.cpp_context = cpp_context self.processor = processor def getProperty(self, descriptor) -> PythonPropertyValue: if descriptor is None: return None if isinstance(descriptor, str): property_name = descriptor expression_language_support = True controller_service_definition = None else: property_name = descriptor.name expression_language_support = descriptor.expressionLanguageScope != ExpressionLanguageScope.NONE controller_service_definition = descriptor.controllerServiceDefinition property_value = self.cpp_context.getProperty(property_name) return PythonPropertyValue(self.cpp_context, property_name, property_value, expression_language_support, controller_service_definition) def getStateManager(self) -> StateManager: return StateManager(self.cpp_context.getStateManager()) def getName(self) -> str: return self.cpp_context.getName() def getProperties(self) -> Dict[PropertyDescriptor, str]: properties = dict() cpp_properties = self.cpp_context.getProperties() for property_descriptor in self.processor.getPropertyDescriptors(): if property_descriptor.name in cpp_properties: properties[property_descriptor] = cpp_properties[property_descriptor.name] return properties def yield_resources(self): self.cpp_context.yieldResources()