in storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java [369:543]
/**
 * Scans the metric keys that match the given filter and reports each match to a callback.
 *
 * <p>Every string-valued filter field (topology, metric name, component, executor, host, stream)
 * is first resolved to its metadata id. If a filtered string has no id, nothing can match and the
 * method returns without scanning. For each aggregation level in the filter, a key range is
 * built from the resolved ids and the time window, and the database is iterated over that range.
 *
 * <p>If {@code scanCallback} is non-null, each matching entry is converted to a {@link Metric}
 * and passed to it; otherwise the raw key/value pair is passed to {@code rawCallback}, and the
 * whole scan stops as soon as that callback returns {@code false}.
 *
 * @param filter       the criteria (ids, time window, aggregation levels) selecting metrics
 * @param scanCallback receives populated metrics; may be null, in which case rawCallback is used
 * @param rawCallback  receives raw keys and values when scanCallback is null
 * @throws MetricException if the raw callback fails with a RocksDBException
 */
private void scanInternal(FilterOptions filter, ScanCallback scanCallback, RocksDbScanCallback rawCallback) throws MetricException {
    // Per-scan caches passed to the metadata lookups (string -> id and id -> string).
    Map<String, Integer> stringToIdCache = new HashMap<>();
    Map<Integer, String> idToStringCache = new HashMap<>();

    // For every field: start = 0 and end = 0xFFFFFFFF (all bits set) mean "not filtered";
    // when a filter value is supplied, both bounds collapse to the single resolved id.
    int startTopologyId = 0;
    int endTopologyId = 0xFFFFFFFF;
    String filterTopologyId = filter.getTopologyId();
    if (filterTopologyId != null) {
        int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, filterTopologyId, stringToIdCache);
        if (INVALID_METADATA_STRING_ID == topologyId) {
            return;  // string does not exist in database
        }
        startTopologyId = topologyId;
        endTopologyId = topologyId;
    }

    long startTime = filter.getStartTime();
    long endTime = filter.getEndTime();

    int startMetricId = 0;
    int endMetricId = 0xFFFFFFFF;
    String filterMetricName = filter.getMetricName();
    if (filterMetricName != null) {
        int metricId = lookupMetadataString(KeyType.METRIC_STRING, filterMetricName, stringToIdCache);
        if (INVALID_METADATA_STRING_ID == metricId) {
            return;  // string does not exist in database
        }
        startMetricId = metricId;
        endMetricId = metricId;
    }

    int startComponentId = 0;
    int endComponentId = 0xFFFFFFFF;
    String filterComponentId = filter.getComponentId();
    if (filterComponentId != null) {
        int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, filterComponentId, stringToIdCache);
        if (INVALID_METADATA_STRING_ID == componentId) {
            return;  // string does not exist in database
        }
        startComponentId = componentId;
        endComponentId = componentId;
    }

    int startExecutorId = 0;
    int endExecutorId = 0xFFFFFFFF;
    String filterExecutorName = filter.getExecutorId();
    if (filterExecutorName != null) {
        int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, filterExecutorName, stringToIdCache);
        if (INVALID_METADATA_STRING_ID == executorId) {
            return;  // string does not exist in database
        }
        startExecutorId = executorId;
        endExecutorId = executorId;
    }

    int startHostId = 0;
    int endHostId = 0xFFFFFFFF;
    String filterHostId = filter.getHostId();
    if (filterHostId != null) {
        int hostId = lookupMetadataString(KeyType.HOST_STRING, filterHostId, stringToIdCache);
        if (INVALID_METADATA_STRING_ID == hostId) {
            return;  // string does not exist in database
        }
        startHostId = hostId;
        endHostId = hostId;
    }

    // The port is already numeric, so no metadata lookup is needed.
    int startPort = 0;
    int endPort = 0xFFFFFFFF;
    Integer filterPort = filter.getPort();
    if (filterPort != null) {
        startPort = filterPort;
        endPort = filterPort;
    }

    int startStreamId = 0;
    int endStreamId = 0xFFFFFFFF;
    String filterStreamId = filter.getStreamId();
    if (filterStreamId != null) {
        // Stream names must be resolved as STREAM_ID_STRING, the same key type used below when
        // converting a key's stream id back to a string.
        int streamId = lookupMetadataString(KeyType.STREAM_ID_STRING, filterStreamId, stringToIdCache);
        if (INVALID_METADATA_STRING_ID == streamId) {
            return;  // string does not exist in database
        }
        startStreamId = streamId;
        endStreamId = streamId;
    }

    try (ReadOptions ro = new ReadOptions()) {
        ro.setTotalOrderSeek(true);

        for (AggLevel aggLevel : filter.getAggLevels()) {
            // Lower and upper bounds of the key range to iterate for this aggregation level.
            RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
                    startComponentId, startExecutorId, startHostId, startPort, startStreamId);
            RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
                    endComponentId, endExecutorId, endHostId, endPort, endStreamId);

            try (RocksIterator iterator = db.newIterator(ro)) {
                for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) {
                    RocksDbKey key = new RocksDbKey(iterator.key());

                    if (key.compareTo(endKey) > 0) { // past limit, quit
                        break;
                    }

                    // A key inside [startKey, endKey] is still checked field by field; a start
                    // value of 0 means the field was not filtered and is therefore skipped.
                    if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
                        continue;
                    }

                    long timestamp = key.getTimestamp();
                    if (timestamp < startTime || timestamp > endTime) {
                        continue;
                    }

                    if (startMetricId != 0 && key.getMetricId() != startMetricId) {
                        continue;
                    }

                    if (startComponentId != 0 && key.getComponentId() != startComponentId) {
                        continue;
                    }

                    if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
                        continue;
                    }

                    if (startHostId != 0 && key.getHostnameId() != startHostId) {
                        continue;
                    }

                    if (startPort != 0 && key.getPort() != startPort) {
                        continue;
                    }

                    if (startStreamId != 0 && key.getStreamId() != startStreamId) {
                        continue;
                    }

                    RocksDbValue val = new RocksDbValue(iterator.value());

                    if (scanCallback != null) {
                        try {
                            // populate a metric
                            String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
                            String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
                            String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
                            String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
                            String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
                            String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);

                            // The value 0.0 is a placeholder; populateMetric fills in the stored data.
                            Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
                                    streamId, key.getPort(), aggLevel);

                            val.populateMetric(metric);

                            // callback to caller
                            scanCallback.cb(metric);
                        } catch (MetricException e) {
                            // Best effort: a failure on one metric is logged and the scan continues.
                            LOG.warn("Failed to report found metric: {}", e.getMessage());
                        }
                    } else {
                        try {
                            // A false return from the raw callback ends the entire scan.
                            if (!rawCallback.cb(key, val)) {
                                return;
                            }
                        } catch (RocksDBException e) {
                            throw new MetricException("Error reading metrics data", e);
                        }
                    }
                }
            }
        }
    }
}