in squangle/mysql_client/Operation.cpp [1130:1313]
void FetchOperation::socketActionable() {
DCHECK(isInEventBaseThread());
DCHECK(active_fetch_action_ != FetchAction::WaitForConsumer);
auto& handler = conn()->client()->getMysqlHandler();
MYSQL* mysql = conn()->mysql();
// This loop runs the fetch actions required to successfully execute query,
// request next results, fetch results, identify errors and complete operation
// and queries.
// All callbacks are done in the `notify` methods that children must
// override. During callbacks for actions `Fetch` and `CompleteQuery`,
// the consumer is allowed to pause the operation.
// Some actions may request an action above it (like CompleteQuery may request
// StartQuery) this is why we use this loop.
while (1) {
// When the fetch action is StartQuery it means either we need to execute
// the query or ask for new results.
// Next Actions:
// - StartQuery: may continue with StartQuery if socket not actionable, in
// this case socketActionable is exited;
// - CompleteOperation: if it fails to execute query or request next
// results.
// - InitFetch: no errors during results request, so we initiate fetch.
if (active_fetch_action_ == FetchAction::StartQuery) {
auto status = MysqlHandler::PENDING;
if (query_executed_) {
++num_current_query_;
status = handler.nextResult(mysql);
} else {
status = handler.runQuery(mysql, rendered_query_);
}
if (status == MysqlHandler::PENDING) {
waitForSocketActionable();
return;
}
current_last_insert_id_ = 0;
current_affected_rows_ = 0;
current_recv_gtid_ = std::string();
query_executed_ = true;
if (status == MysqlHandler::ERROR) {
active_fetch_action_ = FetchAction::CompleteQuery;
} else {
active_fetch_action_ = FetchAction::InitFetch;
}
}
// Prior fetch start we read the values that may indicate errors, rows to
// fetch or not. The initialize from children classes is called either way
// to signal that any other calls from now are regarding a new query.
// Next Actions:
// - CompleteOperation: in case an error occurred
// - Fetch: there are rows to fetch in this query
// - CompleteQuery: no rows to fetch (complete query will read rowsAffected
// and lastInsertId to add to result
if (active_fetch_action_ == FetchAction::InitFetch) {
auto* mysql_query_result = handler.getResult(mysql);
auto num_fields = mysql_field_count(mysql);
// Check to see if this an empty query or an error
if (!mysql_query_result && num_fields > 0) {
// Failure. CompleteQuery will read errors.
active_fetch_action_ = FetchAction::CompleteQuery;
} else {
if (num_fields > 0) {
current_row_stream_.assign(RowStream(mysql_query_result, &handler));
active_fetch_action_ = FetchAction::Fetch;
} else {
active_fetch_action_ = FetchAction::CompleteQuery;
}
notifyInitQuery();
}
}
// This action is going to stick around until all rows are fetched or an
// error occurs. When the RowStream is ready, we notify the subclasses for
// them to consume it.
// If `pause` is called during the callback and the stream is consumed then,
// `row_stream_` is checked and we skip to the next action `CompleteQuery`.
// If row_stream_ isn't ready, we wait for socket actionable.
// Next Actions:
// - Fetch: in case it needs to fetch more rows, we break the loop and wait
// for socketActionable to be called again
// - CompleteQuery: an error occurred or rows finished to fetch
// - WaitForConsumer: in case `pause` is called during `notifyRowsReady`
if (active_fetch_action_ == FetchAction::Fetch) {
DCHECK(current_row_stream_.has_value());
// Try to catch when the user didn't pause or consumed the rows
if (current_row_stream_->current_row_.has_value()) {
// This should help
LOG(ERROR) << "Rows not consumed. Perhaps missing `pause`?";
cancel_ = true;
active_fetch_action_ = FetchAction::CompleteQuery;
continue;
}
// When the query finished, `is_ready` is true, but there are no rows.
bool is_ready = current_row_stream_->slurp();
if (!is_ready) {
waitForSocketActionable();
break;
}
if (current_row_stream_->hasQueryFinished()) {
active_fetch_action_ = FetchAction::CompleteQuery;
} else {
notifyRowsReady();
}
}
// In case the query has at least started and finished by error or not,
// here the final checks and data are gathered for the current query.
// It checks if any errors occurred during query, and call children classes
// to deal with their specialized query completion.
// If `pause` is called, then `paused_action_` will be already `StartQuery`
// or `CompleteOperation`.
// Next Actions:
// - StartQuery: There are more results and children is not opposed to it.
// QueryOperation child sets to CompleteOperation, since it
// is not supposed to receive more than one result.
// - CompleteOperation: In case an error occurred during query or there are
// no more results to read.
// - WaitForConsumer: In case `pause` is called during notification.
if (active_fetch_action_ == FetchAction::CompleteQuery) {
snapshotMysqlErrors();
bool more_results = false;
if (mysql_errno_ != 0 || cancel_) {
active_fetch_action_ = FetchAction::CompleteOperation;
} else {
current_last_insert_id_ = mysql_insert_id(mysql);
current_affected_rows_ = mysql_affected_rows(mysql);
const char* data;
size_t length;
if (!mysql_session_track_get_first(
mysql, SESSION_TRACK_GTIDS, &data, &length)) {
current_recv_gtid_ = std::string(data, length);
}
current_resp_attrs_ = readResponseAttributes();
more_results = mysql_more_results(mysql);
active_fetch_action_ = more_results ? FetchAction::StartQuery
: FetchAction::CompleteOperation;
// Call it after setting the active_fetch_action_ so the child class can
// decide if it wants to change the state
if (current_row_stream_ && current_row_stream_->mysql_query_result_) {
rows_received_ +=
mysql_num_rows(current_row_stream_->mysql_query_result_.get());
total_result_size_ += current_row_stream_->query_result_size_;
}
++num_queries_executed_;
no_index_used_ |= mysql->server_status & SERVER_QUERY_NO_INDEX_USED;
notifyQuerySuccess(more_results);
}
current_row_stream_.reset();
}
// Once this action is set, the operation is going to be completed no matter
// the reason it was called. It exists the loop.
if (active_fetch_action_ == FetchAction::CompleteOperation) {
if (cancel_) {
state_ = OperationState::Cancelling;
completeOperation(OperationResult::Cancelled);
} else if (mysql_errno_ != 0) {
completeOperation(OperationResult::Failed);
} else {
completeOperation(OperationResult::Succeeded);
}
break;
}
// If `pause` is called during the operation callbacks, this the Action it
// should come to.
// It's not necessary to unregister the socket event, so just cancel the
// timeout and wait for `resume` to be called.
if (active_fetch_action_ == FetchAction::WaitForConsumer) {
conn()->socketHandler()->cancelTimeout();
break;
}
}
}