public void write()

in streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java [126:172]


  /**
   * Writes a single datum to HBase as one {@link Put} submitted through the buffered mutator.
   *
   * <p>The row key is the datum id when it is non-blank, otherwise a GUID generated from the
   * datum's string form. The document is turned into an {@code ObjectNode} (parsed when it is a
   * {@code String}, converted through the mapper otherwise) and a GUID derived from that node is
   * set as the put's id.
   *
   * <p>This method is best-effort: a document that cannot be converted, or an
   * {@link IOException} raised by the mutator, is logged at WARN level (with the exception
   * attached) and the datum is dropped. The put is submitted at most once per call.
   *
   * @param streamsDatum the datum to persist
   */
  public void write(StreamsDatum streamsDatum) {

    Object document = streamsDatum.getDocument();

    // Encode the row key with an explicit charset so the stored key bytes do not depend on the
    // JVM's platform default encoding.
    String rowKey = StringUtils.isNotBlank(streamsDatum.getId())
        ? streamsDatum.getId()
        : GuidUtils.generateGuid(streamsDatum.toString());
    Put put = new Put(rowKey.getBytes(java.nio.charset.StandardCharsets.UTF_8));

    ObjectNode node;
    if (document instanceof String) {
      try {
        node = mapper.readValue((String) document, ObjectNode.class);
      } catch (IOException ex) {
        LOGGER.warn("Invalid json: {}", document, ex);
        return;
      }
    } else {
      try {
        node = mapper.valueToTree(document);
      } catch (Exception ex) {
        LOGGER.warn("Invalid json: {}", document, ex);
        return;
      }
    }

    if (node == null) {
      // The mapper may hand back null (e.g. for a null document); treat that like any other
      // unusable document instead of failing with a NullPointerException below.
      LOGGER.warn("Invalid json: {}", document);
      return;
    }

    // Tag the put with an id derived from the document content, for both document kinds.
    put.setId(GuidUtils.generateGuid(node.toString()));

    // NOTE(review): nothing in this method adds a column/cell to the put, so the document
    // payload is never attached here. HBase clients typically reject a Put with no columns;
    // confirm where the family/qualifier/value are supposed to be supplied.
    try {
      // Single submission point: the put must not also be mutated inside the branches above,
      // otherwise the same operation is sent twice.
      bufferedMutator.mutate(put);
    } catch (IOException ex) {
      LOGGER.warn("Failure preparing put: {}", document, ex);
    }

  }