in src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt [252:353]
fun parse(
xcp: XContentParser,
id: String = NO_ID,
seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
): Rollup {
var schedule: Schedule? = null
var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
var lastUpdatedTime: Instant? = null
var enabledTime: Instant? = null
var enabled = true
var description: String? = null
var sourceIndex: String? = null
var targetIndex: String? = null
var metadataID: String? = null
var pageSize: Int? = null
var delay: Long? = null
var continuous = false
val dimensions = mutableListOf<Dimension>()
val metrics = mutableListOf<RollupMetrics>()
var user: User? = null
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
ROLLUP_ID_FIELD -> { requireNotNull(xcp.text()) { "The rollup_id field is null" } /* Just used for searching */ }
ENABLED_FIELD -> enabled = xcp.booleanValue()
SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.longValue()
ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
LAST_UPDATED_TIME_FIELD -> lastUpdatedTime = xcp.instant()
DESCRIPTION_FIELD -> description = xcp.text()
SOURCE_INDEX_FIELD -> sourceIndex = xcp.text()
TARGET_INDEX_FIELD -> targetIndex = xcp.text()
METADATA_ID_FIELD -> metadataID = xcp.textOrNull()
ROLES_FIELD -> {
// Parsing but not storing the field, deprecated
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
xcp.text()
}
}
PAGE_SIZE_FIELD -> pageSize = xcp.intValue()
DELAY_FIELD -> delay = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.longValue()
CONTINUOUS_FIELD -> continuous = xcp.booleanValue()
DIMENSIONS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
dimensions.add(Dimension.parse(xcp))
}
}
METRICS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
metrics.add(RollupMetrics.parse(xcp))
}
}
USER_FIELD -> {
user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp)
}
else -> throw IllegalArgumentException("Invalid field [$fieldName] found in Rollup.")
}
}
if (enabled && enabledTime == null) {
enabledTime = Instant.now()
} else if (!enabled) {
enabledTime = null
}
// If the seqNo/primaryTerm are unassigned this job hasn't been created yet so we instantiate the startTime
// TODO: Make startTime public in Job Scheduler so we can just directly check the value
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
if (schedule is IntervalSchedule) {
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
}
}
return Rollup(
id = id,
seqNo = seqNo,
primaryTerm = primaryTerm,
enabled = enabled,
schemaVersion = schemaVersion,
jobSchedule = requireNotNull(schedule) { "Rollup schedule is null" },
jobLastUpdatedTime = lastUpdatedTime ?: Instant.now(),
jobEnabledTime = enabledTime,
description = requireNotNull(description) { "Rollup description is null" },
sourceIndex = requireNotNull(sourceIndex) { "Rollup source index is null" },
targetIndex = requireNotNull(targetIndex) { "Rollup target index is null" },
metadataID = metadataID,
pageSize = requireNotNull(pageSize) { "Rollup page size is null" },
delay = delay,
continuous = continuous,
dimensions = dimensions,
metrics = metrics,
user = user
)
}