public void flush()

in flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java [199:244]


    /**
     * Persists every pending state change of this view through the {@code jdbcStateInteractor}.
     *
     * <p>State types whose {@code State} reports {@code isNeedFlush()} are grouped by that state.
     * Each group is handed to the matching create / delete / update call of the interactor, and
     * afterwards every state type of the group is advanced with {@code
     * STATE_TRANSITIONER::flushTransition}. If no state type needs flushing, the method returns
     * without calling the interactor.
     *
     * @throws IllegalStateException if a state that needs flushing is none of {@code
     *     NEEDS_CREATE}, {@code NEEDS_DELETE} or {@code NEEDS_UPDATE}
     * @throws Exception presumably propagated from the interactor calls -- confirm against the
     *     interactor's declared exceptions
     */
    public void flush() throws Exception {
        boolean nothingPending = states.values().stream().noneMatch(State::isNeedFlush);
        if (nothingPending) {
            // Nothing needs to be flushed.
            return;
        }

        // Group the state types by the pending operation their current state calls for.
        Map<State, List<StateType>> typesByPendingState = new HashMap<>(3);
        states.forEach(
                (type, current) -> {
                    if (current.isNeedFlush()) {
                        typesByPendingState
                                .computeIfAbsent(current, ignored -> new LinkedList<>())
                                .add(type);
                    }
                });

        for (Map.Entry<State, List<StateType>> group : typesByPendingState.entrySet()) {
            State pendingState = group.getKey();
            List<StateType> affectedTypes = group.getValue();

            // Issue one interactor call per pending operation, covering all affected types.
            switch (pendingState) {
                case NEEDS_CREATE:
                    jdbcStateInteractor.createData(jobKey, affectedTypes, data);
                    break;
                case NEEDS_DELETE:
                    jdbcStateInteractor.deleteData(jobKey, affectedTypes);
                    break;
                case NEEDS_UPDATE:
                    jdbcStateInteractor.updateData(jobKey, affectedTypes, data);
                    break;
                default:
                    throw new IllegalStateException(
                            String.format("Unknown state : %s", pendingState));
            }

            // Only reached when the interactor call above did not throw.
            for (StateType flushedType : affectedTypes) {
                updateState(flushedType, STATE_TRANSITIONER::flushTransition);
            }
        }
    }