public boolean replicate()

in hbase-endpoint/src/main/java/com/amazonaws/hbase/StreamingReplicationEndpoint.java [107:175]


	/**
	 * Ships one batch of WAL entries to the configured data sink.
	 *
	 * <p>The batch is pre-filtered by {@code filterNotExistTableEdits},
	 * {@code filterNotExistColumnFamilyEdits}, {@code dataSink.filter} and the
	 * {@code filters} chain. Each surviving entry is wrapped in an
	 * {@link HBaseWALEntry}, serialized to JSON (UTF-8), gzip-compressed when
	 * {@code configUtil.isCompressionEnabled()} is true, and handed to
	 * {@code dataSink.putRecord} together with the entry's table name. Shipping
	 * metrics are updated after every successful put.
	 *
	 * @param replicateContext the batch to ship; its entries, size and WAL group id are read
	 * @return {@code true} when every entry was put and {@code commitTransaction()}
	 *         returned normally; {@code false} on any serialization, I/O or
	 *         unexpected failure (no exception escapes this method)
	 */
	public boolean replicate(ReplicateContext replicateContext) {
		LOG.info("replication length: " + replicateContext.getSize());
		LOG.info("replication entries: " + replicateContext.getEntries().size());
		String walGroupId = replicateContext.getWalGroupId();

		// Pre-filter with the table / column-family helpers. Judging by their names they
		// drop edits whose table or column family does not exist — confirm against the helpers.
		List<Entry> candidates = this.filterNotExistColumnFamilyEdits(
				this.filterNotExistTableEdits(replicateContext.getEntries()));

		List<Entry> entries = new LinkedList<Entry>();
		for (Entry candidate : this.dataSink.filter(candidates)) {
			Entry accepted = filters.filter(candidate);
			if (accepted != null) {
				entries.add(accepted);
				LOG.debug("Replication entry added:" + candidate.getKey().toString());
			} else {
				// A null result from the filter chain means the entry is dropped.
				LOG.debug("Replication entry Filtered:" + candidate.getKey().toString());
				metrics.incrLogEditsFiltered();
			}
		}

		LOG.debug("entry size Before filter " + candidates.size() + " after filters:" + entries.size());

		try {
			if (dataSink.supportsTransaction()) {
				dataSink.beginTransaction();
			}

			for (Entry entry : entries) {
				String tableName = entry.getKey().getTableName().getNameAsString();
				HBaseWALEntry hbaseWALEntry = new HBaseWALEntry(entry);

				try {
					// Serialize once; a Charset constant (rather than the "UTF-8" name) cannot
					// fail with UnsupportedEncodingException.
					byte[] payload = objectMapper.writeValueAsString(hbaseWALEntry)
							.getBytes(java.nio.charset.StandardCharsets.UTF_8);
					if (this.configUtil.isCompressionEnabled()) {
						payload = gzipCompress(payload);
					}

					dataSink.putRecord(ByteBuffer.wrap(payload), tableName);
					metrics.incrCompletedWAL();
					metrics.setAgeOfLastShippedOp(hbaseWALEntry.getWalKey().getWriteTime(), walGroupId);
					metrics.setAgeOfLastShippedOpByTable(hbaseWALEntry.getWalKey().getWriteTime(),hbaseWALEntry.getWalKey().getTableName());
				} catch (JsonProcessingException e1) {
					// Report failure directly instead of throwing a RuntimeException whose
					// message was the stack-trace array's identity string and whose cause was lost.
					LOG.error("Object could not be converted to json" + " " + formatStackTrace(e1));
					dataSink.abortTransaction();
					return false;
				} catch (IOException e1) {
					LOG.error("I/O failure while shipping WAL entry for table " + tableName + " " + formatStackTrace(e1));
					dataSink.abortTransaction();
					return false;
				}
			}
			// NOTE(review): beginTransaction() is guarded by supportsTransaction() but
			// commit/abort are not — presumably they are no-ops for non-transactional
			// sinks; confirm against the sink implementations.
			dataSink.commitTransaction();
		} catch (Exception e) {
			// Boundary catch: any unexpected failure (including one thrown by
			// abortTransaction()/commitTransaction()) is reported as a failed batch.
			// NOTE(review): no abort is attempted here, so a runtime failure in putRecord()
			// may leave a transaction open — verify how the sink handles that.
			LOG.error("Unhandled Exception: " + e.getMessage() + " " + formatStackTrace(e));
			return false;
		}

		return true;
	}