in collector/syncer.go [640:734]
func (sync *OplogSyncer) RestAPI() {
type Time struct {
TimestampUnix int64 `json:"unix"`
TimestampTime string `json:"time"`
}
type MongoTime struct {
Time
TimestampMongo string `json:"ts"`
}
type Info struct {
Who string `json:"who"`
Tag string `json:"tag"`
ReplicaSet string `json:"replset"`
Logs uint64 `json:"logs_get"`
LogsRepl uint64 `json:"logs_repl"`
LogsSuccess uint64 `json:"logs_success"`
Tps uint64 `json:"tps"`
Lsn *MongoTime `json:"lsn"`
LsnAck *MongoTime `json:"lsn_ack"`
LsnCkpt *MongoTime `json:"lsn_ckpt"`
Now *Time `json:"now"`
OplogAvg string `json:"log_size_avg"`
OplogMax string `json:"log_size_max"`
}
// total replication info
utils.IncrSyncHttpApi.RegisterAPI("/repl", nimo.HttpGet, func([]byte) interface{} {
return &Info{
Who: conf.Options.Id,
Tag: utils.BRANCH,
ReplicaSet: sync.Replset,
Logs: sync.replMetric.Get(),
LogsRepl: sync.replMetric.Apply(),
LogsSuccess: sync.replMetric.Success(),
Tps: sync.replMetric.Tps(),
Lsn: &MongoTime{
TimestampMongo: utils.Int64ToString(sync.replMetric.LSN),
Time: Time{
TimestampUnix: utils.ExtractMongoTimestamp(sync.replMetric.LSN),
TimestampTime: utils.TimestampToString(utils.ExtractMongoTimestamp(sync.replMetric.LSN)),
}},
LsnCkpt: &MongoTime{
TimestampMongo: utils.Int64ToString(sync.replMetric.LSNCheckpoint),
Time: Time{
TimestampUnix: utils.ExtractMongoTimestamp(sync.replMetric.LSNCheckpoint),
TimestampTime: utils.TimestampToString(utils.ExtractMongoTimestamp(sync.replMetric.LSNCheckpoint)),
}},
LsnAck: &MongoTime{
TimestampMongo: utils.Int64ToString(sync.replMetric.LSNAck),
Time: Time{
TimestampUnix: utils.ExtractMongoTimestamp(sync.replMetric.LSNAck),
TimestampTime: utils.TimestampToString(utils.ExtractMongoTimestamp(sync.replMetric.LSNAck)),
}},
Now: &Time{
TimestampUnix: time.Now().Unix(),
TimestampTime: utils.TimestampToString(time.Now().Unix()),
},
OplogAvg: utils.GetMetricWithSize(sync.replMetric.OplogAvgSize),
OplogMax: utils.GetMetricWithSize(sync.replMetric.OplogMaxSize),
}
})
// queue size info
type InnerQueue struct {
Id uint `json:"queue_id"`
PendingQueue uint64 `json:"pending_queue_used"`
LogsQueue uint64 `json:"logs_queue_used"`
}
type Queue struct {
SyncerId string `json:"syncer_replica_set_name"`
LogsQueuePerSize int `json:"logs_queue_size"`
PendingQueuePerSize int `json:"pending_queue_size"`
InnerQueue []InnerQueue `json:"syncer_inner_queue"`
PersisterBufferUsed int `json:"persister_buffer_used"`
}
utils.IncrSyncHttpApi.RegisterAPI("/queue", nimo.HttpGet, func([]byte) interface{} {
queue := make([]InnerQueue, calculatePendingQueueConcurrency())
for i := 0; i < len(queue); i++ {
queue[i] = InnerQueue{
Id: uint(i),
PendingQueue: uint64(len(sync.PendingQueue[i])),
LogsQueue: uint64(len(sync.logsQueue[i])),
}
}
return &Queue{
SyncerId: sync.Replset,
LogsQueuePerSize: cap(sync.logsQueue[0]),
PendingQueuePerSize: cap(sync.PendingQueue[0]),
InnerQueue: queue,
PersisterBufferUsed: len(sync.persister.Buffer),
}
})
}