private void scanInternal()

in storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java [369:543]


    private void scanInternal(FilterOptions filter, ScanCallback scanCallback, RocksDbScanCallback rawCallback) throws MetricException {

        Map<String, Integer> stringToIdCache = new HashMap<>();
        Map<Integer, String> idToStringCache = new HashMap<>();

        int startTopologyId = 0;
        int endTopologyId = 0xFFFFFFFF;
        String filterTopologyId = filter.getTopologyId();
        if (filterTopologyId != null) {
            int topologyId = lookupMetadataString(KeyType.TOPOLOGY_STRING, filterTopologyId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == topologyId) {
                return;  // string does not exist in database
            }
            startTopologyId = topologyId;
            endTopologyId = topologyId;
        }

        long startTime = filter.getStartTime();
        long endTime = filter.getEndTime();

        int startMetricId = 0;
        int endMetricId = 0xFFFFFFFF;
        String filterMetricName = filter.getMetricName();
        if (filterMetricName != null) {
            int metricId = lookupMetadataString(KeyType.METRIC_STRING, filterMetricName, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == metricId) {
                return;  // string does not exist in database
            }
            startMetricId = metricId;
            endMetricId = metricId;
        }

        int startComponentId = 0;
        int endComponentId = 0xFFFFFFFF;
        String filterComponentId = filter.getComponentId();
        if (filterComponentId != null) {
            int componentId = lookupMetadataString(KeyType.COMPONENT_STRING, filterComponentId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == componentId) {
                return;  // string does not exist in database
            }
            startComponentId = componentId;
            endComponentId = componentId;
        }

        int startExecutorId = 0;
        int endExecutorId = 0xFFFFFFFF;
        String filterExecutorName = filter.getExecutorId();
        if (filterExecutorName != null) {
            int executorId = lookupMetadataString(KeyType.EXEC_ID_STRING, filterExecutorName, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == executorId) {
                return;  // string does not exist in database
            }
            startExecutorId = executorId;
            endExecutorId = executorId;
        }

        int startHostId = 0;
        int endHostId = 0xFFFFFFFF;
        String filterHostId = filter.getHostId();
        if (filterHostId != null) {
            int hostId = lookupMetadataString(KeyType.HOST_STRING, filterHostId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == hostId) {
                return;  // string does not exist in database
            }
            startHostId = hostId;
            endHostId = hostId;
        }

        int startPort = 0;
        int endPort = 0xFFFFFFFF;
        Integer filterPort = filter.getPort();
        if (filterPort != null) {
            startPort = filterPort;
            endPort = filterPort;
        }

        int startStreamId = 0;
        int endStreamId = 0xFFFFFFFF;
        String filterStreamId = filter.getStreamId();
        if (filterStreamId != null) {
            int streamId = lookupMetadataString(KeyType.HOST_STRING, filterStreamId, stringToIdCache);
            if (INVALID_METADATA_STRING_ID == streamId) {
                return;  // string does not exist in database
            }
            startStreamId = streamId;
            endStreamId = streamId;
        }

        try (ReadOptions ro = new ReadOptions()) {
            ro.setTotalOrderSeek(true);

            for (AggLevel aggLevel : filter.getAggLevels()) {

                RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
                        startComponentId, startExecutorId, startHostId, startPort, startStreamId);
                RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
                        endComponentId, endExecutorId, endHostId, endPort, endStreamId);

                try (RocksIterator iterator = db.newIterator(ro)) {
                    for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) {
                        RocksDbKey key = new RocksDbKey(iterator.key());

                        if (key.compareTo(endKey) > 0) { // past limit, quit
                            break;
                        }

                        if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
                            continue;
                        }

                        long timestamp = key.getTimestamp();
                        if (timestamp < startTime || timestamp > endTime) {
                            continue;
                        }

                        if (startMetricId != 0 && key.getMetricId() != startMetricId) {
                            continue;
                        }

                        if (startComponentId != 0 && key.getComponentId() != startComponentId) {
                            continue;
                        }

                        if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
                            continue;
                        }

                        if (startHostId != 0 && key.getHostnameId() != startHostId) {
                            continue;
                        }

                        if (startPort != 0 && key.getPort() != startPort) {
                            continue;
                        }

                        if (startStreamId != 0 && key.getStreamId() != startStreamId) {
                            continue;
                        }

                        RocksDbValue val = new RocksDbValue(iterator.value());

                        if (scanCallback != null) {
                            try {
                                // populate a metric
                                String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
                                String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
                                String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
                                String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
                                String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
                                String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);

                                Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
                                        streamId, key.getPort(), aggLevel);

                                val.populateMetric(metric);

                                // callback to caller
                                scanCallback.cb(metric);
                            } catch (MetricException e) {
                                LOG.warn("Failed to report found metric: {}", e.getMessage());
                            }
                        } else {
                            try {
                                if (!rawCallback.cb(key, val)) {
                                    return;
                                }
                            } catch (RocksDBException e) {
                                throw new MetricException("Error reading metrics data", e);
                            }
                        }
                    }
                }
            }
        }
    }