in src/main/kotlin/org/opensearch/replication/action/replay/TransportReplayChangesAction.kt [111:136]
suspend fun performOnPrimary(request: ReplayChangesRequest, primaryShard: IndexShard)
: WritePrimaryResult<ReplayChangesRequest, ReplayChangesResponse> {
checkIfIndexBlockedWithLevel(clusterService, request.index(), ClusterBlockLevel.WRITE)
var location: Translog.Location? = null
request.changes.asSequence().map {
it.withPrimaryTerm(primaryShard.operationPrimaryTerm).unSetAutoGenTimeStamp()
}.forEach { op ->
if(primaryShard.maxSeqNoOfUpdatesOrDeletes < request.maxSeqNoOfUpdatesOrDeletes) {
primaryShard.advanceMaxSeqNoOfUpdatesOrDeletes(request.maxSeqNoOfUpdatesOrDeletes)
}
var result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY)
if (result.resultType == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
waitForMappingUpdate {
// fetch mappings from the leader cluster when applying on PRIMARY...
syncRemoteMapping(request.leaderAlias, request.leaderIndex, request.shardId()!!.indexName,
op.docType())
}
result = primaryShard.applyTranslogOperation(op, Engine.Operation.Origin.PRIMARY)
}
location = syncOperationResultOrThrow(result, location)
}
val response = ReplayChangesResponse() // TODO: Figure out what to add to response
return WritePrimaryResult(request, response, location, null, primaryShard, log)
}