public int run()

in oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java [276:401]


    /**
     * Copies segments from {@code source} to {@code destination} and reports the outcome
     * on {@code outWriter} / {@code errWriter}.
     *
     * <p>Two modes are supported:
     * <ul>
     * <li><b>Flat copy</b> (when {@code flat} is set and the destination store type is
     * {@code TAR}): the source archives are visited in reverse order of their names and
     * every segment is written as an individual file, named after the segment UUID, in the
     * destination directory. The transfer stops early once the archive limit derived from
     * {@code maxSizeGb} is reached.</li>
     * <li><b>Migration</b> (all other cases): the work is delegated to a
     * {@code SegmentStoreMigrator} configured with the source and destination
     * persistences, {@code revisionCount} and, optionally, append mode.</li>
     * </ul>
     *
     * <p>A failure to write a single segment file in flat mode is reported on
     * {@code errWriter} and the partially written files are removed, but it does not abort
     * the run.
     *
     * @return {@code 0} if the copy completed, {@code 1} if it was aborted by an exception
     */
    public int run() {
        Stopwatch watch = Stopwatch.createStarted();

        SegmentStoreType srcType = storeTypeFromPathOrUri(source);
        SegmentStoreType destType = storeTypeFromPathOrUri(destination);

        String srcDescription = storeDescription(srcType, source);
        String destDescription = storeDescription(destType, destination);

        if (flat && destType == SegmentStoreType.TAR) {
            try {
                srcPersistence = newSegmentNodeStorePersistence(srcType, source);

                SegmentArchiveManager sourceManager = srcPersistence.createArchiveManager(false, false,
                        new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());

                // NOTE(review): presumably assumes four archives per GB - confirm against the
                // archive size used by the source store.
                int maxArchives = maxSizeGb * 4;
                int count = 0;

                // Visit archives in reverse name order.
                List<String> archivesList = sourceManager.listArchives();
                archivesList.sort(Collections.reverseOrder());

                for (String archiveName : archivesList) {
                    // NOTE(review): this fires after (maxArchives - 1) archives have been copied;
                    // confirm that stopping one archive short of maxArchives is intended.
                    if (count == maxArchives - 1) {
                        printMessage(outWriter, "Stopping transfer after reaching {0} GB at archive {1}", maxSizeGb,
                                archiveName);
                        break;
                    }

                    printMessage(outWriter, "{0}/{1} -> {2}", source, archiveName, destination);

                    SegmentArchiveReader reader = sourceManager.forceOpen(archiveName);

                    // Schedule the reads of all segments of this archive on the executor; the
                    // results are consumed below in submission order.
                    List<Future<Segment>> futures = new ArrayList<>();
                    for (SegmentArchiveEntry entry : reader.listSegments()) {
                        futures.add(executor.submit(() -> RETRIER.execute(() -> {
                            Segment segment = new Segment(entry);
                            segment.read(reader);
                            return segment;
                        })));
                    }

                    File directory = new File(destination);
                    directory.mkdir();

                    for (Future<Segment> future : futures) {
                        Segment segment = future.get();
                        RETRIER.execute(() -> {
                            final byte[] array = segment.data.array();
                            String segmentId = new UUID(segment.entry.getMsb(), segment.entry.getLsb()).toString();
                            File segmentFile = new File(directory, segmentId);
                            // Write to a uniquely named temporary file first and move it into
                            // place afterwards, so that a partially written segment never
                            // appears under its final name.
                            File tempSegmentFile = new File(directory, segmentId + System.nanoTime() + ".part");
                            Buffer buffer = Buffer.wrap(array);

                            Buffer bufferCopy = buffer.duplicate();

                            try {
                                try (FileChannel channel = new FileOutputStream(tempSegmentFile).getChannel()) {
                                    bufferCopy.write(channel);
                                }
                                try {
                                    Files.move(tempSegmentFile.toPath(), segmentFile.toPath(),
                                            StandardCopyOption.ATOMIC_MOVE);
                                } catch (AtomicMoveNotSupportedException e) {
                                    // Fall back to a plain move when the file system cannot move atomically.
                                    Files.move(tempSegmentFile.toPath(), segmentFile.toPath());
                                }
                            } catch (Exception e) {
                                // Best effort: report the failure and remove whatever was written.
                                // NOTE(review): the exception is not rethrown, so the run continues
                                // with the next segment - confirm this is intended.
                                printMessage(errWriter, "Error writing segment {0} to cache: {1} ", segmentId, e);
                                e.printStackTrace(errWriter);
                                try {
                                    Files.deleteIfExists(segmentFile.toPath());
                                    Files.deleteIfExists(tempSegmentFile.toPath());
                                } catch (IOException i) {
                                    printMessage(errWriter, "Error while deleting corrupted segment file {0} {1}",
                                            segmentId, i);
                                }
                            }
                        });
                    }

                    count++;
                }
            } catch (IOException | InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Catching InterruptedException clears the interrupt status; restore it so
                    // that the caller can still observe the interruption request.
                    Thread.currentThread().interrupt();
                }
                watch.stop();
                printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source,
                        destination);
                e.printStackTrace(errWriter);
                return 1;
            }
        } else {
            try {
                // Create both persistences from the source/destination locations unless both
                // have already been set.
                if (srcPersistence == null || destPersistence == null) {
                    srcPersistence = newSegmentNodeStorePersistence(srcType, source);
                    destPersistence = newSegmentNodeStorePersistence(destType, destination);
                }

                printMessage(outWriter, "Started segment-copy transfer!");
                printMessage(outWriter, "Source: {0}", srcDescription);
                printMessage(outWriter, "Destination: {0}", destDescription);

                SegmentStoreMigrator.Builder migratorBuilder = new SegmentStoreMigrator.Builder()
                        .withSourcePersistence(srcPersistence, srcDescription)
                        .withTargetPersistence(destPersistence, destDescription)
                        .withRevisionCount(revisionCount);

                if (appendMode) {
                    migratorBuilder.setAppendMode();
                }

                migratorBuilder.build().migrate();

            } catch (Exception e) {
                watch.stop();
                printMessage(errWriter, "A problem occured while copying archives from {0} to {1} ", source,
                        destination);
                e.printStackTrace(errWriter);
                return 1;
            }

        }

        watch.stop();
        printMessage(outWriter, "Segment-copy succeeded in {0}", printableStopwatch(watch));

        return 0;
    }