private void doInitialRefresh()

in protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java [483:638]


    /**
     * Performs the initial content refresh for a consumer that contacts this provider
     * without a usable cookie.
     * <p>
     * The steps, in order, are:
     * <ol>
     *   <li>replace any client supplied sort control by a reverse sort on entryDn,</li>
     *   <li>read the partition's context CSN and create a {@link ReplicaEventLog} for the consumer,</li>
     *   <li>register a {@link SyncReplSearchListener} (not pushing in real time yet) using the
     *       filter <code>(&amp; filter (entryCSN &gt;= contextCsn))</code>, so that changes made
     *       while the refresh is running are captured,</li>
     *   <li>search with <code>(&amp; filter (entryCSN &lt;= contextCsn))</code> to send the
     *       existing content,</li>
     *   <li>on success, either replay the log and switch the listener to real time push
     *       (refreshAndPersist) or send the SearchResultDone with a SyncDone control
     *       (refreshOnly), then store the consumer and add it to the replica log map.</li>
     * </ol>
     * If the search does not return SUCCESS, or if anything throws after the listener has
     * been registered, the replica log is stopped and the listener is removed so that
     * nothing is left registered for a consumer which was never recorded.
     *
     * @param session the LDAP session of the consumer
     * @param request the syncrepl search request; its controls and its filter are modified
     *        by this method
     * @throws Exception if any step fails; when the failure happens after the listener
     *         registration, a failure of the cleanup itself is attached as a suppressed exception
     */
    private void doInitialRefresh( LdapSession session, SearchRequest request ) throws Exception
    {
        PROVIDER_LOG.debug( "Starting an initial refresh" );

        SortRequest ctrl = ( SortRequest ) request.getControl( SortRequest.OID );

        if ( ctrl != null )
        {
            PROVIDER_LOG
                .warn( "Removing the received sort control from the syncrepl search request during initial refresh" );
            request.removeControl( ctrl );
        }

        PROVIDER_LOG
            .debug( "Adding sort control to sort the entries by entryDn attribute to preserve order of insertion" );
        SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT );
        // matchingrule for "entryDn"
        sk.setMatchingRuleId( SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
        sk.setReverseOrder( true );

        ctrl = new SortRequestImpl();
        ctrl.addSortKey( sk );

        request.addControl( ctrl );

        // Capture the filter as sent by the consumer before modifyFilter() and the
        // CSN constraints below alter the request
        String originalFilter = request.getFilter().toString();
        InetSocketAddress address = ( InetSocketAddress ) session.getIoSession().getRemoteAddress();
        String hostName = address.getAddress().getHostName();

        ExprNode modifiedFilter = modifyFilter( session, request );

        Partition partition = dirService.getPartitionNexus().getPartition( request.getBase() );
        String contextCsn;

        try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
        {
            contextCsn = partition.getContextCsn( partitionTxn );
        }

        boolean refreshNPersist = isRefreshNPersist( request );

        // first register a ReplicaEventLog before starting the initial content refresh
        // this is to log all the operations happen on DIT during initial content refresh
        ReplicaEventLog replicaLog;

        try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
        {
            replicaLog = createReplicaEventLog( partitionTxn, hostName, originalFilter );
        }

        replicaLog.setRefreshNPersist( refreshNPersist );
        Value contextCsnValue = new Value( dirService.getAtProvider().getEntryCSN(), contextCsn );

        // modify the filter to include the context Csn : this filter ( entryCSN >= contextCsn )
        // is the one used for the notification criteria below
        GreaterEqNode csnGeNode = new GreaterEqNode( csnAT, contextCsnValue );
        ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode );
        request.setFilter( postInitContentFilter );

        PROVIDER_LOG.debug( "Starting the replicaLog {}", replicaLog );

        // irrespective of the sync mode, the listener is created with the real time push
        // flag set to false, so that the modifications are stored in the queue. If we are in
        // persist mode, the queue's content is pushed after the refresh and the listener is
        // then switched to real time mode
        SyncReplSearchListener replicationListener = new SyncReplSearchListener( session, request, replicaLog, false );
        replicaLog.setPersistentListener( replicationListener );

        // compose notification criteria and add the listener to the event
        // service using that notification criteria to determine which events
        // are to be delivered to the persistent search issuing client
        NotificationCriteria criteria = new NotificationCriteria( dirService.getSchemaManager() );
        criteria.setAliasDerefMode( request.getDerefAliases() );
        criteria.setBase( request.getBase() );
        criteria.setFilter( request.getFilter() );
        criteria.setScope( request.getScope() );
        criteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );

        replicaLog.setSearchCriteria( criteria );

        dirService.getEventService().addListener( replicationListener, criteria );

        // From here on the listener is registered : every exit which does not end with the
        // consumer being stored must undo that registration
        boolean established = false;

        try
        {
            // then start pushing initial content
            LessEqNode csnNode = new LessEqNode( csnAT, contextCsnValue );

            // modify the filter to include the context Csn
            ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode );
            request.setFilter( initialContentFilter );

            // Now, do a search to get all the entries
            SearchResultDone searchDoneResp = doSimpleSearch( session, request, replicaLog );
            ResultCodeEnum resultCode = searchDoneResp.getLdapResult().getResultCode();

            if ( resultCode == ResultCodeEnum.SUCCESS )
            {
                if ( replicaLog.getLastSentCsn() == null )
                {
                    replicaLog.setLastSentCsn( contextCsn );
                }

                if ( refreshNPersist ) // refreshAndPersist mode
                {
                    PROVIDER_LOG
                        .debug( "Refresh&Persist requested : send the data being modified since the initial refresh" );
                    // Now, send the modified entries since the search has started
                    sendContentFromLog( session, request, replicaLog, contextCsn );

                    byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), replicaLog.getLastSentCsn() );

                    SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
                    syncInfoValue.setSyncInfoValueType( SynchronizationInfoEnum.NEW_COOKIE );
                    syncInfoValue.setMessageId( request.getMessageId() );
                    syncInfoValue.setCookie( cookie );

                    PROVIDER_LOG.info( "Sending the intermediate response to consumer {}, {}",
                        replicaLog, syncInfoValue );

                    session.getIoSession().write( syncInfoValue );

                    // switch the handler mode to realtime push
                    replicationListener.setPushInRealTime( true );
                    PROVIDER_LOG.debug( "Waiting for any modification for {}", replicaLog );
                }
                else
                {
                    PROVIDER_LOG.debug( "RefreshOnly requested" );
                    byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );

                    // no need to send from the log, that will be done in the next refreshOnly session
                    SyncDoneValue syncDone = new SyncDoneValueImpl();
                    syncDone.setCookie( cookie );
                    searchDoneResp.addControl( syncDone );
                    PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog,
                        searchDoneResp );

                    session.getIoSession().write( searchDoneResp );
                }

                // if all is well then store the consumer information
                replicaUtil.addConsumerEntry( replicaLog );

                // add to the map only after storing in the DIT, else the Replica update thread barfs
                replicaLogMap.put( replicaLog.getId(), replicaLog );

                established = true;
            }
            else
            {
                PROVIDER_LOG.warn( "initial content refresh didn't succeed due to {}", resultCode );
            }
        }
        catch ( Exception e )
        {
            // Do not leave the listener registered for a consumer we failed to set up.
            // The original failure is the one reported, a cleanup failure is only attached to it
            try
            {
                abortInitialRefresh( replicaLog, replicationListener );
            }
            catch ( Exception cleanupFailure )
            {
                e.addSuppressed( cleanupFailure );
            }

            throw e;
        }

        if ( !established )
        {
            // The search completed with a non SUCCESS result code
            abortInitialRefresh( replicaLog, replicationListener );
        }
    }


    /**
     * Undoes what doInitialRefresh has set up for a consumer : stops the replica log,
     * and removes the listener from the event service, even if stopping the log fails.
     *
     * @param replicaLog the replica event log to stop
     * @param replicationListener the listener to remove from the event service
     * @throws Exception if the log cannot be stopped or the listener cannot be removed
     */
    private void abortInitialRefresh( ReplicaEventLog replicaLog, SyncReplSearchListener replicationListener )
        throws Exception
    {
        try
        {
            replicaLog.stop();
        }
        finally
        {
            // remove the listener
            dirService.getEventService().removeListener( replicationListener );
        }
    }