in oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java [276:401]
public int run() {
Stopwatch watch = Stopwatch.createStarted();
SegmentStoreType srcType = storeTypeFromPathOrUri(source);
SegmentStoreType destType = storeTypeFromPathOrUri(destination);
String srcDescription = storeDescription(srcType, source);
String destDescription = storeDescription(destType, destination);
if (flat && destType == SegmentStoreType.TAR) {
try {
srcPersistence = newSegmentNodeStorePersistence(srcType, source);
SegmentArchiveManager sourceManager = srcPersistence.createArchiveManager(false, false,
new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
int maxArchives = maxSizeGb * 4;
int count = 0;
List<String> archivesList = sourceManager.listArchives();
archivesList.sort(Collections.reverseOrder());
for (String archiveName : archivesList) {
if (count == maxArchives - 1) {
printMessage(outWriter, "Stopping transfer after reaching {0} GB at archive {1}", maxSizeGb,
archiveName);
break;
}
printMessage(outWriter, "{0}/{1} -> {2}", source, archiveName, destination);
SegmentArchiveReader reader = sourceManager.forceOpen(archiveName);
List<Future<Segment>> futures = new ArrayList<>();
for (SegmentArchiveEntry entry : reader.listSegments()) {
futures.add(executor.submit(() -> RETRIER.execute(() -> {
Segment segment = new Segment(entry);
segment.read(reader);
return segment;
})));
}
File directory = new File(destination);
directory.mkdir();
for (Future<Segment> future : futures) {
Segment segment = future.get();
RETRIER.execute(() -> {
final byte[] array = segment.data.array();
String segmentId = new UUID(segment.entry.getMsb(), segment.entry.getLsb()).toString();
File segmentFile = new File(directory, segmentId);
File tempSegmentFile = new File(directory, segmentId + System.nanoTime() + ".part");
Buffer buffer = Buffer.wrap(array);
Buffer bufferCopy = buffer.duplicate();
try {
try (FileChannel channel = new FileOutputStream(tempSegmentFile).getChannel()) {
bufferCopy.write(channel);
}
try {
Files.move(tempSegmentFile.toPath(), segmentFile.toPath(),
StandardCopyOption.ATOMIC_MOVE);
} catch (AtomicMoveNotSupportedException e) {
Files.move(tempSegmentFile.toPath(), segmentFile.toPath());
}
} catch (Exception e) {
printMessage(errWriter, "Error writing segment {0} to cache: {1} ", segmentId, e);
e.printStackTrace(errWriter);
try {
Files.deleteIfExists(segmentFile.toPath());
Files.deleteIfExists(tempSegmentFile.toPath());
} catch (IOException i) {
printMessage(errWriter, "Error while deleting corrupted segment file {0} {1}",
segmentId, i);
}
}
});
}
count++;
}
} catch (IOException | InterruptedException | ExecutionException e) {
watch.stop();
printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source,
destination);
e.printStackTrace(errWriter);
return 1;
}
} else {
try {
if (srcPersistence == null || destPersistence == null) {
srcPersistence = newSegmentNodeStorePersistence(srcType, source);
destPersistence = newSegmentNodeStorePersistence(destType, destination);
}
printMessage(outWriter, "Started segment-copy transfer!");
printMessage(outWriter, "Source: {0}", srcDescription);
printMessage(outWriter, "Destination: {0}", destDescription);
SegmentStoreMigrator.Builder migratorBuilder = new SegmentStoreMigrator.Builder()
.withSourcePersistence(srcPersistence, srcDescription)
.withTargetPersistence(destPersistence, destDescription)
.withRevisionCount(revisionCount);
if (appendMode) {
migratorBuilder.setAppendMode();
}
migratorBuilder.build().migrate();
} catch (Exception e) {
watch.stop();
printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source,
destination);
e.printStackTrace(errWriter);
return 1;
}
}
watch.stop();
printMessage(outWriter, "Segment-copy succeeded in {0}", printableStopwatch(watch));
return 0;
}