in hbase-endpoint/src/main/java/com/amazonaws/hbase/StreamingReplicationEndpoint.java [107:175]
/**
 * Ships one batch of WAL entries to the configured data sink.
 *
 * <p>The entries of {@code replicateContext} are passed through
 * {@code filterNotExistTableEdits}, {@code filterNotExistColumnFamilyEdits},
 * {@code dataSink.filter} and finally {@code filters.filter}; an entry for which
 * {@code filters.filter} returns {@code null} is dropped and counted via
 * {@code metrics.incrLogEditsFiltered()}. Each surviving entry is wrapped in an
 * {@code HBaseWALEntry}, serialized to a UTF-8 JSON string, gzip-compressed when
 * {@code configUtil.isCompressionEnabled()} is true, and handed to
 * {@code dataSink.putRecord} together with the entry's table name.
 *
 * <p>NOTE(review): {@code beginTransaction()} is only called when the sink reports
 * {@code supportsTransaction()}, while {@code commitTransaction()} and
 * {@code abortTransaction()} are called unconditionally; this presumably relies on
 * them being no-ops for non-transactional sinks -- confirm against the sink
 * implementations.
 *
 * @param replicateContext the batch to replicate (entries, size and WAL group id)
 * @return {@code true} when every record was put and the commit call returned
 *         normally; {@code false} when any step failed (the failure is logged)
 */
public boolean replicate(ReplicateContext replicateContext) {
    LOG.info("replication length: " + replicateContext.getSize());
    LOG.info("replication entries: " + replicateContext.getEntries().size());
    String walId = replicateContext.getWalGroupId();

    List<Entry> oldEntries = this.filterNotExistColumnFamilyEdits(
            this.filterNotExistTableEdits(replicateContext.getEntries()));
    List<Entry> entries = new LinkedList<Entry>();

    // Building the per-entry debug text calls getKey().toString() for every edit,
    // so only do it when debug output is actually enabled.
    final boolean debugEnabled = LOG.isDebugEnabled();
    for (Entry e : this.dataSink.filter(oldEntries)) {
        Entry tmp = filters.filter(e);
        if (tmp != null) {
            entries.add(tmp);
            if (debugEnabled) {
                LOG.debug("Replication entry added:" + e.getKey().toString());
            }
        } else {
            if (debugEnabled) {
                LOG.debug("Replication entry Filtered:" + e.getKey().toString());
            }
            metrics.incrLogEditsFiltered();
        }
    }
    if (debugEnabled) {
        LOG.debug("entry size Before filter " + oldEntries.size() + " after filters:" + entries.size());
    }

    try {
        if (dataSink.supportsTransaction()) {
            dataSink.beginTransaction();
        }
        for (Entry entry : entries) {
            String tname = entry.getKey().getTableName().getNameAsString();
            HBaseWALEntry hbaseWALEntry = new HBaseWALEntry(entry);
            try {
                boolean compress = this.configUtil.isCompressionEnabled();
                byte[] json = objectMapper.writeValueAsString(hbaseWALEntry).getBytes("UTF-8");
                byte[] outputData = compress ? gzipCompress(json) : json;

                dataSink.putRecord(ByteBuffer.wrap(outputData), tname);
                metrics.incrCompletedWAL();
                metrics.setAgeOfLastShippedOp(hbaseWALEntry.getWalKey().getWriteTime(), walId);
                metrics.setAgeOfLastShippedOpByTable(hbaseWALEntry.getWalKey().getWriteTime(),
                        hbaseWALEntry.getWalKey().getTableName());
            } catch (UnsupportedEncodingException e1) {
                LOG.error("Encoding is set to UTF-8 but it is not supported. " + " " + formatStackTrace(e1));
                dataSink.abortTransaction();
                // Keep the original exception as the cause; the outer catch below
                // logs it and turns it into a 'false' return.
                throw new RuntimeException("Encoding is set to UTF-8 but it is not supported.", e1);
            } catch (JsonProcessingException e1) {
                LOG.error("Object could not be converted to json" + " " + formatStackTrace(e1));
                dataSink.abortTransaction();
                throw new RuntimeException("Object could not be converted to json", e1);
            } catch (IOException e1) {
                // Report through the logger rather than stderr so the failure is
                // visible alongside the other replication errors.
                LOG.error("I/O failure while shipping WAL entry for table " + tname + " "
                        + formatStackTrace(e1));
                dataSink.abortTransaction();
                return false;
            }
        }
        dataSink.commitTransaction();
    } catch (Exception e) {
        // Catch-all boundary: any failure is logged and reported as 'false'.
        LOG.error("Unhandled Exception: " + e.getMessage() + " " + formatStackTrace(e));
        return false;
    }
    return true;
}