in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java [913:1115]
/**
 * Executes a "fast last data" query for a set of sensors that all belong to one device.
 *
 * <p>Steps, in the order they are performed below:
 *
 * <ol>
 *   <li>Return a "not logged in" response if the current session fails the login check.
 *   <li>Request a query id and resolve the device's data-region replica sets. If there is no
 *       valid replica set, answer immediately with an empty result.
 *   <li>If the first location of the newest replica set is the current node, try to build the
 *       whole answer from the last cache. A cached entry with a {@code null} value is only
 *       trusted when the first location of every older replica set is the current node as well.
 *   <li>Otherwise, or on a cache miss, build a statement, check authority and quota, and run it
 *       through the coordinator. The response then carries a {@code REDIRECTION_RECOMMEND}
 *       status whose redirect node is the client RPC endpoint of the newest replica set's first
 *       location.
 * </ol>
 *
 * <p>Any {@link Exception} is turned into an error response via {@code onQueryException}; any
 * {@link Error} is rethrown. Whenever the query is finished on return (no more data to fetch,
 * rejected, or failed), the {@code finally} block calls {@code clearUp} for the query id.
 *
 * @param req the request carrying the database, device id, sensors, fetch size and timeout
 * @return the response holding either the (possibly empty) result or an error status
 */
