in hbase-endpoint/src/main/java/com/amazonaws/hbase/StreamingReplicationEndpoint.java [271:308]
/**
 * Filters out replication entries whose table no longer exists locally.
 *
 * <p>When {@code configUtil.isDropOnDeletedTables()} is {@code false} the input list is
 * returned as-is (same instance). Otherwise each entry's table is checked with
 * {@code localAdmin.tableExists}; the answer is cached per table for the duration of this
 * call, so a successful check is made at most once per distinct table.
 *
 * <p>If the existence check throws an {@link IOException}, the table is assumed to exist and
 * the entry is kept: edits are never dropped without a positive "does not exist" answer.
 * A failed check is not cached, so the next entry for the same table tries again.
 *
 * @param oldEntries the entries to filter
 * @return {@code oldEntries} itself when filtering is disabled; otherwise a new list holding,
 *         in the original order, the entries whose table exists (or could not be checked)
 */
protected List<Entry> filterNotExistTableEdits(final List<Entry> oldEntries) {
  if (!configUtil.isDropOnDeletedTables()) {
    return oldEntries;
  }
  List<Entry> entries = new ArrayList<>(oldEntries.size());
  // Per-call cache of table existence; only definite answers are stored.
  Map<TableName, Boolean> existMap = new HashMap<>();
  for (Entry e : oldEntries) {
    TableName tableName = e.getKey().getTableName();
    Boolean cached = existMap.get(tableName);
    boolean exist;
    if (cached != null) {
      exist = cached;
    } else {
      try {
        exist = localAdmin.tableExists(tableName);
        existMap.put(tableName, exist);
        if (!exist) {
          // Would potentially be better to retry in one of the outer loops
          // and add a table filter there; but that would break the encapsulation,
          // so we're doing the filtering here.
          // Logged once per missing table (when first discovered) rather than once
          // per dropped entry, so a large batch does not flood the log.
          LOG.warn("Missing table detected at replication, local table does not exist, "
              + "filtering edits for table '{}'", tableName);
        }
      } catch (IOException iox) {
        LOG.warn("Exception checking for local table " + tableName + " " + formatStackTrace(iox));
        // we can't drop edits without full assurance, so we assume table exists.
        exist = true;
      }
    }
    if (exist) {
      entries.add(e);
    }
  }
  long count = oldEntries.size() - entries.size();
  if (count > 0) {
    LOG.warn("DropppedTableFitred records: {}", count);
  }
  return entries;
}