public void importfromCSVtoES()

in core/src/main/java/org/apache/sdap/mudrod/ssearch/ClickStreamImporter.java [79:116]


  /**
   * Imports the click stream matrix stored as a CSV file into Elasticsearch.
   *
   * <p>The existing documents of the configured click stream matrix type are
   * deleted first. The CSV file located at the configured
   * {@code CLICKSTREAM_PATH} is then read as UTF-8: the first line is a header
   * whose cells (from the second column on) are used as data IDs, and every
   * following line is a row whose first cell is the query and whose remaining
   * cells are click values. Every cell whose text is not {@code "0.0"} is
   * queued on the bulk processor as a document with the fields {@code query},
   * {@code dataID} and {@code clicks}.
   *
   * <p>I/O failures are logged and not rethrown. The bulk processor created by
   * this method is always destroyed before it returns, even when the file
   * cannot be opened or read.
   */
  public void importfromCSVtoES() {
    String clickStreamMatrixType = props.getProperty(MudrodConstants.CLICK_STREAM_MATRIX_TYPE);
    String esIndexName = props.getProperty(MudrodConstants.ES_INDEX_NAME);
    es.deleteType(esIndexName, clickStreamMatrixType);
    es.createBulkProcessor();

    final String cvsSplitBy = ",";
    String clickStreamPath = props.getProperty(MudrodConstants.CLICKSTREAM_PATH);

    // try-with-resources closes the reader on every exit path; a failure while
    // closing is reported through the catch block instead of being swallowed.
    try (BufferedReader br = new BufferedReader(
        new InputStreamReader(new FileInputStream(clickStreamPath), StandardCharsets.UTF_8))) {
      String header = br.readLine();
      if (header == null) {
        // An empty file has no header row, hence nothing to import.
        LOG.warn("Click stream CSV is empty, nothing to import : " + clickStreamPath);
        return;
      }

      // The first header cell labels the query column and is skipped below.
      String[] dataList = header.split(cvsSplitBy);

      String line;
      while ((line = br.readLine()) != null) {
        String[] clicks = line.split(cvsSplitBy);
        // Cells beyond the header width have no data ID to attach to, so they
        // are ignored rather than indexing past the end of the header.
        int columns = Math.min(clicks.length, dataList.length);
        for (int i = 1; i < columns; i++) {
          if (!"0.0".equals(clicks[i])) {
            IndexRequest ir = new IndexRequest(esIndexName, clickStreamMatrixType)
                .source(jsonBuilder().startObject().field("query", clicks[0]).field(
                        "dataID", dataList[i]).field("clicks", clicks[i]).endObject());
            es.getBulkProcessor().add(ir);
          }
        }
      }
    } catch (IOException e) {
      LOG.error("Import click stream CSV into Elasticsearch failed : ", e);
    } finally {
      // Always pair createBulkProcessor() with destroyBulkProcessor(), even
      // when the CSV could not be opened, so the processor is never leaked.
      es.destroyBulkProcessor();
    }
  }