public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
    TSFastLastDataQueryForOneDeviceReq req) {
  // "finished" gates the clean-up in the finally block: it must be true on every path that
  // returns a final answer, otherwise the query id requested below is never cleared up.
  boolean finished = false;
  Long statementId = req.getStatementId();
  long queryId = Long.MIN_VALUE;
  IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
  OperationQuota quota = null;
  if (!SESSION_MANAGER.checkLogin(clientSession)) {
    return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
  }
  long startTime = System.nanoTime();
  Throwable t = null;
  try {
    String db;
    String device;
    PartialPath devicePath;
    queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
    if (req.isLegalPathNodes()) {
      // the client declares the path nodes legal, so they are used as given
      db = req.db;
      device = req.deviceId;
      devicePath = new PartialPath(device.split("\\."));
    } else {
      // let PartialPath parse the raw strings and use the resulting full paths
      db = new PartialPath(req.db).getFullPath();
      devicePath = new PartialPath(req.deviceId);
      device = devicePath.getFullPath();
    }
    IDeviceID deviceID = Factory.DEFAULT_FACTORY.create(device);
    DataPartitionQueryParam queryParam =
        new DataPartitionQueryParam(deviceID, Collections.emptyList(), true, true);
    DataPartition dataPartition =
        partitionFetcher.getDataPartitionWithUnclosedTimeRange(
            Collections.singletonMap(db, Collections.singletonList(queryParam)));
    List<TRegionReplicaSet> regionReplicaSets =
        dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceID, null);
    // no valid DataRegion (NOT_ASSIGNED is compared by identity, as in the original code)
    if (regionReplicaSets.isEmpty()
        || (regionReplicaSets.size() == 1 && NOT_ASSIGNED == regionReplicaSets.get(0))) {
      TSExecuteStatementResp resp =
          createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
      resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
      resp.setQueryResult(Collections.emptyList());
      finished = true;
      resp.setMoreData(false);
      sampleForCacheHitFastLastDataQueryForOneDevice(req);
      return resp;
    }
    // The newest replica set is needed twice (cache decision and redirect hint), so look it up
    // once. The list is not modified in between.
    TRegionReplicaSet lastRegionReplicaSet = regionReplicaSets.get(regionReplicaSets.size() - 1);
    TEndPoint lastRegionLeader =
        lastRegionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint;
    // the device's dataRegion's leader of the latest time partition is on current node, so the
    // answer may be readable directly from cache
    // NOTE(review): a cache hit returns before AuthorityChecker.checkAuthority is reached below;
    // confirm that serving cached values without a permission check is intended.
    if (isSameNode(lastRegionLeader)) {
      // the device's all dataRegions' leader are on current node, can use null entry in cache
      boolean canUseNullEntry =
          regionReplicaSets.stream()
              .limit(regionReplicaSets.size() - 1L)
              .allMatch(
                  regionReplicaSet ->
                      isSameNode(
                          regionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint));
      int sensorNum = req.sensors.size();
      TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum);
      boolean allCached = true;
      for (String sensor : req.sensors) {
        MeasurementPath fullPath;
        if (req.isLegalPathNodes()) {
          fullPath = devicePath.concatAsMeasurementPath(sensor);
        } else {
          fullPath = devicePath.concatAsMeasurementPath((new PartialPath(sensor)).getFullPath());
        }
        TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath);
        if (timeValuePair == null) {
          // nothing cached for this sensor: fall through to the regular query path
          allCached = false;
          break;
        } else if (timeValuePair.getValue() == null) {
          // there is no data for this sensor
          if (!canUseNullEntry) {
            allCached = false;
            break;
          }
        } else {
          // we don't consider TTL
          LastQueryUtil.appendLastValue(
              builder,
              timeValuePair.getTimestamp(),
              new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET),
              timeValuePair.getValue().getStringValue(),
              timeValuePair.getValue().getDataType().name());
        }
      }
      // cache hit
      if (allCached) {
        TSExecuteStatementResp resp =
            createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
        resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
        if (builder.isEmpty()) {
          resp.setQueryResult(Collections.emptyList());
        } else {
          resp.setQueryResult(Collections.singletonList(serde.serialize(builder.build())));
        }
        finished = true;
        resp.setMoreData(false);
        sampleForCacheHitFastLastDataQueryForOneDevice(req);
        return resp;
      }
    }
    // cache miss
    Statement s = StatementGenerator.createStatement(convert(req));
    // permission check
    TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      // The query id has already been requested above; mark the query as finished so the
      // finally block clears it up instead of leaving it registered for this statement.
      finished = true;
      return RpcUtils.getTSExecuteStatementResp(status);
    }
    quota =
        DataNodeThrottleQuotaManager.getInstance()
            .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
    if (ENABLE_AUDIT_LOG) {
      AuditLogger.log(String.format("Last Data Query: %s", req), s);
    }
    // create and cache dataset
    ExecutionResult result =
        COORDINATOR.executeForTreeModel(
            s,
            queryId,
            SESSION_MANAGER.getSessionInfo(clientSession),
            "",
            partitionFetcher,
            schemaFetcher,
            req.getTimeout(),
            true);
    if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
      finished = true;
      // caught by the catch (Exception) clause below and converted into an error response
      throw new RuntimeException("error code: " + result.status);
    }
    IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);
    try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
      TSExecuteStatementResp resp;
      if (queryExecution.isQuery()) {
        resp = createResponse(queryExecution.getDatasetHeader(), queryId);
        // recommend that the client talks to the node of the newest replica set's first location
        TSStatus tsstatus = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
        tsstatus.setRedirectNode(lastRegionReplicaSet.dataNodeLocations.get(0).clientRpcEndPoint);
        resp.setStatus(tsstatus);
        finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize);
        resp.setMoreData(!finished);
        quota.addReadResult(resp.getQueryResult());
      } else {
        // Nothing more will be fetched for a non-query execution, so it is finished and must be
        // cleaned up in the finally block.
        finished = true;
        resp = RpcUtils.getTSExecuteStatementResp(result.status);
      }
      return resp;
    }
  } catch (Exception e) {
    finished = true;
    t = e;
    return RpcUtils.getTSExecuteStatementResp(
        onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
  } catch (Error error) {
    finished = true;
    t = error;
    throw error;
  } finally {
    long currentOperationCost = System.nanoTime() - startTime;
    COORDINATOR.recordExecutionTime(queryId, currentOperationCost);
    // record each operation time cost
    CommonUtils.addStatementExecutionLatency(
        OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost);
    if (finished) {
      // record total time cost for one query
      long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
      CommonUtils.addQueryLatency(
          StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
      clearUp(clientSession, statementId, queryId, req, t);
    }
    SESSION_MANAGER.updateIdleTime();
    if (quota != null) {
      quota.close();
    }
  }
}