in src/main/java/com/datacompare/service/ExecuteChunk.java [412:483]
/**
 * Processes one chunk: fetches the source and target rows concurrently, compares them,
 * and merges the outcome into this object's accumulators (source/target counts, failed
 * tuples, and the source/target data maps).
 *
 * <p>The result is only overwritten when the comparison reports something other than
 * {@code "Completed"}. Any exception is logged rather than propagated; if the thread is
 * interrupted while waiting, the interrupt flag is restored before returning.
 */
public void run() {
    try {
        new MemoryUtil().displayMemoryInfo();
        // Parenthesized so the 1-based chunk number is computed arithmetically instead of
        // the literal "1" being appended to the name by string concatenation.
        Thread.currentThread().setName("Executing Chunk No " + (getChunkNo() + 1));

        FetchData fetchSourceData = new FetchData(getSourceDBType(), null, getSourceSql(), getSourceChunk(),
                getSourceConnection(), getSourceTableMetadata(), null, getAppProperties());
        fetchSourceData.setTimeTaken(getSourceTimeTaken());

        FetchData fetchTargetData = new FetchData(getTargetDBType(), getSourceDBType(), getTargetSql(),
                getTargetChunk(), getTargetConnection(), getTargetTableMetadata(), getSourceTableMetadata(),
                getAppProperties());
        fetchTargetData.setTimeTaken(getTargetTimeTaken());

        // Source and target fetches run side by side; both must finish before comparing.
        runToCompletion(fetchSourceData, fetchTargetData);

        Long srcCnt = Long.valueOf(fetchSourceData.getHashMap().size());
        getSourceCount().add(srcCnt);
        Long tarCnt = Long.valueOf(fetchTargetData.getHashMap().size());
        getTargetCount().add(tarCnt);

        CompareData compareData = new CompareData(fetchSourceData.getHashMap(), fetchTargetData.getHashMap(),
                getChunkNo(), getNumberOfChunks());
        // The comparison still runs on its own pool thread, as before.
        runToCompletion(compareData);

        String result = compareData.getResult();
        if (!"Completed".equals(result)) {
            setResult(result);
        }

        List<String> failTuple = compareData.getFailTuple();
        getFailTuple().addAll(failTuple);
        Map<String, String> sourceData = compareData.getSourceData();
        getSourceData().putAll(sourceData);
        Map<String, String> targetData = compareData.getTargetData();
        getTargetData().putAll(targetData);

        // Empty the fetched maps now that the comparison outcome has been merged.
        fetchSourceData.getHashMap().clear();
        fetchTargetData.getHashMap().clear();
    } catch (InterruptedException e) {
        // Restore the flag so whoever owns this thread can observe the interruption.
        Thread.currentThread().interrupt();
        logger.error("Interrupted while executing chunk", e);
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

/**
 * Runs the given tasks on a dedicated fixed pool (one thread per task) and blocks until
 * all of them have finished.
 *
 * @param tasks the tasks to execute concurrently
 * @throws InterruptedException if the calling thread is interrupted while waiting; tasks
 *         still running at that point are asked to stop via {@code shutdownNow()}
 */
private static void runToCompletion(Runnable... tasks) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(tasks.length);
    try {
        for (Runnable task : tasks) {
            executor.execute(task);
        }
        executor.shutdown();
        // Block until the pool drains instead of spinning on isTerminated(), which
        // would burn a CPU core for the full duration of the tasks.
        executor.awaitTermination(Long.MAX_VALUE, java.util.concurrent.TimeUnit.NANOSECONDS);
    } finally {
        // Only reached with live workers on failure or interruption; avoids leaking the pool.
        if (!executor.isTerminated()) {
            executor.shutdownNow();
        }
    }
}