in modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java [219:493]
private boolean deserialize(ByteBuffer buf, UUID nodeId, boolean firstRecord) throws IOException {
if (buf.remaining() < 1)
return false;
byte opTypeByte = buf.get();
OperationType opType = OperationType.of(opTypeByte);
if (firstRecord && opType != VERSION)
throw new IgniteException("Unsupported file format");
if (opType == VERSION) {
if (buf.remaining() < OperationType.versionRecordSize())
return false;
short ver = buf.getShort();
if (ver != FilePerformanceStatisticsWriter.FILE_FORMAT_VERSION) {
throw new IgniteException("Unsupported file format version [fileVer=" + ver + ", supportedVer=" +
FilePerformanceStatisticsWriter.FILE_FORMAT_VERSION + ']');
}
return true;
}
else if (cacheOperation(opType)) {
if (buf.remaining() < cacheRecordSize())
return false;
int cacheId = buf.getInt();
long startTime = buf.getLong();
long duration = buf.getLong();
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.cacheOperation(nodeId, opType, cacheId, startTime, duration);
return true;
}
else if (transactionOperation(opType)) {
if (buf.remaining() < 4)
return false;
int cacheIdsCnt = buf.getInt();
if (buf.remaining() < transactionRecordSize(cacheIdsCnt) - 4)
return false;
GridIntList cacheIds = new GridIntList(cacheIdsCnt);
for (int i = 0; i < cacheIdsCnt; i++)
cacheIds.add(buf.getInt());
long startTime = buf.getLong();
long duration = buf.getLong();
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.transaction(nodeId, cacheIds, startTime, duration, opType == TX_COMMIT);
return true;
}
else if (opType == QUERY) {
ForwardableString text = readString(buf);
if (text == null || buf.remaining() < readQueryRecordSize())
return false;
GridCacheQueryType qryType = GridCacheQueryType.fromOrdinal(buf.get());
long id = buf.getLong();
long startTime = buf.getLong();
long duration = buf.getLong();
boolean success = buf.get() != 0;
forwardRead(text);
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.query(nodeId, qryType, text.str, id, startTime, duration, success);
return true;
}
else if (opType == SYSTEM_VIEW_SCHEMA) {
ForwardableString viewName = readString(buf);
if (viewName == null)
return false;
ForwardableString walkerName = readString(buf);
if (walkerName == null)
return false;
assert viewName.str != null : "Views are written by single thread, no string cache misses are possible";
assert walkerName.str != null : "Views are written by single thread, no string cache misses are possible";
try {
sysViewEntry = new SystemViewEntry(viewName.str, walkerName.str);
}
catch (ReflectiveOperationException e) {
throw new IgniteException("Could not find walker: " + walkerName);
}
return true;
}
else if (opType == SYSTEM_VIEW_ROW) {
List<Object> row = sysViewEntry.nextRow();
if (row == null)
return false;
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.systemView(nodeId, sysViewEntry.viewName, sysViewEntry.schema, row);
return true;
}
else if (opType == QUERY_READS) {
if (buf.remaining() < queryReadsRecordSize())
return false;
GridCacheQueryType qryType = GridCacheQueryType.fromOrdinal(buf.get());
UUID uuid = readUuid(buf);
long id = buf.getLong();
long logicalReads = buf.getLong();
long physicalReads = buf.getLong();
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.queryReads(nodeId, qryType, uuid, id, logicalReads, physicalReads);
return true;
}
else if (opType == QUERY_ROWS) {
ForwardableString action = readString(buf);
if (action == null || buf.remaining() < readQueryRowsRecordSize())
return false;
GridCacheQueryType qryType = GridCacheQueryType.fromOrdinal(buf.get());
UUID uuid = readUuid(buf);
long id = buf.getLong();
long rows = buf.getLong();
forwardRead(action);
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.queryRows(nodeId, qryType, uuid, id, action.str, rows);
return true;
}
else if (opType == QUERY_PROPERTY) {
ForwardableString name = readString(buf);
if (name == null)
return false;
ForwardableString val = readString(buf);
if (val == null)
return false;
if (buf.remaining() < readQueryPropertyRecordSize())
return false;
GridCacheQueryType qryType = GridCacheQueryType.fromOrdinal(buf.get());
UUID uuid = readUuid(buf);
long id = buf.getLong();
forwardRead(name);
forwardRead(val);
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.queryProperty(nodeId, qryType, uuid, id, name.str, val.str);
return true;
}
else if (opType == TASK) {
ForwardableString taskName = readString(buf);
if (taskName == null || buf.remaining() < readTaskRecordSize())
return false;
IgniteUuid sesId = readIgniteUuid(buf);
long startTime = buf.getLong();
long duration = buf.getLong();
int affPartId = buf.getInt();
forwardRead(taskName);
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.task(nodeId, sesId, taskName.str, startTime, duration, affPartId);
return true;
}
else if (opType == JOB) {
if (buf.remaining() < jobRecordSize())
return false;
IgniteUuid sesId = readIgniteUuid(buf);
long queuedTime = buf.getLong();
long startTime = buf.getLong();
long duration = buf.getLong();
boolean timedOut = buf.get() != 0;
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.job(nodeId, sesId, queuedTime, startTime, duration, timedOut);
return true;
}
else if (opType == CACHE_START) {
ForwardableString cacheName = readString(buf);
if (cacheName == null || buf.remaining() < readCacheStartRecordSize())
return false;
int cacheId = buf.getInt();
forwardRead(cacheName);
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.cacheStart(nodeId, cacheId, cacheName.str);
return true;
}
else if (opType == CHECKPOINT) {
if (buf.remaining() < checkpointRecordSize())
return false;
long beforeLockDuration = buf.getLong();
long lockWaitDuration = buf.getLong();
long listenersExecDuration = buf.getLong();
long markDuration = buf.getLong();
long lockHoldDuration = buf.getLong();
long pagesWriteDuration = buf.getLong();
long fsyncDuration = buf.getLong();
long walCpRecordFsyncDuration = buf.getLong();
long writeCheckpointEntryDuration = buf.getLong();
long splitAndSortCpPagesDuration = buf.getLong();
long recoveryDataWriteDuration = buf.getLong();
long totalDuration = buf.getLong();
long cpStartTime = buf.getLong();
int pagesSize = buf.getInt();
int dataPagesWritten = buf.getInt();
int cowPagesWritten = buf.getInt();
for (PerformanceStatisticsHandler hnd : curHnd) {
hnd.checkpoint(nodeId,
beforeLockDuration,
lockWaitDuration,
listenersExecDuration,
markDuration,
lockHoldDuration,
pagesWriteDuration,
fsyncDuration,
walCpRecordFsyncDuration,
writeCheckpointEntryDuration,
splitAndSortCpPagesDuration,
recoveryDataWriteDuration,
totalDuration,
cpStartTime,
pagesSize,
dataPagesWritten,
cowPagesWritten);
}
return true;
}
else if (opType == PAGES_WRITE_THROTTLE) {
if (buf.remaining() < pagesWriteThrottleRecordSize())
return false;
long endTime = buf.getLong();
long duration = buf.getLong();
for (PerformanceStatisticsHandler hnd : curHnd)
hnd.pagesWriteThrottle(nodeId, endTime, duration);
return true;
}
else
throw new IgniteException("Unknown operation type id [typeId=" + opTypeByte + ']');
}