in protocol-ldap/src/main/java/org/apache/directory/server/ldap/replication/provider/SyncReplRequestHandler.java [483:638]
private void doInitialRefresh( LdapSession session, SearchRequest request ) throws Exception
{
PROVIDER_LOG.debug( "Starting an initial refresh" );
SortRequest ctrl = ( SortRequest ) request.getControl( SortRequest.OID );
if ( ctrl != null )
{
PROVIDER_LOG
.warn( "Removing the received sort control from the syncrepl search request during initial refresh" );
request.removeControl( ctrl );
}
PROVIDER_LOG
.debug( "Adding sort control to sort the entries by entryDn attribute to preserve order of insertion" );
SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT );
// matchingrule for "entryDn"
sk.setMatchingRuleId( SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
sk.setReverseOrder( true );
ctrl = new SortRequestImpl();
ctrl.addSortKey( sk );
request.addControl( ctrl );
String originalFilter = request.getFilter().toString();
InetSocketAddress address = ( InetSocketAddress ) session.getIoSession().getRemoteAddress();
String hostName = address.getAddress().getHostName();
ExprNode modifiedFilter = modifyFilter( session, request );
Partition partition = dirService.getPartitionNexus().getPartition( request.getBase() );
String contextCsn;
try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
{
contextCsn = partition.getContextCsn( partitionTxn );
}
boolean refreshNPersist = isRefreshNPersist( request );
// first register a ReplicaEventLog before starting the initial content refresh
// this is to log all the operations happen on DIT during initial content refresh
ReplicaEventLog replicaLog = null;
try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
{
replicaLog = createReplicaEventLog( partitionTxn, hostName, originalFilter );
}
replicaLog.setRefreshNPersist( refreshNPersist );
Value contexCsnValue = new Value( dirService.getAtProvider().getEntryCSN(), contextCsn );
// modify the filter to include the context Csn
GreaterEqNode csnGeNode = new GreaterEqNode( csnAT, contexCsnValue );
ExprNode postInitContentFilter = new AndNode( modifiedFilter, csnGeNode );
request.setFilter( postInitContentFilter );
// now we process entries forever as they change
// irrespective of the sync mode set the 'isRealtimePush' to false initially so that we can
// store the modifications in the queue and later if it is a persist mode
PROVIDER_LOG.debug( "Starting the replicaLog {}", replicaLog );
// we push this queue's content and switch to realtime mode
SyncReplSearchListener replicationListener = new SyncReplSearchListener( session, request, replicaLog, false );
replicaLog.setPersistentListener( replicationListener );
// compose notification criteria and add the listener to the event
// service using that notification criteria to determine which events
// are to be delivered to the persistent search issuing client
NotificationCriteria criteria = new NotificationCriteria( dirService.getSchemaManager() );
criteria.setAliasDerefMode( request.getDerefAliases() );
criteria.setBase( request.getBase() );
criteria.setFilter( request.getFilter() );
criteria.setScope( request.getScope() );
criteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
replicaLog.setSearchCriteria( criteria );
dirService.getEventService().addListener( replicationListener, criteria );
// then start pushing initial content
LessEqNode csnNode = new LessEqNode( csnAT, contexCsnValue );
// modify the filter to include the context Csn
ExprNode initialContentFilter = new AndNode( modifiedFilter, csnNode );
request.setFilter( initialContentFilter );
// Now, do a search to get all the entries
SearchResultDone searchDoneResp = doSimpleSearch( session, request, replicaLog );
if ( searchDoneResp.getLdapResult().getResultCode() == ResultCodeEnum.SUCCESS )
{
if ( replicaLog.getLastSentCsn() == null )
{
replicaLog.setLastSentCsn( contextCsn );
}
if ( refreshNPersist ) // refreshAndPersist mode
{
PROVIDER_LOG
.debug( "Refresh&Persist requested : send the data being modified since the initial refresh" );
// Now, send the modified entries since the search has started
sendContentFromLog( session, request, replicaLog, contextCsn );
byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), replicaLog.getLastSentCsn() );
SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
syncInfoValue.setSyncInfoValueType( SynchronizationInfoEnum.NEW_COOKIE );
syncInfoValue.setMessageId( request.getMessageId() );
syncInfoValue.setCookie( cookie );
PROVIDER_LOG.info( "Sending the intermediate response to consumer {}, {}",
replicaLog, syncInfoValue );
session.getIoSession().write( syncInfoValue );
// switch the handler mode to realtime push
replicationListener.setPushInRealTime( refreshNPersist );
PROVIDER_LOG.debug( "e waiting for any modification for {}", replicaLog );
}
else
{
PROVIDER_LOG.debug( "RefreshOnly requested" );
byte[] cookie = LdapProtocolUtils.createCookie( replicaLog.getId(), contextCsn );
// no need to send from the log, that will be done in the next refreshOnly session
SyncDoneValue syncDone = new SyncDoneValueImpl();
syncDone.setCookie( cookie );
searchDoneResp.addControl( syncDone );
PROVIDER_LOG.info( "Sending the searchResultDone response to consumer {}, {}", replicaLog,
searchDoneResp );
session.getIoSession().write( searchDoneResp );
}
}
else
// if not succeeded return
{
PROVIDER_LOG.warn( "initial content refresh didn't succeed due to {}", searchDoneResp.getLdapResult()
.getResultCode() );
replicaLog.stop();
replicaLog = null;
// remove the listener
dirService.getEventService().removeListener( replicationListener );
return;
}
// if all is well then store the consumer information
replicaUtil.addConsumerEntry( replicaLog );
// add to the map only after storing in the DIT, else the Replica update thread barfs
replicaLogMap.put( replicaLog.getId(), replicaLog );
}