in src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt [242:351]
fun parse(
xcp: XContentParser,
id: String = NO_ID,
seqNo: Long = SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm: Long = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
): Transform {
var schedule: Schedule? = null
var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
var updatedAt: Instant? = null
var enabledAt: Instant? = null
var enabled = true
var description: String? = null
var sourceIndex: String? = null
var dataSelectionQuery: QueryBuilder = MatchAllQueryBuilder()
var targetIndex: String? = null
var metadataId: String? = null
var pageSize: Int? = null
val groups = mutableListOf<Dimension>()
var aggregations: AggregatorFactories.Builder = AggregatorFactories.builder()
var user: User? = null
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
TRANSFORM_ID_FIELD -> {
requireNotNull(xcp.text()) { "The transform_id field is null" }
}
JOB_SCHEDULE_FIELD -> schedule = ScheduleParser.parse(xcp)
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.longValue()
UPDATED_AT_FIELD -> updatedAt = xcp.instant()
ENABLED_AT_FIELD -> enabledAt = xcp.instant()
ENABLED_FIELD -> enabled = xcp.booleanValue()
DESCRIPTION_FIELD -> description = xcp.text()
SOURCE_INDEX_FIELD -> sourceIndex = xcp.text()
DATA_SELECTION_QUERY_FIELD -> {
val registry = xcp.xContentRegistry
val source = xcp.mapOrdered()
val xContentBuilder = XContentFactory.jsonBuilder().map(source)
val sourceParser = XContentType.JSON.xContent().createParser(
registry, LoggingDeprecationHandler.INSTANCE,
BytesReference
.bytes(xContentBuilder).streamInput()
)
dataSelectionQuery = AbstractQueryBuilder.parseInnerQueryBuilder(sourceParser)
}
TARGET_INDEX_FIELD -> targetIndex = xcp.text()
METADATA_ID_FIELD -> metadataId = xcp.textOrNull()
ROLES_FIELD -> {
// Parsing but not storing the field, deprecated
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
xcp.text()
}
}
PAGE_SIZE_FIELD -> pageSize = xcp.intValue()
GROUPS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
groups.add(Dimension.parse(xcp))
}
}
AGGREGATIONS_FIELD -> aggregations = AggregatorFactories.parseAggregators(xcp)
USER_FIELD -> {
user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp)
}
else -> throw IllegalArgumentException("Invalid field [$fieldName] found in Transforms.")
}
}
if (enabled && enabledAt == null) {
enabledAt = Instant.now()
} else if (!enabled) {
enabledAt = null
}
// If the seqNo/primaryTerm are unassigned this job hasn't been created yet
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
// we instantiate the start time
if (schedule is IntervalSchedule) {
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit)
}
// we clear out metadata if its a new job
metadataId = null
}
return Transform(
id = id,
seqNo = seqNo,
primaryTerm = primaryTerm,
schemaVersion = schemaVersion,
jobSchedule = requireNotNull(schedule) { "Transform schedule is null" },
metadataId = metadataId,
updatedAt = updatedAt ?: Instant.now(),
enabled = enabled,
enabledAt = enabledAt,
description = requireNotNull(description) { "Transform description is null" },
sourceIndex = requireNotNull(sourceIndex) { "Transform source index is null" },
dataSelectionQuery = dataSelectionQuery,
targetIndex = requireNotNull(targetIndex) { "Transform target index is null" },
pageSize = requireNotNull(pageSize) { "Transform page size is null" },
groups = groups,
aggregations = aggregations,
user = user
)
}