in src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java [283:371]
/**
 * Builds the listener that turns a multi-get result (detector document plus
 * detector job document) into a {@link GetAnomalyDetectorResponse}.
 *
 * <p>For each item in the multi-get response:
 * <ul>
 *   <li>An item from {@code ANOMALY_DETECTORS_INDEX} must exist; otherwise the wrapped
 *       listener is failed with {@code RestStatus.NOT_FOUND}. Its id, version, primary term
 *       and sequence number are recorded and, when the source is non-empty, the source is
 *       parsed into an {@link AnomalyDetector}.</li>
 *   <li>An item from {@code ANOMALY_DETECTOR_JOB_INDEX} is optional; it is parsed into an
 *       {@link AnomalyDetectorJob} only when it exists and has a non-empty source.</li>
 * </ul>
 * Any parsing error fails the wrapped listener with the result of
 * {@code buildInternalServerErrorResponse} and stops processing, so the listener is notified
 * at most once per multi-get response.
 *
 * @param listener         listener that receives the assembled response or the failure
 * @param returnJob        forwarded unchanged to the {@link GetAnomalyDetectorResponse} constructor
 * @param returnTask       forwarded unchanged to the {@link GetAnomalyDetectorResponse} constructor
 * @param realtimeAdTask   realtime task, passed to the response as {@code null} when empty
 * @param historicalAdTask historical task, passed to the response as {@code null} when empty
 * @param detectorId       detector id used in error messages
 * @return a listener to register on the multi-get request
 */
private ActionListener<MultiGetResponse> onMultiGetResponse(
    ActionListener<GetAnomalyDetectorResponse> listener,
    boolean returnJob,
    boolean returnTask,
    Optional<ADTask> realtimeAdTask,
    Optional<ADTask> historicalAdTask,
    String detectorId
) {
    return new ActionListener<MultiGetResponse>() {
        @Override
        public void onResponse(MultiGetResponse multiGetResponse) {
            MultiGetItemResponse[] responses = multiGetResponse.getResponses();
            AnomalyDetector detector = null;
            AnomalyDetectorJob adJob = null;
            String id = null;
            long version = 0;
            long seqNo = 0;
            long primaryTerm = 0;
            for (MultiGetItemResponse response : responses) {
                if (ANOMALY_DETECTORS_INDEX.equals(response.getIndex())) {
                    // The detector document is mandatory: a missing item ends the request with 404.
                    if (response.getResponse() == null || !response.getResponse().isExists()) {
                        listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, RestStatus.NOT_FOUND));
                        return;
                    }
                    id = response.getId();
                    version = response.getResponse().getVersion();
                    primaryTerm = response.getResponse().getPrimaryTerm();
                    seqNo = response.getResponse().getSeqNo();
                    if (!response.getResponse().isSourceEmpty()) {
                        try (
                            XContentParser parser = RestHandlerUtils
                                .createXContentParserFromRegistry(xContentRegistry, response.getResponse().getSourceAsBytesRef())
                        ) {
                            ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
                            detector = parser.namedObject(AnomalyDetector.class, AnomalyDetector.PARSE_FIELD_NAME, null);
                        } catch (Exception e) {
                            // This branch parses the detector itself, so the message must not
                            // claim that the detector job failed to parse.
                            String message = "Failed to parse detector " + detectorId;
                            listener.onFailure(buildInternalServerErrorResponse(e, message));
                            return;
                        }
                    }
                }
                if (ANOMALY_DETECTOR_JOB_INDEX.equals(response.getIndex())) {
                    // The job document is optional: when it is absent or empty, adJob stays null.
                    if (response.getResponse() != null
                        && response.getResponse().isExists()
                        && !response.getResponse().isSourceEmpty()) {
                        try (
                            XContentParser parser = RestHandlerUtils
                                .createXContentParserFromRegistry(xContentRegistry, response.getResponse().getSourceAsBytesRef())
                        ) {
                            ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
                            adJob = AnomalyDetectorJob.parse(parser);
                        } catch (Exception e) {
                            String message = "Failed to parse detector job " + detectorId;
                            listener.onFailure(buildInternalServerErrorResponse(e, message));
                            return;
                        }
                    }
                }
            }
            // Reached only when no item triggered an early failure above.
            listener
                .onResponse(
                    new GetAnomalyDetectorResponse(
                        version,
                        id,
                        primaryTerm,
                        seqNo,
                        detector,
                        adJob,
                        returnJob,
                        realtimeAdTask.orElse(null),
                        historicalAdTask.orElse(null),
                        returnTask,
                        RestStatus.OK,
                        null,
                        null,
                        false
                    )
                );
        }

        @Override
        public void onFailure(Exception e) {
            // Failures of the multi-get itself are propagated unchanged.
            listener.onFailure(e);
        }
    };
}