public static synchronized boolean progressChannelExecutionTimestamps()

in asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java [48:76]


    public static synchronized boolean progressChannelExecutionTimestamps(JobId jobId, String channelName,
            String nodeId) {
        if (channelName.equals("")) {
            return false;
        }
        // In distributed cases, channel name would be sufficient. Since in BADExecutionTest, all nodes share the same
        // JVM, we would need to add the node id as part of the key
        String channelTimeKey = nodeId + channelName;
        if (channelTimeStateMap.containsKey(channelTimeKey)) {
            ChannelTimeState state = channelTimeStateMap.get(channelTimeKey);
            if (state.maxJobId.compareTo(jobId) < 0) {
                state.previousChannelExecutionTimestamp = state.currentChannelExecutionTimestamp;
                state.currentChannelExecutionTimestamp = System.currentTimeMillis();
                state.maxJobId = jobId;
                LOGGER.log(Level.FINE, "CHN TS UPD " + channelName + " at " + jobId + " on " + nodeId + " "
                        + state.previousChannelExecutionTimestamp + "  -  " + state.currentChannelExecutionTimestamp);
                // System.err.println("CHN TS UPD " + channelName + " at " + jobId + " on " + nodeId + " "
                //         + state.previousChannelExecutionTimestamp + "  -  " + state.currentChannelExecutionTimestamp);
                return true;
            }
        } else {
            LOGGER.log(Level.FINE,
                    "CHN TS INIT " + channelName + " at " + jobId + " 0 - " + System.currentTimeMillis());
            // System.err.println("CHN TS INIT " + channelName + " at " + jobId + " on node " + nodeId + " 0 - "
            //         + System.currentTimeMillis());
            channelTimeStateMap.put(channelTimeKey, new ChannelTimeState(0, System.currentTimeMillis(), jobId));
        }
        return false;
    }