in streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java [126:172]
public void write(StreamsDatum streamsDatum) {
ObjectNode node;
byte[] row;
if (StringUtils.isNotBlank(streamsDatum.getId())) {
row = streamsDatum.getId().getBytes();
} else {
row = GuidUtils.generateGuid(streamsDatum.toString()).getBytes();
}
Put put = new Put(row);
if ( streamsDatum.getDocument() instanceof String ) {
try {
node = mapper.readValue((String)streamsDatum.getDocument(), ObjectNode.class);
} catch (IOException ex) {
ex.printStackTrace();
LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
return;
}
try {
byte[] value = node.binaryValue();
bufferedMutator.mutate(put);
} catch (IOException ex) {
ex.printStackTrace();
LOGGER.warn("Failure adding object: {}", streamsDatum.getDocument().toString());
return;
}
} else {
try {
node = mapper.valueToTree(streamsDatum.getDocument());
} catch (Exception ex) {
ex.printStackTrace();
LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
return;
}
put.setId(GuidUtils.generateGuid(node.toString()));
}
try {
bufferedMutator.mutate(put);
} catch (IOException ex) {
ex.printStackTrace();
LOGGER.warn("Failure preparing put: {}", streamsDatum.getDocument().toString());
return;
}
}