in plugin/src/main/java/org/opensearch/ml/indices/MLInputDatasetHandler.java [56:87]
/**
 * Runs the search described by a SEARCH_QUERY input dataset and hands the
 * hits to the listener as a {@code DataFrame}.
 *
 * <p>The request is built from the dataset's search source builder and its
 * index list, then submitted through {@code client.search}. The outcome is
 * reported only through {@code listener}; this method itself returns nothing.
 *
 * @param mlInputDataset input dataset; must report {@code MLInputDataType.SEARCH_QUERY}
 *                       as its input data type
 * @param listener       receives the {@code DataFrame} loaded from the source maps of
 *                       the returned hits, or a failure when the search fails or the
 *                       response reports no hits ("No document found")
 * @throws IllegalArgumentException if the dataset is not of SEARCH_QUERY type
 */
public void parseSearchQueryInput(MLInputDataset mlInputDataset, ActionListener<DataFrame> listener) {
    // Compare from the constant side so a null input data type is rejected with the
    // intended IllegalArgumentException rather than a NullPointerException.
    if (!MLInputDataType.SEARCH_QUERY.equals(mlInputDataset.getInputDataType())) {
        throw new IllegalArgumentException("Input dataset is not SEARCH_QUERY type.");
    }
    SearchQueryInputDataset inputDataset = (SearchQueryInputDataset) mlInputDataset;

    SearchRequest searchRequest = new SearchRequest();
    searchRequest.source(inputDataset.getSearchSourceBuilder());
    List<String> indicesList = inputDataset.getIndices();
    searchRequest.indices(indicesList.toArray(new String[0]));

    client.search(searchRequest, ActionListener.wrap(r -> {
        // Treat a missing response, missing hit metadata, or a zero total as "nothing to load".
        if (r == null || r.getHits() == null || r.getHits().getTotalHits() == null || r.getHits().getTotalHits().value == 0) {
            listener.onFailure(new IllegalArgumentException("No document found"));
            return;
        }
        SearchHit[] searchHits = r.getHits().getHits();
        List<Map<String, Object>> input = new ArrayList<>(searchHits.length);
        for (SearchHit hit : searchHits) {
            input.add(hit.getSourceAsMap());
        }
        DataFrame dataFrame = DataFrameBuilder.load(input);
        listener.onResponse(dataFrame);
    }, e -> {
        // Pass the throwable as its own argument so the stack trace is logged,
        // instead of concatenating its toString() onto the message.
        log.error("Failed to search", e);
        listener.onFailure(e);
    }));
}