in core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java [79:116]
/**
 * Loads the click-stream matrix CSV named by the {@code MudrodConstants.CLICKSTREAM_PATH}
 * property into Elasticsearch, replacing the documents of the click-stream matrix type.
 *
 * <p>The first CSV row is a header whose columns 1..n hold data IDs (column 0 is skipped).
 * Every following row is {@code query, clicks_1, ..., clicks_n}. For each cell whose text is
 * not exactly {@code "0.0"}, one document with fields {@code query}, {@code dataID} and
 * {@code clicks} is added to the bulk processor. I/O failures are logged, not rethrown.
 */
public void importfromCSVtoES() {
  String clickStreamMatrixType = props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE);
  String esIndexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
  String clickStreamPath = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);
  if (clickStreamPath == null) {
    // Bail out before deleting the existing type so a missing setting cannot wipe the data.
    LOG.error("Import click stream CSV into Elasticsearch failed : property "
        + MudrodConstants.CLICKSTREAM_PATH + " is not set");
    return;
  }

  es.deleteType(esIndexName, clickStreamMatrixType);
  es.createBulkProcessor();
  String csvSplitBy = ",";
  try (BufferedReader br = new BufferedReader(new InputStreamReader(
      new FileInputStream(clickStreamPath), StandardCharsets.UTF_8))) {
    String header = br.readLine();
    if (header == null) {
      LOG.warn("Click stream CSV " + clickStreamPath + " is empty; nothing imported");
      return;
    }
    // Header column 0 labels the query column, so data IDs start at index 1.
    String[] dataList = header.split(csvSplitBy);
    String line;
    while ((line = br.readLine()) != null) {
      String[] clicks = line.split(csvSplitBy);
      // Cells beyond the header width have no data ID; skip them instead of throwing
      // ArrayIndexOutOfBoundsException.
      int columns = Math.min(clicks.length, dataList.length);
      for (int i = 1; i < columns; i++) {
        if (!"0.0".equals(clicks[i])) {
          IndexRequest ir = new IndexRequest(esIndexName, clickStreamMatrixType)
              .source(jsonBuilder().startObject()
                  .field("query", clicks[0])
                  .field("dataID", dataList[i])
                  .field("clicks", clicks[i])
                  .endObject());
          es.getBulkProcessor().add(ir);
        }
      }
    }
  } catch (IOException e) {
    LOG.error("Import click stream CSV into Elasticsearch failed : ", e);
  } finally {
    // Always release the bulk processor created above, even when the file could not be
    // opened or reading failed part-way.
    es.destroyBulkProcessor();
  }
}