in api/debug_handler.go [141:254]
// ShowBatch decodes a ShowBatchRequest and writes back a slice of rows from
// the requested batch of the given table shard.
//
// Non-negative batch IDs address archive batches; negative IDs address live
// batches. NumRows is clamped to (0, 100] and StartRow to >= 0 before use.
// Errors after the shard is acquired are reported through the deferred
// response writer so the shard reference is always released.
func (handler *DebugHandler) ShowBatch(w *utils.ResponseWriter, r *http.Request) {
	var request ShowBatchRequest
	var response ShowBatchResponse
	var err error
	err = common.ReadRequest(r, &request)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	// Clamp paging parameters to sane bounds.
	if request.NumRows <= 0 || request.NumRows > 100 {
		request.NumRows = 100
	}
	if request.StartRow < 0 {
		request.StartRow = 0
	}
	response.Body.StartRow = request.StartRow
	schema, err := handler.memStore.GetSchema(request.TableName)
	if err != nil {
		w.WriteError(ErrTableDoesNotExist)
		return
	}
	// Snapshot column names/types (and deleted column IDs) under the schema
	// read lock.
	schema.RLock()
	response.Body.Vectors = make([]memCom.SlicedVector, 0, len(schema.Schema.Columns))
	response.Body.Columns = make([]string, 0, len(schema.Schema.Columns))
	for columnID, column := range schema.Schema.Columns {
		response.Body.Columns = append(response.Body.Columns, column.Name)
		response.Body.Types = append(response.Body.Types, column.Type)
		if column.Deleted {
			response.Body.Deleted = append(response.Body.Deleted, columnID)
		}
	}
	schema.RUnlock()
	var shard *memstore.TableShard
	shard, err = handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	// From here on, this deferred closure both releases the shard reference
	// and writes the response: an error response when err is set at return
	// time, the populated body otherwise.
	defer func() {
		shard.Users.Done()
		if err != nil {
			w.WriteError(err)
		} else {
			w.WriteObject(response.Body)
		}
	}()
	// request archiveBatch
	if request.BatchID >= 0 {
		// Pin the current archive store version so it cannot be purged while
		// we read from it.
		shard.ArchiveStore.RLock()
		currentVersion := shard.ArchiveStore.CurrentVersion
		currentVersion.Users.Add(1)
		shard.ArchiveStore.RUnlock()
		defer currentVersion.Users.Done()
		archiveBatch := currentVersion.GetBatchForRead(request.BatchID)
		if archiveBatch == nil {
			err = ErrBatchDoesNotExist
			return
		}
		defer archiveBatch.RUnlock()
		// holding archive batch lock will prevent any loading and eviction.
		response.Body.NumRows, response.Body.Vectors = readRows(archiveBatch.Columns, request.StartRow, request.NumRows)
	} else {
		// request liveBatch
		batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs()
		liveBatch := shard.LiveStore.GetBatchForRead(int32(request.BatchID))
		if liveBatch == nil {
			err = ErrBatchDoesNotExist
			return
		}
		defer liveBatch.RUnlock()
		// The last live batch may be partially filled; trim the requested
		// window to the records actually present.
		if batchIDs[len(batchIDs)-1] == int32(request.BatchID) {
			if request.StartRow >= numRecordsInLastBatch {
				return
			}
			if request.NumRows > numRecordsInLastBatch-request.StartRow {
				request.NumRows = numRecordsInLastBatch - request.StartRow
			}
		}
		response.Body.NumRows, response.Body.Vectors = readRows(liveBatch.Columns, request.StartRow, request.NumRows)
	}
	// Translate enum column values back to their string enum cases. The
	// deferred RUnlock (rather than a trailing call) guarantees the schema
	// read lock is released on the early error returns below; as the last
	// registered defer it runs before the response-writing closure.
	schema.RLock()
	defer schema.RUnlock()
	for columnID, column := range schema.Schema.Columns {
		if column.Deleted || !column.IsEnumBasedColumn() || columnID >= len(response.Body.Vectors) {
			continue
		}
		vector := &response.Body.Vectors[columnID]
		var enumCases []string
		if handler.enumReader != nil {
			// 1. use centralized enum reader
			enumCases, err = handler.enumReader.GetEnumCases(handler.namespace, schema.Schema.Name, column.Name)
			if err != nil {
				return
			}
		} else {
			// 2. use local in memory enum dict
			enumCases = schema.EnumDicts[column.Name].ReverseDict
		}
		// Propagate translation failures instead of silently dropping them
		// (a later iteration could otherwise overwrite err with nil).
		if err = translateEnums(column.IsEnumArrayColumn(), vector, enumCases); err != nil {
			return
		}
	}
}