in hbase-endpoint/src/main/java/com/amazonaws/hbase/StreamingReplicationEndpoint.java [206:264]
/**
 * Removes cells that belong to column families missing from the local table descriptor.
 *
 * <p>When {@code configUtil.isDropOnDeletedColumnFamilies()} is false the input list is
 * returned untouched. Otherwise each entry is rebuilt with only the cells whose family is
 * present in the descriptor returned by {@code localAdmin} for the entry's table. Entries
 * left with no cells are dropped. If the descriptor lookup throws, the entry is passed
 * through unmodified because the set of existing families is unknown.
 *
 * <p>When {@code configUtil.isReplicationBulkLoadDataEnabled()} is true, cells whose
 * qualifier is {@code HBASE::BULK_LOAD} are always kept, regardless of their family.
 *
 * @param oldEntries the WAL entries about to be replicated
 * @return {@code oldEntries} itself when filtering is disabled, otherwise a new list holding
 *         the surviving (possibly rebuilt) entries
 */
protected List<Entry> filterNotExistColumnFamilyEdits(final List<Entry> oldEntries) {
  if (!configUtil.isDropOnDeletedColumnFamilies()) {
    return oldEntries;
  }
  final boolean keepBulkLoadMarkers = configUtil.isReplicationBulkLoadDataEnabled();
  // Built once, with an explicit UTF-8 encoding, instead of once per cell.
  final byte[] bulkLoadQualifier = Bytes.toBytes("HBASE::BULK_LOAD");
  List<Entry> entries = new ArrayList<>(oldEntries.size());
  // Per-call cache so the descriptor is looked up at most once per table on success.
  Map<TableName, Set<byte[]>> existColumnFamilyMap = new HashMap<>();
  for (Entry e : oldEntries) {
    TableName tableName = e.getKey().getTableName();
    if (!existColumnFamilyMap.containsKey(tableName)) {
      try {
        Set<byte[]> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames();
        existColumnFamilyMap.put(tableName, cfs);
      } catch (Exception ex) {
        LOG.warn("Exception getting cf names for local table {}", tableName, ex);
        // if catch any exception, we are not sure about table's description,
        // so replicate raw entry
        entries.add(e);
        continue;
      }
    }
    Set<byte[]> existColumnFamilies = existColumnFamilyMap.get(tableName);
    // Family names are kept as Strings: byte[] hashes by identity, so a HashSet<byte[]>
    // neither de-duplicates nor prints anything readable in the log line below.
    Set<String> missingCFs = new HashSet<>();
    WALEdit walEdit = new WALEdit();
    for (Cell cell : e.getEdit().getCells()) {
      // getQualifierArray() exposes the cell's whole backing array, so the qualifier must be
      // compared through its offset/length; matchingQualifier does exactly that.
      if (keepBulkLoadMarkers && CellUtil.matchingQualifier(cell, bulkLoadQualifier)) {
        walEdit.add(cell);
        continue;
      }
      byte[] family = CellUtil.cloneFamily(cell);
      if (existColumnFamilies.contains(family)) {
        walEdit.add(cell);
      } else {
        missingCFs.add(Bytes.toString(family));
      }
    }
    if (!walEdit.isEmpty()) {
      entries.add(new Entry(e.getKey(), walEdit));
    }
    if (!missingCFs.isEmpty()) {
      // Would potentially be better to retry in one of the outer loops
      // and add a table filter there; but that would break the encapsulation,
      // so we're doing the filtering here.
      LOG.warn(
          "Missing column family detected at replicate, local column family also does not exist,"
              + " filtering edits for table '{}',column family '{}'", tableName, missingCFs);
    }
  }
  // Counts whole entries that were dropped, not individual cells.
  long count = oldEntries.size() - entries.size();
  if (count > 0) {
    LOG.warn("ColumnFamilyFitred records: {}", count);
  }
  return entries;
}