in lang/java/tools/src/main/java/org/apache/avro/tool/DataFileRepairTool.java [149:264]
/**
 * Scans every block of {@code fileReader}, reports corruption statistics and
 * optionally copies readable records to {@code fileWriter}.
 *
 * <p>
 * A record is copied when {@code recoverPrior} is set and no corrupt block has
 * been counted so far, or when {@code recoverAfter} is set and at least one
 * corrupt block has been counted. The output file is created lazily, on the
 * first record that qualifies for copying.
 *
 * <p>
 * Record counters are kept as {@code long}: block counts are {@code long}
 * values, and accumulating them into an {@code int} silently truncates the
 * totals once they exceed {@link Integer#MAX_VALUE}.
 *
 * @param fileReader   reader over the file being inspected
 * @param fileWriter   writer that receives the records selected for recovery
 * @param out          stream for the final summary
 * @param err          stream for corruption diagnostics and stack traces
 * @param recoverPrior copy records read while no corrupt block has been seen
 * @param recoverAfter copy records read once a corrupt block has been seen
 * @param schema       schema passed to {@code fileWriter.create}
 * @param outfile      file passed to {@code fileWriter.create}
 * @return 0 once the reader is exhausted and the summary was printed; 1 if the
 *         output file could not be created, the reader could not be re-synced
 *         after a failure, or closing the writer failed
 */
private int innerRecover(DataFileReader<Object> fileReader, DataFileWriter<Object> fileWriter, PrintStream out,
    PrintStream err, boolean recoverPrior, boolean recoverAfter, Schema schema, File outfile) {
  int numBlocks = 0;
  int numCorruptBlocks = 0;
  // long, not int: these accumulate long block counts (see method Javadoc).
  long numRecords = 0;
  long numCorruptRecords = 0;
  long recordsWritten = 0;
  long position = fileReader.previousSync();
  long blockSize = 0;
  long blockCount = 0;
  boolean fileWritten = false;
  try {
    while (true) {
      try {
        if (!fileReader.hasNext()) {
          // Reader exhausted: print the summary and finish successfully.
          out.println("File Summary: ");
          out.println(" Number of blocks: " + numBlocks + " Number of corrupt blocks: " + numCorruptBlocks);
          out.println(" Number of records: " + numRecords + " Number of corrupt records: " + numCorruptRecords);
          if (recoverAfter || recoverPrior) {
            out.println(" Number of records written " + recordsWritten);
          }
          out.println();
          return 0;
        }
        // Remember where this block starts so a failure can re-sync from here.
        position = fileReader.previousSync();
        blockCount = fileReader.getBlockCount();
        blockSize = fileReader.getBlockSize();
        numRecords += blockCount;
        long blockRemaining = blockCount;
        numBlocks++;
        boolean lastRecordWasBad = false;
        long badRecordsInBlock = 0;
        while (blockRemaining > 0) {
          try {
            Object datum = fileReader.next();
            if ((recoverPrior && numCorruptBlocks == 0) || (recoverAfter && numCorruptBlocks > 0)) {
              if (!fileWritten) {
                // Create the output file lazily, on the first record to be written.
                try {
                  fileWriter.create(schema, outfile);
                  fileWritten = true;
                } catch (Exception e) {
                  e.printStackTrace(err);
                  return 1;
                }
              }
              try {
                fileWriter.append(datum);
                recordsWritten++;
              } catch (Exception e) {
                e.printStackTrace(err);
                // Rethrown into the enclosing catch, which accounts for this
                // record the same way as a record that failed to decode.
                throw e;
              }
            }
            blockRemaining--;
            lastRecordWasBad = false;
          } catch (Exception e) {
            // Index of the failing record within the current block.
            long pos = blockCount - blockRemaining;
            if (badRecordsInBlock == 0) {
              // first corrupt record
              numCorruptBlocks++;
              err.println("Corrupt block: " + numBlocks + " Records in block: " + blockCount
                  + " uncompressed block size: " + blockSize);
              err.println("Corrupt record at position: " + (pos));
            } else {
              // second bad record in block, if consecutive skip block.
              err.println("Corrupt record at position: " + (pos));
              if (lastRecordWasBad) {
                // consecutive bad record
                err.println(
                    "Second consecutive bad record in block: " + numBlocks + ". Skipping remainder of block. ");
                // blockRemaining still includes the current record, so the
                // whole unread tail of the block is counted as corrupt here.
                numCorruptRecords += blockRemaining;
                badRecordsInBlock += blockRemaining;
                try {
                  fileReader.sync(position);
                } catch (Exception e2) {
                  err.println("failed to sync to sync marker, aborting");
                  e2.printStackTrace(err);
                  return 1;
                }
                break;
              }
            }
            blockRemaining--;
            lastRecordWasBad = true;
            numCorruptRecords++;
            badRecordsInBlock++;
          }
        }
        if (badRecordsInBlock != 0) {
          err.println("** Number of unrecoverable records in block: " + (badRecordsInBlock));
        }
        position = fileReader.previousSync();
      } catch (Exception e) {
        // The block itself could not be read, so its record count is unknown.
        err.println("Failed to read block " + numBlocks + ". Unknown record " + "count in block. Skipping. Reason: "
            + e.getMessage());
        numCorruptBlocks++;
        try {
          fileReader.sync(position);
        } catch (Exception e2) {
          err.println("failed to sync to sync marker, aborting");
          e2.printStackTrace(err);
          return 1;
        }
      }
    }
  } finally {
    // Only close the writer if create() succeeded; a failed close overrides
    // the pending return value with 1.
    if (fileWritten) {
      try {
        fileWriter.close();
      } catch (Exception e) {
        e.printStackTrace(err);
        return 1;
      }
    }
  }
}