in src/main/java/com/datacompare/service/CompareService.java [562:713]
/**
 * Compares the row data of a single table between the source database and the
 * PostgreSQL target, chunk by chunk, and returns a summary of the comparison.
 *
 * <p>The chunk list is taken from the source-side {@code FetchMetadata}. One
 * {@code ExecuteChunk} task is submitted per chunk to a fixed-size thread pool of
 * {@code appProperties.getMaxNoofThreads()} threads; all tasks share the two
 * connections passed in and write into shared, thread-safe collections. Once every
 * task has finished, the collected counts and mismatches are written out via
 * {@code writeDataToFile}.
 *
 * <p>Any exception is logged and converted into a result of {@code "Failed"} with
 * the exception message as the reason (unless a reason is already set); nothing is
 * propagated to the caller. If the calling thread is interrupted while waiting for
 * the chunks, the outstanding tasks are cancelled, the interrupt flag is restored
 * and the result is likewise {@code "Failed"}.
 *
 * @param appProperties       run configuration (source DB type, thread count, display options)
 * @param sourceConn          connection to the source database
 * @param targetConn          connection to the PostgreSQL target database
 * @param schemaName          schema to compare; upper-cased for the source and lower-cased for the target
 * @param tableName           table to compare; upper-cased for the source and lower-cased for the target
 * @param columnList          column list handed through to {@code FetchMetadata}
 * @return the comparison result; never {@code null}. The time taken is reported in whole seconds.
 */
private CompareResult compareDetailData(AppProperties appProperties, Connection sourceConn, Connection targetConn,
        String schemaName, String tableName, List<String> columnList) {
    String sourceDBType = appProperties.getSourceDBType().toUpperCase();
    int maxNoofThreads = appProperties.getMaxNoofThreads();
    boolean displayCompleteData = appProperties.isDisplayCompleteData();
    CompareResult dto = new CompareResult();
    long start = System.currentTimeMillis();
    // Memory measurement is currently disabled, so the reported value is always 0.
    long usedMemory = 0;
    try {
        // NOTE(review): presumably throws when the table is missing on the target - confirm.
        checkIfTableExistsInPg(schemaName.toLowerCase(), tableName.toLowerCase(), "POSTGRESQL", targetConn);
        FetchMetadata fetchSourceMetadata = new FetchMetadata(sourceDBType, null, sourceConn,
                schemaName.toUpperCase(), tableName.toUpperCase(), 0, null, null, false, null, columnList,
                appProperties);
        // The target metadata is derived from the source's row count, keys and column metadata.
        FetchMetadata fetchTargetMetadata = new FetchMetadata("POSTGRESQL", sourceDBType, targetConn,
                schemaName.toLowerCase(), tableName.toLowerCase(), fetchSourceMetadata.getRowCount(),
                fetchSourceMetadata.getSortKey(), fetchSourceMetadata.getPrimaryKey(),
                fetchSourceMetadata.isHasNoUniqueKey(),
                fetchSourceMetadata.getTableMetadataMap(), columnList, appProperties);
        List<String> sourceChunks = fetchSourceMetadata.getChunks();
        int numChunks = sourceChunks.size();
        logger.info("Schema: " + schemaName + " , Table: " + tableName + " , No Of Chunks: " + numChunks);
        logger.info("No Of Chunks to run: " + numChunks
                + "\n###############################################################\n");

        // Shared sinks filled concurrently by the chunk workers.
        Map<String, String> mismatchSourceData = new ConcurrentHashMap<String, String>();
        Map<String, String> mismatchTargetData = new ConcurrentHashMap<String, String>();
        List<String> failTuple = Collections.synchronizedList(new ArrayList<String>());
        // NOTE(review): String is immutable and this local is never reassigned, so the
        // workers cannot change it through setResult(); dto.setResult(result) below
        // always receives "Completed". Confirm whether chunk failures should be surfaced.
        String result = "Completed";
        List<Long> sourceCountList = Collections.synchronizedList(new ArrayList<Long>());
        List<Long> targetCountList = Collections.synchronizedList(new ArrayList<Long>());
        List<Long> sourceTimeTaken = Collections.synchronizedList(new ArrayList<Long>());
        List<Long> targetTimeTaken = Collections.synchronizedList(new ArrayList<Long>());

        ExecutorService executor = Executors.newFixedThreadPool(maxNoofThreads);
        try {
            for (int i = 0; i < numChunks; i++) {
                String sourceChunk = sourceChunks.get(i);
                String targetChunk = fetchSourceMetadata.isHasNoUniqueKey()
                        ? getTargetChunkWhenNoUniqueKey(sourceChunk) : sourceChunk;
                ExecuteChunk executeChunk = new ExecuteChunk(sourceDBType, "POSTGRESQL", sourceChunk,
                        targetChunk, fetchSourceMetadata.getSql(), fetchTargetMetadata.getSql(), i, numChunks,
                        sourceConn, targetConn, fetchSourceMetadata.getTableMetadataMap(),
                        fetchTargetMetadata.getTableMetadataMap(), appProperties);
                executeChunk.setSourceData(mismatchSourceData);
                executeChunk.setTargetData(mismatchTargetData);
                executeChunk.setFailTuple(failTuple);
                executeChunk.setResult(result);
                executeChunk.setSourceCount(sourceCountList);
                executeChunk.setTargetCount(targetCountList);
                executeChunk.setSourceTimeTaken(sourceTimeTaken);
                executeChunk.setTargetTimeTaken(targetTimeTaken);
                executor.execute(executeChunk);
            }
            executor.shutdown();
            // Block until every chunk has finished instead of spinning on isTerminated(),
            // which would keep one CPU core busy for the whole comparison.
            executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
        } finally {
            // Reached with a live pool only when submission failed or the wait was
            // interrupted; cancel what is left so the worker threads do not leak.
            if (!executor.isTerminated()) {
                executor.shutdownNow();
            }
        }
        logger.info("Finished all chunks");
        logTimeTaken(sourceTimeTaken, targetTimeTaken);
        dto.setResult(result);
        dto.setFailTuple(failTuple);
        logger.info("\n----------------------------------------------------\n"
                + "Finished analyzing the data for " + schemaName + "." + tableName);
        // Row counts are the totals reported by the workers, not the metadata row counts.
        dto.setRowCountSource(getCount(sourceCountList));
        dto.setRowCountTarget(getCount(targetCountList));
        dto.setTableName(tableName);
        writeDataToFile(fetchSourceMetadata, mismatchSourceData, fetchTargetMetadata, mismatchTargetData, dto,
                schemaName, displayCompleteData);
    } catch (Exception ex) {
        if (ex instanceof InterruptedException) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        logger.error(ex.getMessage(), ex);
        if (dto.getReason() == null) {
            dto.setReason(ex.getMessage());
        }
        dto.setTableName(tableName);
        dto.setResult("Failed");
    }
    long timeTaken = System.currentTimeMillis() - start;
    dto.setTimeTaken(timeTaken / 1000); // milliseconds -> whole seconds
    dto.setUsedMemory(usedMemory);
    // Logged on both the success and the failure path.
    logger.info("\n----------------------------------------------------\n"
            + "Finished writing comparison results for " + schemaName + "." + tableName);
    return dto;
}