in asterix-bad/src/main/java/org/apache/asterix/bad/runtime/ActiveTimestampManager.java [48:76]
public static synchronized boolean progressChannelExecutionTimestamps(JobId jobId, String channelName,
String nodeId) {
if (channelName.equals("")) {
return false;
}
// In distributed cases, channel name would be sufficient. Since in BADExecutionTest, all nodes share the same
// JVM, we would need to add the node id as part of the key
String channelTimeKey = nodeId + channelName;
if (channelTimeStateMap.containsKey(channelTimeKey)) {
ChannelTimeState state = channelTimeStateMap.get(channelTimeKey);
if (state.maxJobId.compareTo(jobId) < 0) {
state.previousChannelExecutionTimestamp = state.currentChannelExecutionTimestamp;
state.currentChannelExecutionTimestamp = System.currentTimeMillis();
state.maxJobId = jobId;
LOGGER.log(Level.FINE, "CHN TS UPD " + channelName + " at " + jobId + " on " + nodeId + " "
+ state.previousChannelExecutionTimestamp + " - " + state.currentChannelExecutionTimestamp);
// System.err.println("CHN TS UPD " + channelName + " at " + jobId + " on " + nodeId + " "
// + state.previousChannelExecutionTimestamp + " - " + state.currentChannelExecutionTimestamp);
return true;
}
} else {
LOGGER.log(Level.FINE,
"CHN TS INIT " + channelName + " at " + jobId + " 0 - " + System.currentTimeMillis());
// System.err.println("CHN TS INIT " + channelName + " at " + jobId + " on node " + nodeId + " 0 - "
// + System.currentTimeMillis());
channelTimeStateMap.put(channelTimeKey, new ChannelTimeState(0, System.currentTimeMillis(), jobId));
}
return false;
}