in hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java [570:689]
public Response getContainerMisMatchInsights(
@DefaultValue(DEFAULT_FETCH_COUNT)
@QueryParam(RECON_QUERY_LIMIT) int limit,
@DefaultValue(PREV_CONTAINER_ID_DEFAULT_VALUE)
@QueryParam(RECON_QUERY_PREVKEY) long prevKey,
@DefaultValue(DEFAULT_FILTER_FOR_MISSING_CONTAINERS)
@QueryParam(RECON_QUERY_FILTER) String missingIn) {
if (prevKey < 0 || limit < 0) {
// Send back an empty response
return Response.status(Response.Status.NOT_ACCEPTABLE).build();
}
List<ContainerDiscrepancyInfo> containerDiscrepancyInfoList =
new ArrayList<>();
Long minContainerID = prevKey + 1;
Iterator<ContainerInfo> scmNonDeletedContainers =
containerManager.getContainers().stream()
.filter(containerInfo -> (containerInfo.getContainerID() >= minContainerID))
.filter(containerInfo -> containerInfo.getState() != HddsProtos.LifeCycleState.DELETED)
.sorted(Comparator.comparingLong(ContainerInfo::getContainerID)).iterator();
ContainerInfo scmContainerInfo = scmNonDeletedContainers.hasNext() ?
scmNonDeletedContainers.next() : null;
DataFilter dataFilter = DataFilter.fromValue(missingIn.toUpperCase());
try (SeekableIterator<Long, ContainerMetadata> omContainers =
reconContainerMetadataManager.getContainersIterator()) {
omContainers.seek(minContainerID);
ContainerMetadata containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
switch (dataFilter) {
case SCM:
List<ContainerMetadata> notSCMContainers = new ArrayList<>();
while (containerMetadata != null && notSCMContainers.size() < limit) {
Long omContainerID = containerMetadata.getContainerID();
Long scmContainerID = scmContainerInfo == null ? null : scmContainerInfo.getContainerID();
if (omContainerID.equals(scmContainerID)) {
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
} else if (scmContainerID == null || omContainerID.compareTo(scmContainerID) < 0) {
notSCMContainers.add(containerMetadata);
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
} else {
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
}
}
notSCMContainers.forEach(nonSCMContainer -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo =
new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(nonSCMContainer.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(
nonSCMContainer.getNumberOfKeys());
containerDiscrepancyInfo.setPipelines(
nonSCMContainer.getPipelines());
containerDiscrepancyInfo.setExistsAt("OM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
break;
case OM:
List<ContainerInfo> nonOMContainers = new ArrayList<>();
while (scmContainerInfo != null && nonOMContainers.size() < limit) {
Long omContainerID = containerMetadata == null ? null : containerMetadata.getContainerID();
Long scmContainerID = scmContainerInfo.getContainerID();
if (scmContainerID.equals(omContainerID)) {
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
} else if (omContainerID == null || scmContainerID.compareTo(omContainerID) < 0) {
nonOMContainers.add(scmContainerInfo);
scmContainerInfo = scmNonDeletedContainers.hasNext() ? scmNonDeletedContainers.next() : null;
} else {
//Seeking directly to SCM containerId sequential read is just wasteful here if there are too many values
// to be read in b/w omContainerID & scmContainerID since (omContainerId<scmContainerID)
omContainers.seek(scmContainerID);
containerMetadata = omContainers.hasNext() ? omContainers.next() : null;
}
}
List<Pipeline> pipelines = new ArrayList<>();
nonOMContainers.forEach(containerInfo -> {
ContainerDiscrepancyInfo containerDiscrepancyInfo = new ContainerDiscrepancyInfo();
containerDiscrepancyInfo.setContainerID(containerInfo.getContainerID());
containerDiscrepancyInfo.setNumberOfKeys(0);
PipelineID pipelineID = null;
try {
pipelineID = containerInfo.getPipelineID();
if (pipelineID != null) {
pipelines.add(pipelineManager.getPipeline(pipelineID));
}
} catch (PipelineNotFoundException e) {
LOG.debug(
"Pipeline not found for container: {} and pipelineId: {}",
containerInfo, pipelineID, e);
}
containerDiscrepancyInfo.setPipelines(pipelines);
containerDiscrepancyInfo.setExistsAt("SCM");
containerDiscrepancyInfoList.add(containerDiscrepancyInfo);
});
break;
default:
// Invalid filter parameter value
return Response.status(Response.Status.BAD_REQUEST).build();
}
} catch (IllegalArgumentException e) {
throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
} catch (Exception ex) {
throw new WebApplicationException(ex,
Response.Status.INTERNAL_SERVER_ERROR);
}
Map<String, Object> response = new HashMap<>();
if (!containerDiscrepancyInfoList.isEmpty()) {
response.put("lastKey", containerDiscrepancyInfoList.get(
containerDiscrepancyInfoList.size() - 1).getContainerID());
} else {
response.put("lastKey", null);
}
response.put("containerDiscrepancyInfo", containerDiscrepancyInfoList);
return Response.ok(response).build();
}