in receiver/mysqlreceiver/client.go [417:672]
func (c *mySQLClient) getReplicaStatusStats() ([]ReplicaStatusStats, error) {
mysqlVersion, err := c.getVersion()
if err != nil {
return nil, err
}
query := "SHOW REPLICA STATUS"
minMysqlVersion, _ := version.NewVersion("8.0.22")
if strings.Contains(mysqlVersion.String(), "MariaDB") {
query = "SHOW SLAVE STATUS"
} else if mysqlVersion.LessThan(minMysqlVersion) {
query = "SHOW SLAVE STATUS"
}
rows, err := c.client.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
cols, err := rows.Columns()
if err != nil {
return nil, err
}
var stats []ReplicaStatusStats
for rows.Next() {
var s ReplicaStatusStats
dest := []any{}
for _, col := range cols {
switch strings.ToLower(col) {
case "replica_io_state":
dest = append(dest, &s.replicaIOState)
case "slave_io_state":
dest = append(dest, &s.replicaIOState)
case "source_host":
dest = append(dest, &s.sourceHost)
case "master_host":
dest = append(dest, &s.sourceHost)
case "source_user":
dest = append(dest, &s.sourceUser)
case "master_user":
dest = append(dest, &s.sourceUser)
case "source_port":
dest = append(dest, &s.sourcePort)
case "master_port":
dest = append(dest, &s.sourcePort)
case "connect_retry":
dest = append(dest, &s.connectRetry)
case "source_log_file":
dest = append(dest, &s.sourceLogFile)
case "master_log_file":
dest = append(dest, &s.sourceLogFile)
case "read_source_log_pos":
dest = append(dest, &s.readSourceLogPos)
case "read_master_log_pos":
dest = append(dest, &s.readSourceLogPos)
case "relay_log_file":
dest = append(dest, &s.relayLogFile)
case "relay_log_pos":
dest = append(dest, &s.relayLogPos)
case "relay_source_log_file":
dest = append(dest, &s.relaySourceLogFile)
case "relay_master_log_file":
dest = append(dest, &s.relaySourceLogFile)
case "replica_io_running":
dest = append(dest, &s.replicaIORunning)
case "slave_io_running":
dest = append(dest, &s.replicaIORunning)
case "replica_sql_running":
dest = append(dest, &s.replicaSQLRunning)
case "slave_sql_running":
dest = append(dest, &s.replicaSQLRunning)
case "replicate_do_db":
dest = append(dest, &s.replicateDoDB)
case "replicate_ignore_db":
dest = append(dest, &s.replicateIgnoreDB)
case "replicate_do_table":
dest = append(dest, &s.replicateDoTable)
case "replicate_ignore_table":
dest = append(dest, &s.replicateIgnoreTable)
case "replicate_wild_do_table":
dest = append(dest, &s.replicateWildDoTable)
case "replicate_wild_ignore_table":
dest = append(dest, &s.replicateWildIgnoreTable)
case "last_errno":
dest = append(dest, &s.lastErrno)
case "last_error":
dest = append(dest, &s.lastError)
case "skip_counter":
dest = append(dest, &s.skipCounter)
case "exec_source_log_pos":
dest = append(dest, &s.execSourceLogPos)
case "exec_master_log_pos":
dest = append(dest, &s.execSourceLogPos)
case "relay_log_space":
dest = append(dest, &s.relayLogSpace)
case "until_condition":
dest = append(dest, &s.untilCondition)
case "until_log_file":
dest = append(dest, &s.untilLogFile)
case "until_log_pos":
dest = append(dest, &s.untilLogPos)
case "source_ssl_allowed":
dest = append(dest, &s.sourceSSLAllowed)
case "master_ssl_allowed":
dest = append(dest, &s.sourceSSLAllowed)
case "source_ssl_ca_file":
dest = append(dest, &s.sourceSSLCAFile)
case "master_ssl_ca_file":
dest = append(dest, &s.sourceSSLCAFile)
case "source_ssl_ca_path":
dest = append(dest, &s.sourceSSLCAPath)
case "master_ssl_ca_path":
dest = append(dest, &s.sourceSSLCAPath)
case "source_ssl_cert":
dest = append(dest, &s.sourceSSLCert)
case "master_ssl_cert":
dest = append(dest, &s.sourceSSLCert)
case "source_ssl_cipher":
dest = append(dest, &s.sourceSSLCipher)
case "master_ssl_cipher":
dest = append(dest, &s.sourceSSLCipher)
case "source_ssl_key":
dest = append(dest, &s.sourceSSLKey)
case "master_ssl_key":
dest = append(dest, &s.sourceSSLKey)
case "seconds_behind_source":
dest = append(dest, &s.secondsBehindSource)
case "seconds_behind_master":
dest = append(dest, &s.secondsBehindSource)
case "source_ssl_verify_server_cert":
dest = append(dest, &s.sourceSSLVerifyServerCert)
case "master_ssl_verify_server_cert":
dest = append(dest, &s.sourceSSLVerifyServerCert)
case "last_io_errno":
dest = append(dest, &s.lastIOErrno)
case "last_io_error":
dest = append(dest, &s.lastIOError)
case "last_sql_errno":
dest = append(dest, &s.lastSQLErrno)
case "last_sql_error":
dest = append(dest, &s.lastSQLError)
case "replicate_ignore_server_ids":
dest = append(dest, &s.replicateIgnoreServerIDs)
case "source_server_id":
dest = append(dest, &s.sourceServerID)
case "master_server_id":
dest = append(dest, &s.sourceServerID)
case "source_uuid":
dest = append(dest, &s.sourceUUID)
case "master_uuid":
dest = append(dest, &s.sourceUUID)
case "source_info_file":
dest = append(dest, &s.sourceInfoFile)
case "master_info_file":
dest = append(dest, &s.sourceInfoFile)
case "sql_delay":
dest = append(dest, &s.sqlDelay)
case "sql_remaining_delay":
dest = append(dest, &s.sqlRemainingDelay)
case "replica_sql_running_state":
dest = append(dest, &s.replicaSQLRunningState)
case "slave_sql_running_state":
dest = append(dest, &s.replicaSQLRunningState)
case "source_retry_count":
dest = append(dest, &s.sourceRetryCount)
case "master_retry_count":
dest = append(dest, &s.sourceRetryCount)
case "source_bind":
dest = append(dest, &s.sourceBind)
case "master_bind":
dest = append(dest, &s.sourceBind)
case "last_io_error_timestamp":
dest = append(dest, &s.lastIOErrorTimestamp)
case "last_sql_error_timestamp":
dest = append(dest, &s.lastSQLErrorTimestamp)
case "source_ssl_crl":
dest = append(dest, &s.sourceSSLCrl)
case "master_ssl_crl":
dest = append(dest, &s.sourceSSLCrl)
case "source_ssl_crlpath":
dest = append(dest, &s.sourceSSLCrlpath)
case "master_ssl_crlpath":
dest = append(dest, &s.sourceSSLCrlpath)
case "retrieved_gtid_set":
dest = append(dest, &s.retrievedGtidSet)
case "executed_gtid_set":
dest = append(dest, &s.executedGtidSet)
case "auto_position":
dest = append(dest, &s.autoPosition)
case "replicate_rewrite_db":
dest = append(dest, &s.replicateRewriteDB)
case "channel_name":
dest = append(dest, &s.channelName)
case "source_tls_version":
dest = append(dest, &s.sourceTLSVersion)
case "master_tls_version":
dest = append(dest, &s.sourceTLSVersion)
case "source_public_key_path":
dest = append(dest, &s.sourcePublicKeyPath)
case "master_public_key_path":
dest = append(dest, &s.sourcePublicKeyPath)
case "get_source_public_key":
dest = append(dest, &s.getSourcePublicKey)
case "get_master_public_key":
dest = append(dest, &s.getSourcePublicKey)
case "network_namespace":
dest = append(dest, &s.networkNamespace)
case "using_gtid":
dest = append(dest, &s.usingGtid)
case "gtid_io_pos":
dest = append(dest, &s.gtidIoPos)
case "slave_ddl_groups":
dest = append(dest, &s.slaveDdlGroups)
case "slave_non_transactional_groups":
dest = append(dest, &s.slaveNonTransactionalGroups)
case "slave_transactional_groups":
dest = append(dest, &s.slaveTransactionalGroups)
case "retried_transactions":
dest = append(dest, &s.retriedTransactions)
case "max_relay_log_size":
dest = append(dest, &s.maxRelayLogSize)
case "executed_log_entries":
dest = append(dest, &s.executedLogEntries)
case "slave_received_heartbeats":
dest = append(dest, &s.slaveReceivedHeartbeats)
case "slave_heartbeat_period":
dest = append(dest, &s.slaveHeartbeatPeriod)
case "gtid_slave_pos":
dest = append(dest, &s.gtidSlavePos)
case "master_last_event_time":
dest = append(dest, &s.masterLastEventTime)
case "slave_last_event_time":
dest = append(dest, &s.slaveLastEventTime)
case "master_slave_time_diff":
dest = append(dest, &s.masterSlaveTimeDiff)
case "parallel_mode":
dest = append(dest, &s.parallelMode)
case "replicate_do_domain_ids":
dest = append(dest, &s.replicateDoDomainIDs)
case "replicate_ignore_domain_ids":
dest = append(dest, &s.replicateIgnoreDomainIDs)
default:
return nil, fmt.Errorf("unknown column name %s for replica status", col)
}
}
err := rows.Scan(dest...)
if err != nil {
return nil, err
}
stats = append(stats, s)
}
return stats, nil
}