private synchronized void startRecoveryRedoPhase()

in asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java [277:453]


    /**
     * Runs the REDO phase of recovery: scans the log from {@code lowWaterMarkLSN} until the reader returns
     * {@code null} and re-applies the operations of winner transactions to the indexes of {@code partitions}.
     * <ul>
     * <li>UPDATE records are redone only if their transaction is in {@code winnerTxnSet}, or if
     * {@code jobId2WinnerEntitiesMap} holds an entity commit for the record's entity. FILTER records are
     * not subject to the winner check.</li>
     * <li>An UPDATE/FILTER record is applied only when its LSN is at or above the low water mark obtained for
     * its resource; records whose resource no longer exists locally are skipped with a warning.</li>
     * <li>FLUSH records re-schedule a flush on every open index of the dataset/partition that received a
     * redone update, if the flush LSN is above the resource's recorded LSN and the mutable component is not
     * empty.</li>
     * </ul>
     * When the scan ends (normally or with an exception) every index opened by this phase is closed, and, if
     * requested, so are the datasets that had a flush redone. A failure to close one index does not prevent
     * the remaining ones from being closed.
     *
     * @param partitions
     *            only log records whose resource partition is in this set are processed
     * @param logReader
     *            reader positioned by this method at {@code lowWaterMarkLSN}
     * @param lowWaterMarkLSN
     *            LSN at which the scan starts
     * @param winnerTxnSet
     *            ids of transactions whose UPDATE records must be redone
     * @param closeOnFlushRedo
     *            whether datasets for which a flush was redone are closed once the scan ends
     * @throws IOException
     *             if reading the log, opening an index or closing the opened resources fails
     * @throws ACIDException
     *             if a log record of an unsupported type is encountered
     */
    private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader,
            long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException {
        int redoCount = 0;
        // id of the most recently read log record (0 if none was read); passed to ensureMaxTxnId at the end
        long txnId = 0;

        final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager();
        final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
                ((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();

        final Map<Long, LocalResource> resourcesMap = localResourceRepository.getResources(r -> true, partitions);
        // resource id -> low water mark, for the indexes opened by this phase; also drives the final close
        final Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
        // reusable lookup key, re-initialized for every UPDATE record checked against entity commits
        final TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false);
        final Set<Integer> flushRedoDatasets = new HashSet<>();
        try {
            logReader.setPosition(lowWaterMarkLSN);
            // the update clause advances the reader, so 'continue' below moves on to the next record
            for (ILogRecord logRecord = logReader.next(); logRecord != null; logRecord = logReader.next()) {
                if (IS_DEBUG_MODE) {
                    LOGGER.info(logRecord.getLogRecordForDisplay());
                }
                final long lsn = logRecord.getLSN();
                txnId = logRecord.getTxnId();
                switch (logRecord.getLogType()) {
                    case LogType.UPDATE:
                        if (partitions.contains(logRecord.getResourcePartition())) {
                            boolean foundWinner = false;
                            if (winnerTxnSet.contains(txnId)) {
                                foundWinner = true;
                            } else if (jobId2WinnerEntitiesMap.containsKey(txnId)) {
                                final JobEntityCommits jobEntityWinners = jobId2WinnerEntitiesMap.get(txnId);
                                tempKeyTxnEntityId.setTxnId(txnId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
                                        logRecord.getPKValue(), logRecord.getPKValueSize());
                                foundWinner = jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnEntityId);
                            }
                            if (!foundWinner) {
                                break;
                            }
                        }
                        //fall through as FILTER is a subset of UPDATE
                    case LogType.FILTER:
                        if (partitions.contains(logRecord.getResourcePartition())) {
                            final long resourceId = logRecord.getResourceId();
                            final LocalResource localResource = resourcesMap.get(resourceId);
                            /*******************************************************************
                             * [Notice]
                             * -> Issue
                             * The index targeted by this record may no longer exist on this NC, e.g. because
                             * of a later index drop DDL operation, in which case the operation cannot be redone.
                             * -> Approach
                             * The situation shows up as a null localResource; when that happens the record
                             * is skipped and processing continues with the next log record.
                             *******************************************************************/
                            if (localResource == null) {
                                LOGGER.log(Level.WARN, "resource was not found for resource id " + resourceId);
                                continue;
                            }

                            // look the index up in the lifecycle manager; if it is not registered there,
                            // create it from the metadata stored in the local resource, then register and open it
                            final DatasetLocalResource localResourceMetadata =
                                    (DatasetLocalResource) localResource.getResource();
                            final String resourcePath = localResource.getPath();
                            final ILSMIndex registeredIndex = (ILSMIndex) datasetLifecycleManager.get(resourcePath);
                            long maxDiskLastLsn;
                            if (registeredIndex == null) {
                                final ILSMIndex newIndex = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx);
                                datasetLifecycleManager.register(resourcePath, newIndex);
                                datasetLifecycleManager.open(resourcePath);
                                try {
                                    maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
                                            indexCheckpointManagerProvider);
                                } catch (HyracksDataException e) {
                                    // the index is not tracked in resourceId2MaxLSNMap yet, so close it here
                                    datasetLifecycleManager.close(resourcePath);
                                    throw e;
                                }
                                // remember the low water mark; this also marks the index as opened by this phase
                                resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
                            } else {
                                // values in the map are never null, so a null lookup means "not recorded"
                                final Long recordedLsn = resourceId2MaxLSNMap.get(resourceId);
                                if (recordedLsn == null) {
                                    // index was already registered but not opened by this phase: the value is
                                    // intentionally not recorded, so the index is not closed at the end
                                    maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager,
                                            indexCheckpointManagerProvider);
                                } else {
                                    maxDiskLastLsn = recordedLsn;
                                }
                            }
                            // lsn @ maxDiskLastLsn is either a flush log or a master replica log
                            if (lsn >= maxDiskLastLsn) {
                                redo(logRecord, datasetLifecycleManager);
                                redoCount++;
                            }
                        }
                        break;
                    case LogType.FLUSH:
                        int partition = logRecord.getResourcePartition();
                        if (partitions.contains(partition)) {
                            int datasetId = logRecord.getDatasetId();
                            if (!datasetLifecycleManager.isRegistered(datasetId)) {
                                // it's possible this dataset has been dropped
                                continue;
                            }
                            DatasetInfo dsInfo = datasetLifecycleManager.getDatasetInfo(datasetId);
                            // we only need to flush open indexes here (opened by previous update records)
                            // if an index has no ongoing updates, then its memory component must be empty
                            // and there is nothing to flush
                            for (final IndexInfo iInfo : dsInfo.getIndexes().values()) {
                                if (!iInfo.isOpen() || iInfo.getPartition() != partition) {
                                    continue;
                                }
                                final Long maxLsnBeforeFlush = resourceId2MaxLSNMap.get(iInfo.getResourceId());
                                // A null value means no update was seen for the resource.
                                // IMPORTANT: Don't remove this check
                                // This check is to support indexes without transaction logs
                                if (maxLsnBeforeFlush == null) {
                                    // TODO: update checkpoint file?
                                    continue;
                                }
                                final ILSMIndex openIndex = iInfo.getIndex();
                                if (logRecord.getLSN() > maxLsnBeforeFlush
                                        && !openIndex.isCurrentMutableComponentEmpty()) {
                                    // schedule flush
                                    redoFlush(openIndex, logRecord);
                                    flushRedoDatasets.add(datasetId);
                                    redoCount++;
                                }
                                // else TODO: update checkpoint file?
                            }
                        }
                        break;
                    case LogType.JOB_COMMIT:
                    case LogType.ENTITY_COMMIT:
                    case LogType.ABORT:
                    case LogType.WAIT:
                    case LogType.WAIT_FOR_FLUSHES:
                    case LogType.MARKER:
                        //do nothing
                        break;
                    default:
                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
                }
            }
            LOGGER.info("Logs REDO phase completed. Redo logs count: " + redoCount);
        } finally {
            try {
                txnSubsystem.getTransactionManager().ensureMaxTxnId(txnId);
            } finally {
                // must run even if the call above fails, otherwise the opened indexes stay open
                closeRedoPhaseResources(datasetLifecycleManager, resourcesMap, resourceId2MaxLSNMap.keySet(),
                        flushRedoDatasets, closeOnFlushRedo);
            }
        }
    }

    /**
     * Closes the indexes opened by the REDO phase and, if {@code closeOnFlushRedo} is set, the datasets in
     * {@code flushRedoDatasets}. Every close is attempted even when an earlier one fails; the first failure is
     * thrown once all attempts were made, with any later failures attached to it as suppressed exceptions.
     */
    private void closeRedoPhaseResources(IDatasetLifecycleManager datasetLifecycleManager,
            Map<Long, LocalResource> resourcesMap, Set<Long> openedResourceIds, Set<Integer> flushRedoDatasets,
            boolean closeOnFlushRedo) throws HyracksDataException {
        HyracksDataException firstFailure = null;
        //close all indexes
        for (Long openedResourceId : openedResourceIds) {
            try {
                datasetLifecycleManager.close(resourcesMap.get(openedResourceId).getPath());
            } catch (HyracksDataException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (closeOnFlushRedo) {
            // close datasets of indexes to ensure any cached state that might've been changed by recovery is cleared
            // e.g. when redoing a flush, the component id generator needs to be reinitialized
            try {
                datasetLifecycleManager.closeDatasets(flushRedoDatasets);
            } catch (HyracksDataException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }