extensions/python/pythonprocessors/nifiapi/componentstate.py (41 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import Dict
from minifi_native import StateManager as CppFlowFile
class Scope(Enum):
    """
    Enumeration of the state scopes that can be passed to the state API.
    """
    # NOTE(review): presumably mirrors the CLUSTER / LOCAL scopes of the
    # NiFi Python API - confirm against the upstream definition.
    CLUSTER = 1
    LOCAL = 2
class StateMap:
    """
    Thin wrapper around a dictionary of state key/value pairs.
    """

    def __init__(self, state_map: Dict[str, str]):
        """
        Store the given dictionary; a None argument is replaced by an empty dictionary.
        """
        if state_map is None:
            state_map = {}
        self.state_map = state_map

    def getStateVersion(self) -> int:
        """
        Return the state version, which is always the constant 1 here.
        """
        return 1

    def get(self, key) -> str:
        """
        Return the value stored under key, or None when the key is not present.
        """
        return self.state_map[key] if key in self.state_map else None

    def toMap(self) -> Dict[str, str]:
        """
        Return the wrapped dictionary itself (no copy is made).
        """
        return self.state_map
class StateManager:
    """
    Python facade over the native (CPP) StateManager object.

    Every operation delegates to the wrapped native object and converts any
    exception raised along the way into a StateException. The scope argument
    of each method is accepted but is not forwarded to the native object.
    """

    def __init__(self, cpp_state_manager: CppFlowFile):
        """
        Keep a reference to the native state manager that performs the actual work.
        """
        self.cpp_state_manager = cpp_state_manager

    def setState(self, state: Dict[str, str], scope: Scope) -> bool:
        """
        Pass the given state dictionary to the native set call and return its result.

        Raises StateException if the native call raises.
        """
        try:
            outcome = self.cpp_state_manager.set(state)
        except Exception as error:
            raise StateException("Set state failed") from error
        else:
            return outcome

    def getState(self, scope: Scope) -> StateMap:
        """
        Fetch the current state from the native object and wrap it in a StateMap.

        Raises StateException if fetching or wrapping the state raises.
        """
        try:
            raw_state = self.cpp_state_manager.get()
            wrapped = StateMap(raw_state)
        except Exception as error:
            raise StateException("Get state failed") from error
        else:
            return wrapped

    def replace(self, old_state: StateMap, new_values: Dict[str, str], scope: Scope) -> bool:
        """
        Call the native replace with the old state's dictionary and the new values.

        Returns the result of the native call. Raises StateException if anything
        fails, including extracting the dictionary from old_state.
        """
        try:
            # Kept inside the try so a bad old_state is reported as StateException too.
            previous_values = old_state.toMap()
            outcome = self.cpp_state_manager.replace(previous_values, new_values)
        except Exception as error:
            raise StateException("Replace state failed") from error
        else:
            return outcome

    def clear(self, scope: Scope):
        """
        Invoke the native clear call; nothing is returned.

        Raises StateException if the native call raises.
        """
        try:
            self.cpp_state_manager.clear()
        except Exception as error:
            raise StateException("Clear state failed") from error
class StateException(Exception):
    """
    Raised when a StateManager operation (set, get, replace or clear) fails.
    """