in squangle/mysql_client/Operation.cpp [1369:1438]
void FetchOperation::specializedTimeoutTriggered() {
DCHECK(active_fetch_action_ != FetchAction::WaitForConsumer);
auto delta = chrono::steady_clock::now() - start_time_;
int64_t delta_micros =
chrono::duration_cast<chrono::microseconds>(delta).count();
std::string msg;
if (conn()->getKillOnQueryTimeout()) {
killRunningQuery();
}
/*
* The MYSQL_RES struct contains a handle to the MYSQL struct that created
* it. Currently, calling mysql_free_result attempts to flush the buffer in
* accordance with the protocol. This makes it so that if a MYSQL_RES is
* freed during a query and before the entire result is read, then the
* subsequent queries sent over the same connection will still succeed.
*
* In Operation.h it can be seen that mysql_free_result is used to delete
* the result set, instead of the nonblocking version. The logic to flush
* the socket is impossible to correctly implement in a destructor, because
* the function needs to be called repeatedly to ensure all data has been
* read. Instead we use the code below to detach the result object from the
* connection, so no network flushing is done.
*
* This does not cause a memory leak because the socket will still be cleaned
* up when the connection is freed. AsyncMySQL also does not provide a way
* for clients to read half a result, then send more queries. If we allowed
* partial reads of results, then this strategy would not work. The most
* common case where we would normally need to flush results is for client
* query timeouts, where we might still be receiving rows when we interrupt
* and return an error to the client.
*/
if (rowStream() && rowStream()->mysql_query_result_) {
rowStream()->mysql_query_result_->handle = nullptr;
}
std::string rows;
if (rowStream() && rowStream()->numRowsSeen()) {
rows = fmt::format(
"{} rows, {} bytes seen",
rowStream()->numRowsSeen(),
rowStream()->query_result_size_);
} else {
rows = "no rows seen";
}
auto cbDelayUs = client()->callbackDelayMicrosAvg();
if (cbDelayUs < kCallbackDelayStallThresholdUs) {
msg = fmt::format(
"[{}]({}) Query timed out ({}, took {} ms)",
static_cast<uint16_t>(SquangleErrno::SQ_ERRNO_QUERY_TIMEOUT),
kErrorPrefix,
rows,
std::lround(delta_micros / 1000.0));
setAsyncClientError(CR_NET_READ_INTERRUPTED, msg, "Query timed out");
} else {
msg = fmt::format(
"[{}]({}) Query timed out ({}, took {} ms) ({})",
static_cast<uint16_t>(
SquangleErrno::SQ_ERRNO_QUERY_TIMEOUT_LOOP_STALLED),
kErrorPrefix,
rows,
std::lround(delta_micros / 1000.0),
threadOverloadMessage(cbDelayUs));
setAsyncClientError(
CR_NET_READ_INTERRUPTED, msg, "Query timed out (loop stalled)");
}
completeOperation(OperationResult::TimedOut);
}