in entity-store/src/main/kotlin/jetbrains/exodus/entitystore/EntityIterableCache.kt [219:307]
/**
 * Runs one asynchronous caching attempt for [handle].
 *
 * The job is skipped, and counted as not started, when the cancelling policy refuses the start
 * time or when a consistent job targets an iterable that is still recorded as "heavy".
 * Otherwise the iterable is instantiated inside a read-only transaction that has the cancelling
 * policy installed on it.
 *
 * Failure handling:
 * - [ReadonlyTransactionException]: logged as an error and the job is re-queued with
 *   below-normal priority (work around for XD-626).
 * - [TooLongEntityIterableInstantiationException]: when the reason is `CACHE_ADAPTER_OBSOLETE`,
 *   the configured maximum is positive and `currentAttempt` has not exceeded it, the job is
 *   re-queued with normal priority. In every other case the interruption is recorded in the
 *   stats, logged, and, for consistent jobs with heavy tracking enabled, stamped into the
 *   heavy-iterables cache.
 */
override fun execute() {
    // Bring the cache size up to date lazily, before any other work.
    updateCacheSizeIfNecessary()
    val startedAt = System.currentTimeMillis()
    // Too late to begin caching: the cancelling policy rejects this start time.
    if (!cancellingPolicy.canStartAt(startedAt)) {
        stats.incTotalJobsNotStarted()
        return
    }
    val consistent = cancellingPolicy.isConsistent
    val identity = handle.identity

    // Heavy-iterable tracking applies to consistent jobs only, and only while enabled in config.
    // Kept as a function so that the config flag is read again at each point of use.
    fun heavyTrackingActive(): Boolean = consistent && config.entityIterableCacheHeavyEnabled

    // A consistent job is not started for an iterable that was marked "heavy" recently enough.
    val cancelledAt = if (heavyTrackingActive()) heavyIterablesCache.tryKey(identity) else null
    if (cancelledAt != null) {
        if (cancelledAt + config.entityIterableCacheHeavyIterablesLifeSpan > startedAt) {
            stats.incTotalJobsNotStarted()
            logger.debug { "Heavy iterable not started, handle=${toString(config, handle)}" }
            return
        }
        // The heavy mark is older than the configured life span: drop it and carry on.
        heavyIterablesCache.remove(identity)
    }

    stats.incTotalJobsStarted()
    store.executeInReadonlyTransaction { txn ->
        if (!handle.isConsistent) {
            handle.resetBirthTime()
        }
        val persistentTxn = txn as PersistentStoreTransaction
        // Install the cancelling policy on the transaction before instantiating the iterable.
        cancellingPolicy.setLocalCache(persistentTxn.localCache)
        persistentTxn.queryCancellingPolicy = cancellingPolicy
        try {
            it.getOrCreateCachedInstance(persistentTxn, !consistent)
            // Only caching runs longer than one second are reported.
            if (logger.isInfoEnabled) {
                val elapsedMs = System.currentTimeMillis() - startedAt
                if (elapsedMs > 1000) {
                    logger.info {
                        val outcome = if (consistent) "Cached" else "Cached (inconsistent)"
                        "$outcome in $elapsedMs ms, handle=${toString(config, handle)}"
                    }
                }
            }
        } catch (_: ReadonlyTransactionException) {
            // work around XD-626
            val activity = if (consistent) "Caching" else "Caching (inconsistent)"
            logger.error("$activity failed with ReadonlyTransactionException. Re-queueing...")
            queue(Priority.below_normal)
        } catch (e: TooLongEntityIterableInstantiationException) {
            val interruptedAfterMs = System.currentTimeMillis() - startedAt
            val adapterObsolete = e.reason == CACHE_ADAPTER_OBSOLETE
            if (adapterObsolete) {
                val retryLimit = config.entityIterableCacheObsoleteMaxRetries
                if (retryLimit > 0 && currentAttempt <= retryLimit) {
                    val handleText = toString(config, handle)
                    // Logged before the attempt counter is bumped, so the message uses the old value.
                    logger.info {
                        "Re-queuing obsolete cache job for handle $handleText, retries left: ${retryLimit - currentAttempt}"
                    }
                    currentAttempt++
                    queue(Priority.normal)
                    if (consistent) stats.incTotalJobsRetried() else stats.incTotalCountJobsRetried()
                    // A retried job is not counted as interrupted and is not marked heavy.
                    return@executeInReadonlyTransaction
                }
            }
            // Remember when this iterable was interrupted so later consistent jobs can skip it.
            if (heavyTrackingActive()) {
                heavyIterablesCache.cacheObject(identity, System.currentTimeMillis())
            }
            // Update stats
            stats.incTotalJobsInterrupted()
            when (e.reason) {
                CACHE_ADAPTER_OBSOLETE -> stats.incTotalJobsObsolete()
                JOB_OVERDUE -> stats.incTotalJobsOverdue()
            }
            // Log
            logger.info {
                val activity = if (consistent) "Caching" else "Caching (inconsistent)"
                val handleText = toString(config, handle)
                "$activity forcibly stopped for handle $handleText: ${e.reason.message}, caching time: $interruptedAfterMs ms"
            }
        }
    }
}