private void inspectStreams()

in stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java [663:748]


        /**
         * Inspects the slice of {@code streams} that belongs to worker {@code tid} and records
         * every stream whose log segment list looks inconsistent.
         *
         * <p>The slice covers indexes {@code [tid * numStreamsPerThreads,
         * min(streams.size(), (tid + 1) * numStreamsPerThreads))}. Streams with at most one
         * log segment are skipped. For each remaining stream that is flagged as a candidate,
         * the (optionally time-ordered) segment details are stored in
         * {@code corruptedCandidates} under the stream name.
         *
         * @param streams names of all streams to inspect
         * @param tid index of the slice to process
         * @param numStreamsPerThreads number of streams in each slice
         * @param corruptedCandidates result map; updates are made while holding its monitor
         * @throws Exception if opening a log, listing its segments or reading a ledger fails
         */
        private void inspectStreams(List<String> streams,
                                    int tid,
                                    int numStreamsPerThreads,
                                    SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates)
                throws Exception {
            final int sliceStart = tid * numStreamsPerThreads;
            final int sliceEnd = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
            for (int pos = sliceStart; pos < sliceEnd; pos++) {
                final String streamName = streams.get(pos);
                final BookKeeperClient client = getBookKeeperClient();
                final DistributedLogManager manager = getNamespace().openLog(streamName);
                try {
                    final List<LogSegmentMetadata> logSegments = manager.getLogSegments();
                    // A stream with zero or one segment is never treated as a candidate.
                    if (logSegments.size() > 1 && isCorruptionCandidate(logSegments)) {
                        if (orderByTime) {
                            Collections.sort(logSegments, LOGSEGMENT_COMPARATOR_BY_TIME);
                        }
                        final List<Pair<LogSegmentMetadata, List<String>>> details =
                                collectSegmentDetails(client, logSegments);
                        synchronized (corruptedCandidates) {
                            corruptedCandidates.put(streamName, details);
                        }
                    }
                } finally {
                    manager.close();
                }
            }
        }

        /**
         * Decides whether a segment list should be reported.
         *
         * <p>When {@code checkInprogressOnly} is set, the list is a candidate if some
         * non-inprogress segment carries the same sequence number as an inprogress one.
         * Otherwise it is a candidate if the sequence numbers, taken in list order, do not
         * increase by exactly one from each segment to the next.
         */
        private boolean isCorruptionCandidate(List<LogSegmentMetadata> logSegments) {
            if (!checkInprogressOnly) {
                long expected = logSegments.get(0).getLogSegmentSequenceNumber() + 1;
                for (int k = 1; k < logSegments.size(); k++) {
                    if (logSegments.get(k).getLogSegmentSequenceNumber() != expected) {
                        return true;
                    }
                    expected++;
                }
                return false;
            }

            final Set<Long> inprogressSeqNos = new HashSet<Long>();
            for (LogSegmentMetadata metadata : logSegments) {
                if (metadata.isInProgress()) {
                    inprogressSeqNos.add(metadata.getLogSegmentSequenceNumber());
                }
            }
            boolean duplicated = false;
            for (LogSegmentMetadata metadata : logSegments) {
                // Non-short-circuit accumulation: every segment is examined.
                duplicated |= !metadata.isInProgress()
                        && inprogressSeqNos.contains(metadata.getLogSegmentSequenceNumber());
            }
            return duplicated;
        }

        /**
         * Builds the per-segment report for one stream.
         *
         * <p>Each inprogress segment has its ledger opened without recovery so that its last
         * entry id can be replaced by the ledger's last confirmed entry id; when
         * {@code printInprogressOnly} and {@code dumpEntries} are both set and at least one
         * entry is confirmed, the entries are also decoded as UTF-8 strings. With
         * {@code printInprogressOnly} only inprogress segments are reported; otherwise every
         * segment is reported with {@code EMPTY_LIST} as its entry list.
         */
        private List<Pair<LogSegmentMetadata, List<String>>> collectSegmentDetails(
                BookKeeperClient client,
                List<LogSegmentMetadata> logSegments) throws Exception {
            final List<Pair<LogSegmentMetadata, List<String>>> details =
                    new ArrayList<Pair<LogSegmentMetadata, List<String>>>();
            for (LogSegmentMetadata metadata : logSegments) {
                LogSegmentMetadata current = metadata;
                final List<String> payloads = new ArrayList<String>();
                if (current.isInProgress()) {
                    final LedgerHandle handle = client.get().openLedgerNoRecovery(current.getLogSegmentId(),
                            BookKeeper.DigestType.CRC32, dlConf.getBKDigestPW().getBytes(UTF_8));
                    try {
                        final long lastConfirmed = handle.readLastConfirmed();
                        current = current.mutator().setLastEntryId(lastConfirmed).build();
                        final boolean shouldDump = printInprogressOnly && dumpEntries && lastConfirmed >= 0;
                        if (shouldDump) {
                            final Enumeration<LedgerEntry> ledgerEntries = handle.readEntries(0L, lastConfirmed);
                            while (ledgerEntries.hasMoreElements()) {
                                payloads.add(new String(ledgerEntries.nextElement().getEntry(), UTF_8));
                            }
                        }
                    } finally {
                        handle.close();
                    }
                }
                if (!printInprogressOnly) {
                    details.add(Pair.of(current, EMPTY_LIST));
                } else if (current.isInProgress()) {
                    details.add(Pair.of(current, payloads));
                }
            }
            return details;
        }