private void load0()

in modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java [814:1097]


    /**
     * Maps the given entries to cluster nodes, groups them per node and submits every group to the
     * corresponding per-node buffer. A batch whose update fails is re-submitted through a new call of
     * this method with {@code remaps + 1}, unless the streamer is cancelled, the remap limit is exceeded
     * or the cluster is in read-only mode.
     * <p>
     * Exceptions are not propagated to the caller: any {@link Exception} raised here is wrapped into
     * an {@code IgniteCheckedException} and reported through {@code resFut}.
     *
     * @param entries Entries to load, must not be {@code null}.
     * @param resFut Result future, completed when all active keys are processed or on the first fatal error.
     * @param activeKeys Keys which are not yet acknowledged; keys of a finished batch are removed from it and
     *      {@code resFut} is completed once it becomes empty. {@code null} means a single-entry load, in which
     *      case {@code resFut} is completed as soon as the batch finishes.
     * @param remaps Number of remaps already performed for these entries ({@code 0} for the initial attempt).
     * @param remapNode Node the batch was sent to on the previous attempt. It is reused as the only target
     *      when overwrite is not allowed and the topology version is still equal to {@code remapTopVer}.
     * @param remapTopVer Topology version the previous attempt was mapped on.
     */
    private void load0(
        Collection<? extends DataStreamerEntry> entries,
        final GridFutureAdapter<Object> resFut,
        @Nullable final Collection<KeyCacheObjectWrapper> activeKeys,
        final int remaps,
        ClusterNode remapNode,
        AffinityTopologyVersion remapTopVer
    ) {
        try {
            assert entries != null;

            final boolean remap = remaps > 0;

            if (!remap) { // Failed data should be processed prior to new data.
                acquireRemapSemaphore();
            }

            // One-time check: the flag is tested again under the lock so the warning is printed at most once.
            if (!isWarningPrinted) {
                synchronized (this) {
                    if (!allowOverwrite() && !isWarningPrinted) {
                        U.warn(log, "Data streamer will not overwrite existing cache entries for better performance " +
                            "(to change, set allowOverwrite to true)");
                    }

                    isWarningPrinted = true;
                }
            }

            Map<ClusterNode, Collection<DataStreamerEntry>> mappings = new HashMap<>();

            // Deployment info is created from the first processed entry if it has not been created yet.
            boolean initPda = ctx.deploy().enabled() && jobPda == null;

            GridCacheAdapter cache = ctx.cache().internalCache(cacheName);

            if (cache == null)
                throw new IgniteCheckedException("Cache not created or already destroyed.");

            GridCacheContext cctx = cache.context();

            GridCacheGateway gate = null;

            AffinityTopologyVersion topVer;

            GridDhtPartitionsExchangeFuture exchFut = ctx.cache().context().exchange().lastTopologyFuture();

            // Fail fast if the exchange in progress stops the target cache.
            if (!exchFut.isDone()) {
                ExchangeActions acts = exchFut.exchangeActions();

                if (acts != null && acts.cacheStopped(CU.cacheId(cacheName)))
                    throw new CacheStoppedException(cacheName);
            }

            // It is safe to block here even if the cache gate is acquired.
            topVer = exchFut.get();

            List<List<ClusterNode>> assignments = cctx.affinity().assignments(topVer);

            if (!allowOverwrite()) { // Cases where cctx required.
                gate = cctx.gate();

                gate.enter();
            }

            try {
                // Phase 1: resolve target nodes for every entry and group entries per node.
                for (DataStreamerEntry entry : entries) {
                    List<ClusterNode> nodes;

                    try {
                        KeyCacheObject key = entry.getKey();

                        assert key != null;

                        if (initPda) {
                            if (cacheObjCtx.addDeploymentInfo())
                                jobPda = new DataStreamerPda(key.value(cacheObjCtx, false),
                                    entry.getValue() != null ? entry.getValue().value(cacheObjCtx, false) : null,
                                    rcvr);
                            else if (rcvr != null)
                                jobPda = new DataStreamerPda(rcvr);

                            initPda = false;
                        }

                        // -1 is treated as "partition is not set yet".
                        if (key.partition() == -1)
                            key.partition(cctx.affinity().partition(key, false));

                        // On remap without overwrite keep the previous target while topology is unchanged.
                        if (!allowOverwrite() && remapNode != null && Objects.equals(topVer, remapTopVer))
                            nodes = Collections.singletonList(remapNode);
                        else
                            nodes = nodes(key, topVer, cctx);
                    }
                    catch (IgniteCheckedException e) {
                        resFut.onDone(e);

                        return;
                    }

                    if (F.isEmpty(nodes)) {
                        resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
                            "(no nodes with cache found in topology) [infos=" + entries.size() +
                            ", cacheName=" + cacheName + ']'));

                        return;
                    }

                    for (ClusterNode node : nodes) {
                        Collection<DataStreamerEntry> col = mappings.get(node);

                        if (col == null)
                            mappings.put(node, col = new ArrayList<>());

                        col.add(entry);
                    }
                }

                // Phase 2: push every per-node group into the buffer of that node.
                for (final Map.Entry<ClusterNode, Collection<DataStreamerEntry>> e : mappings.entrySet()) {
                    final ClusterNode node = e.getKey();
                    final UUID nodeId = node.id();

                    Buffer buf = bufMappings.get(nodeId);

                    if (buf == null) {
                        Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(node));

                        // Another thread has registered a buffer for this node first - use it.
                        if (old != null)
                            buf = old;
                    }

                    final Collection<DataStreamerEntry> entriesForNode = e.getValue();

                    // Invoked when the batch for this node is finished, successfully or not.
                    IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
                        @Override public void apply(IgniteInternalFuture<?> t) {
                            try {
                                t.get();

                                if (activeKeys != null) {
                                    for (DataStreamerEntry nodeEntry : entriesForNode)
                                        activeKeys.remove(new KeyCacheObjectWrapper(nodeEntry.getKey()));

                                    if (activeKeys.isEmpty())
                                        resFut.onDone();
                                }
                                else {
                                    assert entriesForNode.size() == 1;

                                    // That has been a single key,
                                    // so complete result future right away.
                                    resFut.onDone();
                                }
                            }
                            catch (IgniteClientDisconnectedCheckedException e1) {
                                if (log.isDebugEnabled())
                                    log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');

                                resFut.onDone(e1);
                            }
                            catch (IgniteCheckedException e1) {
                                if (log.isDebugEnabled())
                                    log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');

                                if (cancelled) {
                                    resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
                                        DataStreamerImpl.this, e1));
                                }
                                else if (remaps + 1 > maxRemapCnt) {
                                    resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
                                        + remaps, e1));
                                }
                                else if (X.hasCause(e1, IgniteClusterReadOnlyException.class)) {
                                    resFut.onDone(new IgniteClusterReadOnlyException(
                                        "Failed to finish operation. Cluster in read-only mode!",
                                        e1
                                    ));
                                }
                                else {
                                    try {
                                        // The permit is held until the remap task below has finished.
                                        remapSem.acquire();

                                        final Runnable r = new Runnable() {
                                            @Override public void run() {
                                                try {
                                                    if (cancelled)
                                                        closedException();

                                                    load0(entriesForNode, resFut, activeKeys, remaps + 1, node, topVer);
                                                }
                                                catch (Throwable ex) {
                                                    resFut.onDone(
                                                        new IgniteCheckedException("DataStreamer remapping failed. ", ex));
                                                }
                                                finally {
                                                    remapSem.release();
                                                }
                                            }
                                        };

                                        dataToRemap.add(r);

                                        // Only the thread which wins the CAS on remapOwning starts the drain job.
                                        if (!remapOwning.get() && remapOwning.compareAndSet(false, true)) {
                                            ctx.closure().callLocalSafe(new GPC<Boolean>() {
                                                @Override public Boolean call() {
                                                    boolean locked = true;

                                                    while (locked || !dataToRemap.isEmpty()) {
                                                        // Ownership was given up on the previous iteration:
                                                        // take it back or leave the queue to the new owner.
                                                        if (!locked && !remapOwning.compareAndSet(false, true))
                                                            return false;

                                                        try {
                                                            Runnable task = dataToRemap.poll();

                                                            if (task != null)
                                                                task.run();
                                                        }
                                                        finally {
                                                            if (!dataToRemap.isEmpty())
                                                                locked = true;
                                                            else {
                                                                remapOwning.set(false);

                                                                locked = false;
                                                            }
                                                        }
                                                    }

                                                    return true;
                                                }
                                            }, true);
                                        }
                                    }
                                    catch (InterruptedException e2) {
                                        resFut.onDone(e2);

                                        // Semaphore.acquire() clears the interrupt status when it throws,
                                        // so re-assert it for the code further up the stack.
                                        Thread.currentThread().interrupt();
                                    }
                                }
                            }
                        }
                    };

                    GridCompoundFuture opFut = new SilentCompoundFuture();

                    opFut.listen(lsnr);

                    final List<GridFutureAdapter<?>> futs;

                    try {
                        futs = buf.update(entriesForNode, topVer, assignments, opFut, remap);

                        opFut.markInitialized();
                    }
                    catch (IgniteInterruptedCheckedException e1) {
                        resFut.onDone(e1);

                        return;
                    }

                    // The target node is no longer known to discovery: drop its buffer (only if it is
                    // still the registered one) and fail the futures returned by the update.
                    if (ctx.discovery().node(nodeId) == null) {
                        if (bufMappings.remove(nodeId, buf)) {
                            final Buffer buf0 = buf;

                            waitAffinityAndRun(new GridPlainRunnable() {
                                @Override public void run() {
                                    buf0.onNodeLeft();

                                    if (futs != null) {
                                        Throwable ex = new ClusterTopologyCheckedException(
                                                "Failed to wait for request completion (node has left): " + nodeId);

                                        for (int i = 0; i < futs.size(); i++)
                                            futs.get(i).onDone(ex);
                                    }
                                }
                            }, ctx.discovery().topologyVersion(), false);
                        }
                    }
                }
            }
            finally {
                // The gate is entered only when overwrite is not allowed.
                if (gate != null)
                    gate.leave();
            }
        }
        catch (Exception ex) {
            resFut.onDone(new IgniteCheckedException("DataStreamer data loading failed.", ex));
        }
    }