void FetchOperation::specializedTimeoutTriggered()

in squangle/mysql_client/Operation.cpp [1369:1438]


void FetchOperation::specializedTimeoutTriggered() {
  DCHECK(active_fetch_action_ != FetchAction::WaitForConsumer);
  auto delta = chrono::steady_clock::now() - start_time_;
  int64_t delta_micros =
      chrono::duration_cast<chrono::microseconds>(delta).count();
  std::string msg;

  if (conn()->getKillOnQueryTimeout()) {
    killRunningQuery();
  }

  /*
   * The MYSQL_RES struct contains a handle to the MYSQL struct that created
   * it. Currently, calling mysql_free_result attempts to flush the buffer in
   * accordance with the protocol. This makes it so that if a MYSQL_RES is
   * freed during a query and before the entire result is read, then the
   * subsequent queries sent over the same connection will still succeed.
   *
   * In Operation.h it can be seen that mysql_free_result is used to delete
   * the result set, instead of the nonblocking version. The logic to flush
   * the socket is impossible to correctly implement in a destructor, because
   * the function needs to be called repeatedly to ensure all data has been
   * read. Instead we use the code below to detach the result object from the
   * connection, so no network flushing is done.
   *
   * This does not cause a memory leak because the socket will still be cleaned
   * up when the connection is freed. AsyncMySQL also does not provide a way
   * for clients to read half a result, then send more queries. If we allowed
   * partial reads of results, then this strategy would not work. The most
   * common case where we would normally need to flush results is for client
   * query timeouts, where we might still be receiving rows when we interrupt
   * and return an error to the client.
   */
  if (rowStream() && rowStream()->mysql_query_result_) {
    rowStream()->mysql_query_result_->handle = nullptr;
  }

  std::string rows;
  if (rowStream() && rowStream()->numRowsSeen()) {
    rows = fmt::format(
        "{} rows, {} bytes seen",
        rowStream()->numRowsSeen(),
        rowStream()->query_result_size_);
  } else {
    rows = "no rows seen";
  }

  auto cbDelayUs = client()->callbackDelayMicrosAvg();
  if (cbDelayUs < kCallbackDelayStallThresholdUs) {
    msg = fmt::format(
        "[{}]({}) Query timed out ({}, took {} ms)",
        static_cast<uint16_t>(SquangleErrno::SQ_ERRNO_QUERY_TIMEOUT),
        kErrorPrefix,
        rows,
        std::lround(delta_micros / 1000.0));
    setAsyncClientError(CR_NET_READ_INTERRUPTED, msg, "Query timed out");
  } else {
    msg = fmt::format(
        "[{}]({}) Query timed out ({}, took {} ms) ({})",
        static_cast<uint16_t>(
            SquangleErrno::SQ_ERRNO_QUERY_TIMEOUT_LOOP_STALLED),
        kErrorPrefix,
        rows,
        std::lround(delta_micros / 1000.0),
        threadOverloadMessage(cbDelayUs));
    setAsyncClientError(
        CR_NET_READ_INTERRUPTED, msg, "Query timed out (loop stalled)");
  }
  completeOperation(OperationResult::TimedOut);
}