private void exitComponents()

in hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java [189:327]


    /**
     * Exits every component held by {@code ctx} after an operation of type {@code opType} and then performs the
     * follow-up work that depends on the operation type.
     * <p>
     * The method returns immediately when the context is not accessing components, unless the operation is a
     * FLUSH or a MERGE: those always run the exit path. Everything except the final destruction of inactive disk
     * components runs while holding the monitor of {@code opTracker}.
     *
     * @param ctx
     *            operation context whose component holder is iterated and whose callbacks are handed to the
     *            operation tracker
     * @param opType
     *            type of the operation that is exiting
     * @param newComponent
     *            component produced by a FLUSH or MERGE; {@code null} if that operation was not performed
     * @param failedOperation
     *            forwarded to each component's {@code threadExit}; for (FORCE_)MODIFICATION operations it also
     *            selects {@code completeOperation} instead of {@code afterOperation} on the tracker
     * @throws HyracksDataException
     *             if one of the underlying calls fails with it
     * @throws IndexException
     *             if one of the underlying calls fails with it
     */
    private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent,
            boolean failedOperation) throws HyracksDataException, IndexException {
        // FLUSH and MERGE operations must always exit the components so that waiting threads are notified;
        // every other operation only has work to do while it is accessing components.
        boolean mustExit = ctx.isAccessingComponents() || opType == LSMOperationType.FLUSH
                || opType == LSMOperationType.MERGE;
        if (!mustExit) {
            return;
        }
        final boolean modificationOp = opType == LSMOperationType.MODIFICATION
                || opType == LSMOperationType.FORCE_MODIFICATION;
        // Filled in under the opTracker monitor, consumed after the monitor has been released.
        List<ILSMComponent> diskComponentsToDestroy = null;
        try {
            synchronized (opTracker) {
                try {
                    // Step 1: exit each component and react to the state it ends up in.
                    boolean firstInHolder = true;
                    for (ILSMComponent component : ctx.getComponentHolder()) {
                        // Only a memory component in the first slot of the holder is treated as the mutable one.
                        boolean isMutableComponent = firstInHolder
                                && component.getType() == LSMComponentType.MEMORY;
                        component.threadExit(opType, failedOperation, isMutableComponent);
                        if (component.getType() == LSMComponentType.MEMORY) {
                            switch (component.getState()) {
                                case INACTIVE:
                                    ((AbstractMemoryLSMComponent) component).reset();
                                    // Wake up all waiting threads whenever a memory component has become
                                    // inactive. Even though the mutable components were switched, the one
                                    // switched to may still be busy flushing its data to disk, so the
                                    // notification issued when the flush was scheduled is not sufficient.
                                    opTracker.notifyAll();
                                    break;
                                case READABLE_UNWRITABLE:
                                    if (isMutableComponent && modificationOp) {
                                        lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
                                    }
                                    break;
                                default:
                                    break;
                            }
                        } else {
                            // Disk component: remember it on the index once it has become inactive.
                            switch (component.getState()) {
                                case INACTIVE:
                                    lsmIndex.addInactiveDiskComponent(component);
                                    break;
                                default:
                                    break;
                            }
                        }
                        firstInHolder = false;
                    }
                    ctx.setAccessingComponents(false);

                    // Step 2: operation-specific work. newComponent is null when the flush/merge was not
                    // performed, in which case nothing is installed.
                    switch (opType) {
                        case FLUSH:
                            if (newComponent != null) {
                                lsmIndex.addComponent(newComponent);
                                if (replicationEnabled) {
                                    componentsToBeReplicated.clear();
                                    componentsToBeReplicated.add(newComponent);
                                    triggerReplication(componentsToBeReplicated, false, opType);
                                }
                                mergePolicy.diskComponentAdded(lsmIndex, false);
                            }
                            break;
                        case MERGE:
                            if (newComponent != null) {
                                lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
                                if (replicationEnabled) {
                                    componentsToBeReplicated.clear();
                                    componentsToBeReplicated.add(newComponent);
                                    triggerReplication(componentsToBeReplicated, false, opType);
                                }
                                mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
                            }
                            break;
                        default:
                            break;
                    }
                } catch (Throwable failure) {
                    failure.printStackTrace();
                    throw failure;
                } finally {
                    if (failedOperation && modificationOp) {
                        // A failed modification must go through completeOperation() so that the active
                        // operation count incremented in beforeOperation() is decremented again.
                        opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
                                ctx.getModificationCallback());
                    } else {
                        opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
                                ctx.getModificationCallback());
                    }

                    /*
                     * Lazy cleanup of inactive disk components (old merged components that are no longer
                     * accessed): pick the ones whose file reference count is exactly 1 and take them off the
                     * index's inactive list. They are only selected here; the destruction itself happens after
                     * the opTracker synchronized block has been left.
                     */
                    List<ILSMComponent> inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
                    if (!inactiveDiskComponents.isEmpty()) {
                        for (ILSMComponent candidate : inactiveDiskComponents) {
                            if (((AbstractDiskLSMComponent) candidate).getFileReferenceCount() != 1) {
                                continue;
                            }
                            if (diskComponentsToDestroy == null) {
                                diskComponentsToDestroy = new LinkedList<>();
                            }
                            diskComponentsToDestroy.add(candidate);
                        }
                        if (diskComponentsToDestroy != null) {
                            inactiveDiskComponents.removeAll(diskComponentsToDestroy);
                        }
                    }
                }
            }
        } finally {
            // Destroy the inactive disk components selected above, if any.
            if (diskComponentsToDestroy != null) {
                try {
                    // Schedule a replication job that deletes these inactive disk components from replicas.
                    if (replicationEnabled) {
                        lsmIndex.scheduleReplication(null, diskComponentsToDestroy, false,
                                ReplicationOperation.DELETE, opType);
                    }

                    for (ILSMComponent doomed : diskComponentsToDestroy) {
                        ((AbstractDiskLSMComponent) doomed).destroy();
                    }
                } catch (Throwable failure) {
                    failure.printStackTrace();
                    throw failure;
                }
            }
        }

    }