protected List filterNotExistColumnFamilyEdits()

in hbase-endpoint/src/main/java/com/amazonaws/hbase/StreamingReplicationEndpoint.java [206:264]


	/**
	 * Removes cells whose column family is absent from the local table before replication.
	 *
	 * <p>When {@code configUtil.isDropOnDeletedColumnFamilies()} is false the input list is
	 * returned as-is. Otherwise every entry is rebuilt with only the cells whose family is
	 * contained in the local table descriptor's family names. An entry that ends up with no
	 * cells is left out of the result. When bulk-load replication is enabled, cells whose
	 * qualifier is {@code HBASE::BULK_LOAD} are always kept. If the local table descriptor
	 * cannot be fetched, the entry is forwarded unfiltered.
	 *
	 * @param oldEntries WAL entries about to be replicated
	 * @return the filtered entries, or {@code oldEntries} itself when filtering is disabled
	 */
	protected List<Entry> filterNotExistColumnFamilyEdits(final List<Entry> oldEntries) {
		if (!configUtil.isDropOnDeletedColumnFamilies()) {
			return oldEntries;
		}
		List<Entry> entries = new ArrayList<>(oldEntries.size());
		// Per-call cache so each table's descriptor is fetched at most once on success.
		Map<TableName, Set<byte[]>> existColumnFamilyMap = new HashMap<>();
		// Encoded once; Bytes.toBytes does not depend on the platform default charset.
		final byte[] bulkLoadQualifier = Bytes.toBytes("HBASE::BULK_LOAD");

		for (Entry e : oldEntries) {
			TableName tableName = e.getKey().getTableName();
			if (!existColumnFamilyMap.containsKey(tableName)) {
				try {
					Set<byte[]> cfs = localAdmin.getDescriptor(tableName).getColumnFamilyNames();
					existColumnFamilyMap.put(tableName, cfs);
				} catch (Exception ex) {
					LOG.warn("Exception getting cf names for local table {}", tableName, ex);
					// if catch any exception, we are not sure about table's description,
					// so replicate raw entry
					entries.add(e);
					continue;
				}
			}

			// NOTE(review): contains(byte[]) below only matches by content if the descriptor
			// hands back a comparator-ordered set of family names - confirm for the HBase
			// version in use.
			Set<byte[]> existColumnFamilies = existColumnFamilyMap.get(tableName);
			// Names are kept as Strings: byte[] hashes by identity, so a HashSet<byte[]> would
			// neither de-duplicate families nor print readable names in the warning below.
			Set<String> missingCFs = new HashSet<>();
			WALEdit walEdit = new WALEdit();

			for (Cell cell : e.getEdit().getCells()) {
				// matchingQualifier honours the qualifier offset/length; getQualifierArray()
				// alone is the cell's whole backing array and would not equal the marker.
				if (configUtil.isReplicationBulkLoadDataEnabled()
						&& CellUtil.matchingQualifier(cell, bulkLoadQualifier)) {
					walEdit.add(cell);
					continue;
				}
				byte[] family = CellUtil.cloneFamily(cell);
				if (existColumnFamilies.contains(family)) {
					walEdit.add(cell);
				} else {
					missingCFs.add(Bytes.toString(family));
				}
			}

			if (!walEdit.isEmpty()) {
				entries.add(new Entry(e.getKey(), walEdit));
			}

			if (!missingCFs.isEmpty()) {
				// Would potentially be better to retry in one of the outer loops
				// and add a table filter there; but that would break the encapsulation,
				// so we're doing the filtering here.
				LOG.warn(
						"Missing column family detected at replicate, local column family also does not exist,"
								+ " filtering edits for table '{}',column family '{}'", tableName, missingCFs);
			}
		}

		// Number of entries dropped entirely (every cell filtered out).
		long count = oldEntries.size() - entries.size();
		if (count > 0) {
			LOG.warn("ColumnFamilyFitred records: {}", count);
		}
		return entries;
	}