func()

in api/debug_handler.go [141:254]


func (handler *DebugHandler) ShowBatch(w *utils.ResponseWriter, r *http.Request) {

	var request ShowBatchRequest
	var response ShowBatchResponse
	var err error

	err = common.ReadRequest(r, &request)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	if request.NumRows <= 0 || request.NumRows > 100 {
		request.NumRows = 100
	}

	if request.StartRow < 0 {
		request.StartRow = 0
	}

	response.Body.StartRow = request.StartRow

	schema, err := handler.memStore.GetSchema(request.TableName)
	if err != nil {
		w.WriteError(ErrTableDoesNotExist)
		return
	}

	schema.RLock()
	response.Body.Vectors = make([]memCom.SlicedVector, 0, len(schema.Schema.Columns))
	response.Body.Columns = make([]string, 0, len(schema.Schema.Columns))
	for columnID, column := range schema.Schema.Columns {
		response.Body.Columns = append(response.Body.Columns, column.Name)
		response.Body.Types = append(response.Body.Types, column.Type)
		if column.Deleted {
			response.Body.Deleted = append(response.Body.Deleted, columnID)
		}
	}
	schema.RUnlock()

	var shard *memstore.TableShard
	shard, err = handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	defer func() {
		shard.Users.Done()
		if err != nil {
			w.WriteError(err)
		} else {
			w.WriteObject(response.Body)
		}
	}()

	// request archiveBatch
	if request.BatchID >= 0 {
		shard.ArchiveStore.RLock()
		currentVersion := shard.ArchiveStore.CurrentVersion
		currentVersion.Users.Add(1)
		shard.ArchiveStore.RUnlock()
		defer currentVersion.Users.Done()

		archiveBatch := currentVersion.GetBatchForRead(request.BatchID)
		if archiveBatch == nil {
			err = ErrBatchDoesNotExist
			return
		}
		defer archiveBatch.RUnlock()
		// holding archive batch lock will prevent any loading and eviction.
		response.Body.NumRows, response.Body.Vectors = readRows(archiveBatch.Columns, request.StartRow, request.NumRows)
	} else {
		// request liveBatch
		batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs()
		liveBatch := shard.LiveStore.GetBatchForRead(int32(request.BatchID))
		if liveBatch == nil {
			err = ErrBatchDoesNotExist
			return
		}
		defer liveBatch.RUnlock()

		if batchIDs[len(batchIDs)-1] == int32(request.BatchID) {
			if request.StartRow >= numRecordsInLastBatch {
				return
			}

			if request.NumRows > numRecordsInLastBatch-request.StartRow {
				request.NumRows = numRecordsInLastBatch - request.StartRow
			}
		}
		response.Body.NumRows, response.Body.Vectors = readRows(liveBatch.Columns, request.StartRow, request.NumRows)
	}

	schema.RLock()
	for columnID, column := range schema.Schema.Columns {
		if !column.Deleted && column.IsEnumBasedColumn() && columnID < len(response.Body.Vectors) {
			vector := &response.Body.Vectors[columnID]
			var enumCases []string
			if handler.enumReader != nil {
				// 1. use centralized enum reader
				enumCases, err = handler.enumReader.GetEnumCases(handler.namespace, schema.Schema.Name, column.Name)
				if err != nil {
					return
				}
			} else {
				// 2. use local in memory enum dict
				enumCases = schema.EnumDicts[column.Name].ReverseDict
			}
			err = translateEnums(column.IsEnumArrayColumn(), vector, enumCases)
		}
	}
	schema.RUnlock()
}