public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java [913:1115]


  public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
      TSFastLastDataQueryForOneDeviceReq req) {
    boolean finished = false;
    Long statementId = req.getStatementId();
    long queryId = Long.MIN_VALUE;
    IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
    OperationQuota quota = null;
    if (!SESSION_MANAGER.checkLogin(clientSession)) {
      return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus());
    }
    long startTime = System.nanoTime();
    Throwable t = null;
    try {
      String db;
      String device;
      PartialPath devicePath;

      queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);

      if (req.isLegalPathNodes()) {
        db = req.db;
        device = req.deviceId;
        devicePath = new PartialPath(device.split("\\."));
      } else {
        db = new PartialPath(req.db).getFullPath();
        devicePath = new PartialPath(req.deviceId);
        device = devicePath.getFullPath();
      }

      IDeviceID deviceID = Factory.DEFAULT_FACTORY.create(device);

      DataPartitionQueryParam queryParam =
          new DataPartitionQueryParam(deviceID, Collections.emptyList(), true, true);
      DataPartition dataPartition =
          partitionFetcher.getDataPartitionWithUnclosedTimeRange(
              Collections.singletonMap(db, Collections.singletonList(queryParam)));
      List<TRegionReplicaSet> regionReplicaSets =
          dataPartition.getDataRegionReplicaSetWithTimeFilter(deviceID, null);

      // no valid DataRegion
      if (regionReplicaSets.isEmpty()
          || regionReplicaSets.size() == 1 && NOT_ASSIGNED == regionReplicaSets.get(0)) {
        TSExecuteStatementResp resp =
            createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
        resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
        resp.setQueryResult(Collections.emptyList());
        finished = true;
        resp.setMoreData(false);
        sampleForCacheHitFastLastDataQueryForOneDevice(req);
        return resp;
      }

      TEndPoint lastRegionLeader =
          regionReplicaSets
              .get(regionReplicaSets.size() - 1)
              .dataNodeLocations
              .get(0)
              .mPPDataExchangeEndPoint;

      // the device's dataRegion's leader of the latest time partition is on current node, may can
      // read directly from cache
      if (isSameNode(lastRegionLeader)) {
        // the device's all dataRegions' leader are on current node, can use null entry in cache
        boolean canUseNullEntry =
            regionReplicaSets.stream()
                .limit(regionReplicaSets.size() - 1L)
                .allMatch(
                    regionReplicaSet ->
                        isSameNode(
                            regionReplicaSet.dataNodeLocations.get(0).mPPDataExchangeEndPoint));
        int sensorNum = req.sensors.size();
        TsBlockBuilder builder = LastQueryUtil.createTsBlockBuilder(sensorNum);
        boolean allCached = true;
        for (String sensor : req.sensors) {
          MeasurementPath fullPath;
          if (req.isLegalPathNodes()) {
            fullPath = devicePath.concatAsMeasurementPath(sensor);
          } else {
            fullPath = devicePath.concatAsMeasurementPath((new PartialPath(sensor)).getFullPath());
          }
          TimeValuePair timeValuePair = DATA_NODE_SCHEMA_CACHE.getLastCache(fullPath);
          if (timeValuePair == null) {
            allCached = false;
            break;
          } else if (timeValuePair.getValue() == null) {
            // there is no data for this sensor
            if (!canUseNullEntry) {
              allCached = false;
              break;
            }
          } else {
            // we don't consider TTL
            LastQueryUtil.appendLastValue(
                builder,
                timeValuePair.getTimestamp(),
                new Binary(fullPath.getFullPath(), TSFileConfig.STRING_CHARSET),
                timeValuePair.getValue().getStringValue(),
                timeValuePair.getValue().getDataType().name());
          }
        }
        // cache hit
        if (allCached) {
          TSExecuteStatementResp resp =
              createResponse(DatasetHeaderFactory.getLastQueryHeader(), queryId);
          resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, ""));
          if (builder.isEmpty()) {
            resp.setQueryResult(Collections.emptyList());
          } else {
            resp.setQueryResult(Collections.singletonList(serde.serialize(builder.build())));
          }
          finished = true;
          resp.setMoreData(false);
          sampleForCacheHitFastLastDataQueryForOneDevice(req);
          return resp;
        }
      }

      // cache miss
      Statement s = StatementGenerator.createStatement(convert(req));
      // permission check
      TSStatus status = AuthorityChecker.checkAuthority(s, clientSession);
      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
        return RpcUtils.getTSExecuteStatementResp(status);
      }

      quota =
          DataNodeThrottleQuotaManager.getInstance()
              .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);

      if (ENABLE_AUDIT_LOG) {
        AuditLogger.log(String.format("Last Data Query: %s", req), s);
      }
      // create and cache dataset
      ExecutionResult result =
          COORDINATOR.executeForTreeModel(
              s,
              queryId,
              SESSION_MANAGER.getSessionInfo(clientSession),
              "",
              partitionFetcher,
              schemaFetcher,
              req.getTimeout(),
              true);

      if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
        finished = true;
        throw new RuntimeException("error code: " + result.status);
      }

      IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId);

      try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) {
        TSExecuteStatementResp resp;
        if (queryExecution.isQuery()) {
          resp = createResponse(queryExecution.getDatasetHeader(), queryId);
          TSStatus tsstatus = new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode());
          tsstatus.setRedirectNode(
              regionReplicaSets
                  .get(regionReplicaSets.size() - 1)
                  .dataNodeLocations
                  .get(0)
                  .clientRpcEndPoint);
          resp.setStatus(tsstatus);
          finished = SELECT_RESULT.apply(resp, queryExecution, req.fetchSize);
          resp.setMoreData(!finished);
          quota.addReadResult(resp.getQueryResult());
        } else {
          resp = RpcUtils.getTSExecuteStatementResp(result.status);
        }
        return resp;
      }

    } catch (Exception e) {
      finished = true;
      t = e;
      return RpcUtils.getTSExecuteStatementResp(
          onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY));
    } catch (Error error) {
      finished = true;
      t = error;
      throw error;
    } finally {

      long currentOperationCost = System.nanoTime() - startTime;
      COORDINATOR.recordExecutionTime(queryId, currentOperationCost);

      // record each operation time cost
      CommonUtils.addStatementExecutionLatency(
          OperationType.EXECUTE_LAST_DATA_QUERY, StatementType.QUERY.name(), currentOperationCost);

      if (finished) {
        // record total time cost for one query
        long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
        CommonUtils.addQueryLatency(
            StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
        clearUp(clientSession, statementId, queryId, req, t);
      }
      SESSION_MANAGER.updateIdleTime();
      if (quota != null) {
        quota.close();
      }
    }
  }