api/debug_handler.go (629 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"sort"
"strconv"
"strings"
"github.com/uber/aresdb/cluster/topology"
"github.com/gorilla/mux"
"github.com/uber/aresdb/api/common"
mutatorCom "github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/memstore"
memCom "github.com/uber/aresdb/memstore/common"
metaCom "github.com/uber/aresdb/metastore/common"
"github.com/uber/aresdb/utils"
)
// DebugHandler handles debug operations. It bundles the stores and sibling handlers
// that the debug HTTP endpoints registered in Register read from or act on.
type DebugHandler struct {
// namespace is passed to enumReader when enum cases are looked up.
namespace string
// shardOwner reports the shards owned by this server.
shardOwner topology.ShardOwner
// enumReader is the centralized source of enum cases. It may be nil, in which case
// ShowBatch falls back to the enum dictionaries kept in the table schema.
enumReader mutatorCom.EnumReader
// memStore gives access to schemas, table shards, the job scheduler and memory usage.
memStore memstore.MemStore
// For getting cutoff of a shard.
metaStore metaCom.MetaStore
// queryHandler is used to reach the device manager.
queryHandler *QueryHandler
// healthCheckHandler holds the health check on/off switch.
healthCheckHandler *HealthCheckHandler
// bootstrapRetryChan receives a value for every bootstrap retry request.
bootstrapRetryChan chan bool
}
// NewDebugHandler builds a DebugHandler from its collaborators. The bootstrap retry
// channel is created here as an unbuffered channel.
func NewDebugHandler(
	namespace string,
	memStore memstore.MemStore,
	metaStore metaCom.MetaStore,
	queryHandler *QueryHandler,
	healthCheckHandler *HealthCheckHandler,
	shardOwner topology.ShardOwner,
	enumReader mutatorCom.EnumReader,
) *DebugHandler {
	debugHandler := new(DebugHandler)
	debugHandler.namespace = namespace
	debugHandler.shardOwner = shardOwner
	debugHandler.enumReader = enumReader
	debugHandler.memStore = memStore
	debugHandler.metaStore = metaStore
	debugHandler.queryHandler = queryHandler
	debugHandler.healthCheckHandler = healthCheckHandler
	debugHandler.bootstrapRetryChan = make(chan bool)
	return debugHandler
}
// Register attaches every debug endpoint to the given router. Routes are added in the
// order listed below, each wrapped with utils.ApplyHTTPWrappers and restricted to a
// single HTTP method.
func (handler *DebugHandler) Register(router *mux.Router) {
	routes := []struct {
		path   string
		method string
		serve  func(*utils.ResponseWriter, *http.Request)
	}{
		{"/health", http.MethodGet, handler.Health},
		{"/health/{onOrOff}", http.MethodPost, handler.HealthSwitch},
		{"/jobs/{jobType}", http.MethodGet, handler.ShowJobStatus},
		{"/devices", http.MethodGet, handler.ShowDeviceStatus},
		{"/host-memory", http.MethodGet, handler.ShowHostMemory},
		{"/shards", http.MethodGet, handler.ShowShardSet},
		{"/{table}/{shard}", http.MethodGet, handler.ShowShardMeta},
		{"/{table}/{shard}/archive", http.MethodPost, handler.Archive},
		{"/{table}/{shard}/backfill", http.MethodPost, handler.Backfill},
		{"/{table}/{shard}/snapshot", http.MethodPost, handler.Snapshot},
		{"/{table}/{shard}/purge", http.MethodPost, handler.Purge},
		{"/{table}/{shard}/batches/{batch}", http.MethodGet, handler.ShowBatch},
		{"/{table}/{shard}/batches/{batch}/vector-parties/{column}", http.MethodGet, handler.LoadVectorParty},
		{"/{table}/{shard}/batches/{batch}/vector-parties/{column}", http.MethodDelete, handler.EvictVectorParty},
		{"/{table}/{shard}/primary-keys", http.MethodGet, handler.LookupPrimaryKey},
		{"/{table}/{shard}/redologs", http.MethodGet, handler.ListRedoLogs},
		{"/{table}/{shard}/redologs/{creationTime}/upsertbatches", http.MethodGet, handler.ListUpsertBatches},
		{"/{table}/{shard}/redologs/{creationTime}/upsertbatches/{offset}", http.MethodGet, handler.ReadUpsertBatch},
		{"/{table}/{shard}/backfill-manager/upsertbatches/{offset}", http.MethodGet, handler.ReadBackfillQueueUpsertBatch},
		{"/bootstrap/retry", http.MethodPost, handler.BootstrapRetry},
	}

	for _, route := range routes {
		router.HandleFunc(route.path, utils.ApplyHTTPWrappers(route.serve)).Methods(route.method)
	}
}
// ShowShardSet writes the IDs of the shards owned by this server, in ascending order.
func (handler *DebugHandler) ShowShardSet(w *utils.ResponseWriter, r *http.Request) {
	ownedShards := handler.shardOwner.GetOwnedShards()
	// Sorts the returned slice in place before it is written out.
	sort.Sort(sort.IntSlice(ownedShards))
	w.WriteObject(ownedShards)
}
// Health writes "on" when the health check is enabled and "off" when it is disabled.
func (handler *DebugHandler) Health(w *utils.ResponseWriter, r *http.Request) {
	healthCheck := handler.healthCheckHandler

	// Read the flag under the read lock, then respond without holding it.
	healthCheck.RLock()
	isDisabled := healthCheck.disable
	healthCheck.RUnlock()

	if isDisabled {
		w.Write([]byte("off"))
		return
	}
	w.Write([]byte("on"))
}
// HealthSwitch turns the health check on or off according to the request. Any value other
// than "on" or "off" is rejected with a bad request response.
func (handler *DebugHandler) HealthSwitch(w *utils.ResponseWriter, r *http.Request) {
	var request HealthSwitchRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	var disable bool
	switch request.OnOrOff {
	case "on":
		disable = false
	case "off":
		disable = true
	default:
		w.WriteErrorWithCode(http.StatusBadRequest, errors.New("must specify on or off in the url"))
		return
	}

	healthCheck := handler.healthCheckHandler
	healthCheck.Lock()
	healthCheck.disable = disable
	healthCheck.Unlock()

	w.Write([]byte("OK"))
}
// ShowBatch writes a window of rows from one batch of a table shard. Only batches that are
// present in memory are shown; the batch is never requested from DiskStore.
//
// A non-negative batch ID is looked up in the current archive store version, a negative one
// in the live store. NumRows is forced to 100 when it is outside (0, 100] and a negative
// StartRow is treated as 0. Values of enum based columns are replaced by their enum cases
// when the case is known; otherwise the raw value is kept.
//
// Once the shard has been obtained, the response (body or error) is written by a deferred
// function, so every later return path only needs to set err.
func (handler *DebugHandler) ShowBatch(w *utils.ResponseWriter, r *http.Request) {
	var request ShowBatchRequest
	var response ShowBatchResponse
	var err error

	if err = common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	if request.NumRows <= 0 || request.NumRows > 100 {
		request.NumRows = 100
	}

	if request.StartRow < 0 {
		request.StartRow = 0
	}

	response.Body.StartRow = request.StartRow

	schema, err := handler.memStore.GetSchema(request.TableName)
	if err != nil {
		w.WriteError(ErrTableDoesNotExist)
		return
	}

	// Collect column names, types and deleted column IDs under the schema read lock.
	schema.RLock()
	response.Body.Vectors = make([]memCom.SlicedVector, 0, len(schema.Schema.Columns))
	response.Body.Columns = make([]string, 0, len(schema.Schema.Columns))
	for columnID, column := range schema.Schema.Columns {
		response.Body.Columns = append(response.Body.Columns, column.Name)
		response.Body.Types = append(response.Body.Types, column.Type)
		if column.Deleted {
			response.Body.Deleted = append(response.Body.Deleted, columnID)
		}
	}
	schema.RUnlock()

	var shard *memstore.TableShard
	shard, err = handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// Registered first so it runs last: all locks taken below are released before the
	// response is written.
	defer func() {
		shard.Users.Done()
		if err != nil {
			w.WriteError(err)
		} else {
			w.WriteObject(response.Body)
		}
	}()

	if request.BatchID >= 0 {
		// request archiveBatch
		shard.ArchiveStore.RLock()
		currentVersion := shard.ArchiveStore.CurrentVersion
		currentVersion.Users.Add(1)
		shard.ArchiveStore.RUnlock()
		defer currentVersion.Users.Done()

		archiveBatch := currentVersion.GetBatchForRead(request.BatchID)
		if archiveBatch == nil {
			err = ErrBatchDoesNotExist
			return
		}
		defer archiveBatch.RUnlock()
		// holding archive batch lock will prevent any loading and eviction.
		response.Body.NumRows, response.Body.Vectors = readRows(archiveBatch.Columns, request.StartRow, request.NumRows)
	} else {
		// request liveBatch
		batchIDs, numRecordsInLastBatch := shard.LiveStore.GetBatchIDs()
		liveBatch := shard.LiveStore.GetBatchForRead(int32(request.BatchID))
		if liveBatch == nil {
			err = ErrBatchDoesNotExist
			return
		}
		defer liveBatch.RUnlock()

		// The last live batch is clamped to the number of records it holds. The length
		// check guards the index in case the ID list came back empty.
		if len(batchIDs) > 0 && batchIDs[len(batchIDs)-1] == int32(request.BatchID) {
			if request.StartRow >= numRecordsInLastBatch {
				return
			}

			if request.NumRows > numRecordsInLastBatch-request.StartRow {
				request.NumRows = numRecordsInLastBatch - request.StartRow
			}
		}
		response.Body.NumRows, response.Body.Vectors = readRows(liveBatch.Columns, request.StartRow, request.NumRows)
	}

	schema.RLock()
	// Deferred so that the error returns inside the loop cannot leave the read lock held.
	defer schema.RUnlock()
	for columnID, column := range schema.Schema.Columns {
		if column.Deleted || !column.IsEnumBasedColumn() || columnID >= len(response.Body.Vectors) {
			continue
		}

		vector := &response.Body.Vectors[columnID]
		var enumCases []string
		if handler.enumReader != nil {
			// 1. use centralized enum reader
			enumCases, err = handler.enumReader.GetEnumCases(handler.namespace, schema.Schema.Name, column.Name)
			if err != nil {
				return
			}
		} else {
			// 2. use local in memory enum dict
			enumCases = schema.EnumDicts[column.Name].ReverseDict
		}

		// Stop at the first failure so a later column cannot overwrite the error.
		if err = translateEnums(column.IsEnumArrayColumn(), vector, enumCases); err != nil {
			return
		}
	}
}
// readRows slices every vector party to the requested row window. A nil vector party is
// represented by a single nil value whose count is numRows. n is the last count of the last
// non-nil vector party that reported any counts, or 0 when there is none.
func readRows(vps []memCom.VectorParty, startRow, numRows int) (n int, vectors []memCom.SlicedVector) {
	vectors = make([]memCom.SlicedVector, len(vps))
	for i := range vps {
		if vps[i] == nil {
			vectors[i] = memCom.SlicedVector{
				Values: []interface{}{nil},
				Counts: []int{numRows},
			}
			continue
		}

		sliced := vps[i].Slice(startRow, numRows)
		vectors[i] = sliced
		if last := len(sliced.Counts) - 1; last >= 0 {
			n = sliced.Counts[last]
		}
	}
	return n, vectors
}
// translateEnums replaces, in place, the enum IDs stored in vector with their enum cases.
// Enum array vectors are delegated to tranlateEnumsArray. Nil values are skipped, and an ID
// without a known case keeps its raw value, which can happen when an enum change has not
// arrived in memory yet. A value that is neither uint8 nor uint16 yields an error.
func translateEnums(isEnumArray bool, vector *memCom.SlicedVector, enumCases []string) error {
	if isEnumArray {
		return tranlateEnumsArray(vector, enumCases)
	}

	for index, value := range vector.Values {
		if value == nil {
			continue
		}

		var id int
		if narrow, ok := value.(uint8); ok {
			id = int(narrow)
		} else if wide, ok := value.(uint16); ok {
			id = int(wide)
		} else {
			// this should never happen
			return utils.StackError(nil, "Wrong data type for enum vector, %T", value)
		}

		if id < len(enumCases) {
			vector.Values[index] = enumCases[id]
		}
	}
	return nil
}
// tranlateEnumsArray replaces, in place, the enum IDs inside every enum array value of
// vector with their enum cases. Each non-nil value is expected to be a string holding a
// JSON array of numeric IDs; it is rewritten as the JSON array with known IDs replaced.
//
// Translation is best effort: a value that cannot be parsed or re-encoded as JSON is left
// untouched, and an ID without a known enum case keeps its numeric form. An error is
// returned only when a value is not a string or an array element is not a number.
func tranlateEnumsArray(vector *memCom.SlicedVector, enumCases []string) error {
	for index, value := range vector.Values {
		if value == nil {
			continue
		}

		encoded, ok := value.(string)
		if !ok {
			// this should never happen; report it instead of panicking on the assertion
			return utils.StackError(nil, "Wrong data type for enum vector, %T", value)
		}

		ids := make([]interface{}, 0)
		if err := json.Unmarshal([]byte(encoded), &ids); err != nil {
			continue
		}

		for i, element := range ids {
			if element == nil {
				continue
			}
			// unmarshal will turn number to float64
			number, isNumber := element.(float64)
			if !isNumber {
				// this should never happen; report the type of the offending element
				return utils.StackError(nil, "Wrong data type for enum vector, %T", element)
			}
			// The lower bound keeps a negative number from indexing out of range.
			if id := int(number); id >= 0 && id < len(enumCases) {
				ids[i] = enumCases[id]
			}
		}

		if translated, err := json.Marshal(ids); err == nil {
			vector.Values[index] = string(translated)
		}
	}
	return nil
}
// LookupPrimaryKey looks up a primary key in the live store of the given table shard and
// writes the record ID it maps to. The key is given as comma separated column values.
// A request that cannot be parsed or names an unknown table shard is answered with a bad
// request status, and a key that is not found with a not found status.
func (handler *DebugHandler) LookupPrimaryKey(w *utils.ResponseWriter, r *http.Request) {
	var request LookupPrimaryKeyRequest
	if err := common.ReadRequest(r, &request); err != nil {
		// Reported as a bad request, like the request parsing failures of the other handlers.
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	keyStrs := strings.Split(request.Key, ",")
	recordID, found := shard.LiveStore.LookupKey(keyStrs)
	if !found {
		w.WriteErrorWithCode(http.StatusNotFound, fmt.Errorf("key '%s' does not exist or expired", request.Key))
		return
	}
	w.WriteObject(recordID)
}
// Archive submits an on-demand archiving job for a table shard, using the cutoff from the
// request body. The handler responds once the job has been submitted and does not wait for
// it to finish.
func (handler *DebugHandler) Archive(w *utils.ResponseWriter, r *http.Request) {
	var request ArchiveRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// The shard is only fetched to verify that table and shard exist.
	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	shard.Users.Done()

	scheduler := handler.memStore.GetScheduler()
	job := scheduler.NewArchivingJob(request.TableName, request.ShardID, request.Body.Cutoff)
	err, errChan := scheduler.SubmitJob(job)
	if err != nil {
		w.WriteErrorWithCode(http.StatusMethodNotAllowed, err)
		return
	}

	// Receive the job result in the background; the value itself is discarded.
	go func() {
		<-errChan
	}()
	w.Write([]byte("Archiving job submitted"))
}
// Backfill submits an on-demand backfill job for a table shard. The handler responds once
// the job has been submitted and does not wait for it to finish. A job that cannot be
// submitted is reported with the same status code that Archive uses.
func (handler *DebugHandler) Backfill(w *utils.ResponseWriter, r *http.Request) {
	var request BackfillRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// Just check table and shard existence.
	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	shard.Users.Done()

	scheduler := handler.memStore.GetScheduler()
	err, errChan := scheduler.SubmitJob(
		scheduler.NewBackfillJob(request.TableName, request.ShardID))
	if err != nil {
		// Previously dropped silently, which left the client with an empty success response.
		w.WriteErrorWithCode(http.StatusMethodNotAllowed, err)
		return
	}

	// Receive the job result in the background; the value itself is discarded.
	go func() {
		<-errChan
	}()
	w.Write([]byte("Backfill job submitted"))
}
// Snapshot submits an on-demand snapshot job for a table shard. The handler responds once
// the job has been submitted and does not wait for it to finish. A job that cannot be
// submitted is reported with the same status code that Archive uses.
func (handler *DebugHandler) Snapshot(w *utils.ResponseWriter, r *http.Request) {
	var request SnapshotRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	// Just check table and shard existence.
	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	scheduler := handler.memStore.GetScheduler()
	err, errChan := scheduler.SubmitJob(
		scheduler.NewSnapshotJob(request.TableName, request.ShardID))
	if err != nil {
		// Previously dropped silently, which left the client with an empty success response.
		w.WriteErrorWithCode(http.StatusMethodNotAllowed, err)
		return
	}

	// Receive the job result in the background; the value itself is discarded.
	go func() {
		<-errChan
	}()
	w.Write([]byte("Snapshot job submitted"))
}
// Purge submits an on-demand purge job for a range of batch IDs of a table shard.
//
// Without SafePurge the range comes from the request body and must satisfy
// 0 <= BatchIDStart <= BatchIDEnd. With SafePurge the range is derived from the table's
// record retention: it starts at 0 and ends at today's day number minus the retention in
// days; a table with no positive retention is rejected.
//
// The handler responds once the job has been submitted and does not wait for it to finish.
// A job that cannot be submitted is reported with the same status code that Archive uses.
func (handler *DebugHandler) Purge(w *utils.ResponseWriter, r *http.Request) {
	var request PurgeRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	if !request.Body.SafePurge && (request.Body.BatchIDStart < 0 || request.Body.BatchIDEnd < 0 || request.Body.BatchIDStart > request.Body.BatchIDEnd) {
		w.WriteErrorWithCode(http.StatusBadRequest, fmt.Errorf("invalid batch range, expects both to be > 0, got [%d, %d)",
			request.Body.BatchIDStart, request.Body.BatchIDEnd))
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	if request.Body.SafePurge {
		retentionDays := shard.Schema.Schema.Config.RecordRetentionInDays
		if retentionDays <= 0 {
			w.WriteErrorWithCode(http.StatusBadRequest, errors.New("safe purge attempted on table with infinite retention"))
			return
		}
		// 86400 seconds per day: convert the current unix time to a day number.
		nowInDay := int(utils.Now().Unix() / 86400)
		request.Body.BatchIDStart = 0
		request.Body.BatchIDEnd = nowInDay - retentionDays
	}

	scheduler := handler.memStore.GetScheduler()
	err, errChan := scheduler.SubmitJob(
		scheduler.NewPurgeJob(request.TableName, request.ShardID, request.Body.BatchIDStart, request.Body.BatchIDEnd))
	if err != nil {
		// Previously dropped silently, which left the client with an empty success response.
		w.WriteErrorWithCode(http.StatusMethodNotAllowed, err)
		return
	}

	// Receive the job result in the background; the value itself is discarded.
	go func() {
		<-errChan
	}()
	w.Write([]byte("Purge job submitted"))
}
// ShowShardMeta writes the table shard object for the requested table and shard. It won't
// show the underlying data.
func (handler *DebugHandler) ShowShardMeta(w *utils.ResponseWriter, r *http.Request) {
	var request ShowShardMetaRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	tableShard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	// Keep the shard registered as in use until it has been written out.
	defer tableShard.Users.Done()

	w.WriteObject(tableShard)
}
// ListRedoLogs writes the redo log files of a table shard, each rendered as a base-10
// string.
func (handler *DebugHandler) ListRedoLogs(w *utils.ResponseWriter, r *http.Request) {
	var request ListRedoLogsRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	browser := shard.NewRedoLogBrowser()
	logFiles, err := browser.ListLogFiles()
	if err != nil {
		w.WriteError(err)
		return
	}

	response := make(ListRedoLogsResponse, 0, len(logFiles))
	for _, logFile := range logFiles {
		response = append(response, strconv.FormatInt(logFile, 10))
	}
	w.WriteObject(response)
}
// ListUpsertBatches writes the offsets of the upsert batches found in the redo log file
// identified by the request's creation time.
func (handler *DebugHandler) ListUpsertBatches(w *utils.ResponseWriter, r *http.Request) {
	var request ListUpsertBatchesRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	browser := shard.NewRedoLogBrowser()
	offsets, err := browser.ListUpsertBatch(request.CreationTime)
	if err == nil {
		w.WriteObject(&offsets)
		return
	}

	w.WriteError(err)
	// NOTE(review): dumps the response headers to stdout on failure; this looks like
	// leftover debugging output - confirm whether it is still wanted.
	w.ResponseWriter.Header().Write(os.Stdout)
}
// ReadUpsertBatch writes a page of records from one upsert batch of a redo log file. The
// file is selected by its creation time and the batch by its offset; Start and Length
// select the rows. Both record totals in the response carry the row count reported by the
// redo log browser, and Draw is echoed back from the request.
func (handler *DebugHandler) ReadUpsertBatch(w *utils.ResponseWriter, r *http.Request) {
	var request ReadRedologUpsertBatchRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	browser := shard.NewRedoLogBrowser()
	rows, columnNames, numTotalRows, err := browser.ReadData(
		request.CreationTime, request.Offset, request.Start, request.Length)
	if err != nil {
		w.WriteError(err)
		return
	}

	var response ReadUpsertBatchResponse
	response.Draw = request.Draw
	response.Data = rows
	response.ColumnNames = columnNames
	response.RecordsTotal = numTotalRows
	response.RecordsFiltered = numTotalRows
	w.WriteObject(&response)
}
// LoadVectorParty requests the vector party of one column of an archive batch, waits for
// its disk load to complete and releases it again. Live batches (negative batch IDs),
// unknown columns and deleted columns are rejected.
func (handler *DebugHandler) LoadVectorParty(w *utils.ResponseWriter, r *http.Request) {
	var request LoadVectorPartyRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	if request.BatchID < 0 {
		w.WriteErrorWithCode(http.StatusBadRequest, errors.New("live batch vector party cannot be loaded"))
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	// Resolve the column and its deleted flag under the schema read lock.
	tableSchema := shard.Schema
	shard.Schema.RLock()
	columnID, exist := tableSchema.ColumnIDs[request.ColumnName]
	isDeleted := columnID < len(tableSchema.Schema.Columns) && tableSchema.Schema.Columns[columnID].Deleted
	shard.Schema.RUnlock()

	switch {
	case !exist:
		w.WriteError(ErrColumnDoesNotExist)
		return
	case isDeleted:
		w.WriteError(ErrColumnDeleted)
		return
	}

	version := shard.ArchiveStore.GetCurrentVersion()
	defer version.Users.Done()

	archiveBatch := version.RequestBatch(int32(request.BatchID))
	if vp := archiveBatch.RequestVectorParty(columnID); vp != nil {
		vp.WaitForDiskLoad()
		vp.Release()
	}
	w.WriteObject(nil)
}
// EvictVectorParty evicts the vector party of one column of an archive batch from memory.
// Live batches (negative batch IDs) and unknown columns are rejected. The call blocks until
// the delete has completed.
func (handler *DebugHandler) EvictVectorParty(w *utils.ResponseWriter, r *http.Request) {
	var request EvictVectorPartyRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	if request.BatchID < 0 {
		w.WriteErrorWithCode(http.StatusBadRequest, errors.New("live batch vector party cannot be evicted"))
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}
	defer shard.Users.Done()

	// Resolve the column ID under the schema read lock.
	shard.Schema.RLock()
	columnID, known := shard.Schema.ColumnIDs[request.ColumnName]
	shard.Schema.RUnlock()
	if !known {
		w.WriteError(ErrColumnDoesNotExist)
		return
	}

	version := shard.ArchiveStore.GetCurrentVersion()
	defer version.Users.Done()

	// this operation is blocking and needs the user to wait
	archiveBatch := version.RequestBatch(int32(request.BatchID))
	archiveBatch.BlockingDelete(columnID)
	w.WriteObject(nil)
}
// ShowJobStatus writes the scheduler's job details for the job type named in the request,
// encoded as JSON.
func (handler *DebugHandler) ShowJobStatus(w *utils.ResponseWriter, r *http.Request) {
	var request ShowJobStatusRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	scheduler := handler.memStore.GetScheduler()
	// Encode while holding the scheduler read lock; the lock is released before responding.
	payload, err := func() ([]byte, error) {
		scheduler.RLock()
		defer scheduler.RUnlock()
		return json.Marshal(scheduler.GetJobDetails(memCom.JobType(request.JobType)))
	}()
	w.WriteJSONBytes(payload, err)
}
// ShowDeviceStatus writes the query handler's device manager, encoded as JSON.
func (handler *DebugHandler) ShowDeviceStatus(w *utils.ResponseWriter, r *http.Request) {
	devices := handler.queryHandler.GetDeviceManager()
	// Encode while holding the device manager read lock; the lock is released before
	// responding.
	payload, err := func() ([]byte, error) {
		devices.RLock()
		defer devices.RUnlock()
		return json.Marshal(*devices)
	}()
	w.WriteJSONBytes(payload, err)
}
// ShowHostMemory writes the memory usage details reported by the memstore.
func (handler *DebugHandler) ShowHostMemory(w *utils.ResponseWriter, r *http.Request) {
	usageDetails, err := handler.memStore.GetMemoryUsageDetails()
	if err == nil {
		w.WriteObject(usageDetails)
		return
	}
	w.WriteError(err)
}
// ReadBackfillQueueUpsertBatch writes a page of rows from the upsert batch at the requested
// offset in the shard's backfill manager. Both record totals in the response are the number
// of rows returned, and Draw is echoed back from the request.
func (handler *DebugHandler) ReadBackfillQueueUpsertBatch(w *utils.ResponseWriter, r *http.Request) {
	var request ReadBackfillQueueUpsertBatchRequest
	if err := common.ReadRequest(r, &request); err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	shard, err := handler.memStore.GetTableShard(request.TableName, request.ShardID)
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	backfillManager := shard.LiveStore.BackfillManager
	rows, columnNames, err := backfillManager.ReadUpsertBatch(int(request.Offset), request.Start, request.Length, shard.Schema)
	// The shard is released as soon as the read is done, before any response is written.
	shard.Users.Done()
	if err != nil {
		w.WriteErrorWithCode(http.StatusBadRequest, err)
		return
	}

	var response ReadUpsertBatchResponse
	response.Data = rows
	response.ColumnNames = columnNames
	response.RecordsFiltered = len(rows)
	response.RecordsTotal = len(rows)
	response.Draw = request.Draw
	w.WriteObject(response)
}
// BootstrapRetry signals a bootstrap retry by sending on the bootstrap retry channel.
//
// The send blocks until a receiver takes the value. To keep the handler from blocking
// forever when nothing is receiving, the wait is abandoned once the request context is
// done (for example when the client disconnects); in that case an error is written
// instead of the confirmation.
func (handler *DebugHandler) BootstrapRetry(w *utils.ResponseWriter, r *http.Request) {
	select {
	case handler.bootstrapRetryChan <- true:
		w.Write([]byte("Bootstrap retry submitted"))
	case <-r.Context().Done():
		w.WriteErrorWithCode(http.StatusServiceUnavailable, r.Context().Err())
	}
}
// GetBootstrapRetryChan returns bootstrapRetryChan, the channel BootstrapRetry sends on
// for every retry request.
func (handler *DebugHandler) GetBootstrapRetryChan() chan bool {
return handler.bootstrapRetryChan
}
// SetBootstrapRetryChan replaces the channel BootstrapRetry sends on. It is used for
// testing.
func (handler *DebugHandler) SetBootstrapRetryChan(bootstrapRetryChan chan bool) {
handler.bootstrapRetryChan = bootstrapRetryChan
}