in stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java [663:748]
/**
 * Inspects the slice of {@code streams} owned by worker {@code tid} and records
 * every stream whose log segment list looks suspicious.
 *
 * <p>The slice is {@code [tid * numStreamsPerThreads, (tid + 1) * numStreamsPerThreads)},
 * clipped to the size of {@code streams}. Streams with fewer than two log segments
 * are skipped. A stream is flagged when:
 * <ul>
 *   <li>{@code checkInprogressOnly} is set and a non-in-progress segment carries the
 *       same sequence number as an in-progress segment; or</li>
 *   <li>{@code checkInprogressOnly} is not set and the sequence numbers, in the order
 *       the segments are returned, do not each increase by exactly one.</li>
 * </ul>
 *
 * <p>For a flagged stream, every in-progress segment's ledger is opened without
 * recovery and its last entry id is set to the ledger's last-confirmed entry id.
 * The resulting (segment, dumped entries) pairs are stored under the stream name.
 *
 * @param streams names of all streams under inspection
 * @param tid zero-based index of the worker executing this call
 * @param numStreamsPerThreads number of streams assigned to each worker
 * @param corruptedCandidates result map; written while holding its monitor
 * @throws Exception if opening, reading or closing a log or ledger fails
 */
private void inspectStreams(List<String> streams,
                            int tid,
                            int numStreamsPerThreads,
                            SortedMap<String, List<Pair<LogSegmentMetadata, List<String>>>> corruptedCandidates)
        throws Exception {
    final int firstIdx = tid * numStreamsPerThreads;
    final int endExclusive = Math.min(streams.size(), (tid + 1) * numStreamsPerThreads);
    for (int idx = firstIdx; idx < endExclusive; idx++) {
        final String streamName = streams.get(idx);
        final BookKeeperClient bkClient = getBookKeeperClient();
        final DistributedLogManager logManager = getNamespace().openLog(streamName);
        try {
            final List<LogSegmentMetadata> logSegments = logManager.getLogSegments();
            // A single segment (or none) can neither have a gap nor a duplicate.
            if (logSegments.size() < 2) {
                continue;
            }

            boolean suspicious = false;
            if (!checkInprogressOnly) {
                // Walk the list and verify each sequence number is the previous one plus one.
                long expectedSeqNo = logSegments.get(0).getLogSegmentSequenceNumber() + 1;
                for (int pos = 1; pos < logSegments.size(); pos++) {
                    if (logSegments.get(pos).getLogSegmentSequenceNumber() != expectedSeqNo) {
                        suspicious = true;
                        break;
                    }
                    expectedSeqNo++;
                }
            } else {
                // First pass: remember the sequence numbers of all in-progress segments.
                final Set<Long> openSeqNos = new HashSet<Long>();
                for (LogSegmentMetadata candidate : logSegments) {
                    if (candidate.isInProgress()) {
                        openSeqNos.add(candidate.getLogSegmentSequenceNumber());
                    }
                }
                // Second pass: a non-in-progress segment sharing one of those numbers is suspicious.
                for (LogSegmentMetadata candidate : logSegments) {
                    if (candidate.isInProgress()) {
                        continue;
                    }
                    if (openSeqNos.contains(candidate.getLogSegmentSequenceNumber())) {
                        suspicious = true;
                    }
                }
            }

            if (!suspicious) {
                continue;
            }

            if (orderByTime) {
                Collections.sort(logSegments, LOGSEGMENT_COMPARATOR_BY_TIME);
            }

            final List<Pair<LogSegmentMetadata, List<String>>> report =
                    new ArrayList<Pair<LogSegmentMetadata, List<String>>>();
            for (LogSegmentMetadata original : logSegments) {
                LogSegmentMetadata current = original;
                final List<String> entryDump = new ArrayList<String>();
                if (current.isInProgress()) {
                    final LedgerHandle ledger = bkClient.get().openLedgerNoRecovery(
                            current.getLogSegmentId(),
                            BookKeeper.DigestType.CRC32,
                            dlConf.getBKDigestPW().getBytes(UTF_8));
                    try {
                        final long lastConfirmed = ledger.readLastConfirmed();
                        // Record the last-confirmed entry id on the reported copy of the segment.
                        current = current.mutator().setLastEntryId(lastConfirmed).build();
                        final boolean shouldDump = printInprogressOnly && dumpEntries && lastConfirmed >= 0;
                        if (shouldDump) {
                            for (Enumeration<LedgerEntry> cursor = ledger.readEntries(0L, lastConfirmed);
                                 cursor.hasMoreElements();) {
                                entryDump.add(new String(cursor.nextElement().getEntry(), UTF_8));
                            }
                        }
                    } finally {
                        ledger.close();
                    }
                }

                if (!printInprogressOnly) {
                    report.add(Pair.of(current, EMPTY_LIST));
                } else if (current.isInProgress()) {
                    report.add(Pair.of(current, entryDump));
                }
            }

            // The result map is shared with other callers; publish under its monitor.
            synchronized (corruptedCandidates) {
                corruptedCandidates.put(streamName, report);
            }
        } finally {
            logManager.close();
        }
    }
}