in flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java [199:244]
public void flush() throws Exception {
if (states.values().stream().noneMatch(State::isNeedFlush)) {
// No any state needs to be flushed.
return;
}
// Build the data that need to be flushed.
var flushData = new HashMap<State, List<StateType>>(3);
for (Map.Entry<StateType, State> stateEntry : states.entrySet()) {
State state = stateEntry.getValue();
if (!state.isNeedFlush()) {
continue;
}
StateType stateType = stateEntry.getKey();
flushData.compute(
state,
(st, list) -> {
if (list == null) {
list = new LinkedList<>();
}
list.add(stateType);
return list;
});
}
for (var entry : flushData.entrySet()) {
State state = entry.getKey();
List<StateType> stateTypes = entry.getValue();
switch (state) {
case NEEDS_CREATE:
jdbcStateInteractor.createData(jobKey, stateTypes, data);
break;
case NEEDS_DELETE:
jdbcStateInteractor.deleteData(jobKey, stateTypes);
break;
case NEEDS_UPDATE:
jdbcStateInteractor.updateData(jobKey, stateTypes, data);
break;
default:
throw new IllegalStateException(String.format("Unknown state : %s", state));
}
for (var stateType : stateTypes) {
updateState(stateType, STATE_TRANSITIONER::flushTransition);
}
}
}