memstore/table_shard.go

// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memstore

import (
	"sync"

	"github.com/uber/aresdb/datanode/bootstrap"
	"github.com/uber/aresdb/diskstore"
	"github.com/uber/aresdb/memstore/common"
	metaCom "github.com/uber/aresdb/metastore/common"
	"github.com/uber/aresdb/utils"
)

// TableShard stores the data for one table shard in memory.
type TableShard struct {
	// Wait group used to prevent the stores from being prematurely deleted.
	Users sync.WaitGroup `json:"-"`

	ShardID int `json:"-"`

	// For convenience, reference to the table schema struct.
	Schema *common.TableSchema `json:"schema"`

	// For convenience.
	metaStore metaCom.MetaStore
	diskStore diskstore.DiskStore
	options   Options

	// Live store. Its locks also cover the primary key.
	LiveStore *LiveStore `json:"liveStore"`

	// Archive store.
	ArchiveStore *ArchiveStore `json:"archiveStore"`

	// The special column deletion lock,
	// see https://docs.google.com/spreadsheets/d/1QI3s1_4wgP3Cy-IGoKFCx9BcN23FzIfZGRSNC8I-1Sk/edit#gid=0
	columnDeletion sync.Mutex

	// For convenience.
	HostMemoryManager common.HostMemoryManager `json:"-"`

	// bootstrapLock protects BootstrapState.
	bootstrapLock  sync.RWMutex
	BootstrapState bootstrap.BootstrapState `json:"BootstrapState"`

	// BootstrapDetails shows the details of bootstrap.
	BootstrapDetails bootstrap.BootstrapDetails `json:"bootstrapDetails,omitempty"`

	// needPeerCopy marks whether the table shard needs to copy data from a peer
	// before its own disk data is available to serve.
	// Defaults to 0 (no peer copy needed).
	needPeerCopy uint32
}

// NewTableShard creates and initiates a table shard based on the schema.
func NewTableShard(schema *common.TableSchema, metaStore metaCom.MetaStore,
	diskStore diskstore.DiskStore, hostMemoryManager common.HostMemoryManager,
	shard int, totalShardsInCluster int, options Options,
) *TableShard {
	tableShard := &TableShard{
		ShardID:           shard,
		Schema:            schema,
		diskStore:         diskStore,
		metaStore:         metaStore,
		HostMemoryManager: hostMemoryManager,
		options:           options,
		BootstrapDetails:  bootstrap.NewBootstrapDetails(),
	}
	archiveStore := NewArchiveStore(tableShard)
	tableShard.ArchiveStore = archiveStore
	tableShard.LiveStore = NewLiveStore(schema.Schema.Config.BatchSize, totalShardsInCluster, tableShard)
	return tableShard
}

// Destruct destructs the table shard.
// Caller must detach the shard from memstore first.
func (shard *TableShard) Destruct() {
	// TODO: if this blocks on archiving for too long, figure out a way to cancel it.
	shard.Users.Wait()

	shard.options.redoLogMaster.Close(shard.Schema.Schema.Name, shard.ShardID)

	shard.LiveStore.Destruct()
	if shard.Schema.Schema.IsFactTable {
		shard.ArchiveStore.Destruct()
	}
}
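
// The sketch below is illustrative and not part of the original file. It shows
// one way a caller might pin a shard through the Users wait group so that
// Destruct (which blocks on Users.Wait) cannot tear down the stores while the
// shard is still being read. The function name exampleReadShard is hypothetical.
func exampleReadShard(shard *TableShard) {
	shard.Users.Add(1)       // keep Destruct from proceeding while we read
	defer shard.Users.Done() // release the shard once we are finished

	// GetCurrentVersion registers us as a user of the current archive store
	// version, so it must be released with Users.Done when we are done,
	// mirroring how DeleteColumn and PreloadColumn below release it.
	version := shard.ArchiveStore.GetCurrentVersion()
	defer version.Users.Done()

	// ... read from shard.LiveStore or from version here ...
}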
// DeleteColumn deletes the data for the specified column.
func (shard *TableShard) DeleteColumn(columnID int) error {
	shard.columnDeletion.Lock()
	defer shard.columnDeletion.Unlock()

	// Delete from live store.
	shard.LiveStore.WriterLock.Lock()
	batchIDs, _ := shard.LiveStore.GetBatchIDs()
	for _, batchID := range batchIDs {
		batch := shard.LiveStore.GetBatchForWrite(batchID)
		if batch == nil {
			continue
		}
		if columnID < len(batch.Columns) {
			vp := batch.Columns[columnID]
			if vp != nil {
				bytes := vp.GetBytes()
				batch.Columns[columnID] = nil
				vp.SafeDestruct()
				shard.HostMemoryManager.ReportUnmanagedSpaceUsageChange(int64(-bytes))
			}
		}
		batch.Unlock()
	}
	shard.LiveStore.WriterLock.Unlock()

	if !shard.Schema.Schema.IsFactTable {
		return nil
	}

	// Delete from disk store.
	// Schema cannot be changed while this function is called.
	// Only delete unsorted columns from disk.
	if utils.IndexOfInt(shard.Schema.Schema.ArchivingSortColumns, columnID) < 0 {
		err := shard.diskStore.DeleteColumn(shard.Schema.Schema.Name, columnID, shard.ShardID)
		if err != nil {
			return err
		}
	}

	// Delete from archive store.
	currentVersion := shard.ArchiveStore.GetCurrentVersion()
	defer currentVersion.Users.Done()

	var batches []*ArchiveBatch
	currentVersion.RLock()
	for _, batch := range currentVersion.Batches {
		batches = append(batches, batch)
	}
	currentVersion.RUnlock()

	for _, batch := range batches {
		batch.BlockingDelete(columnID)
	}
	return nil
}

// PreloadColumn loads the column into memory and waits for the loading to
// complete within (startDay, endDay]. Note endDay is inclusive but startDay
// is exclusive.
func (shard *TableShard) PreloadColumn(columnID int, startDay int, endDay int) {
	archiveStoreVersion := shard.ArchiveStore.GetCurrentVersion()
	for batchID := endDay; batchID > startDay; batchID-- {
		batch := archiveStoreVersion.RequestBatch(int32(batchID))
		// Only load the column if this batch has data.
		if batch.Size > 0 {
			vp := batch.RequestVectorParty(columnID)
			vp.WaitForDiskLoad()
			vp.Release()
		}
	}
	archiveStoreVersion.Users.Done()
}
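
// Illustrative sketch, not part of the original file: warming the archive
// store for one column after a shard comes up. PreloadColumn iterates batch
// IDs as days, and because startDay is exclusive while endDay is inclusive,
// the call below loads the seven days endDay-6 .. endDay. The function name
// examplePreloadLastWeek and its parameters are hypothetical.
func examplePreloadLastWeek(shard *TableShard, columnID int, endDay int) {
	shard.PreloadColumn(columnID, endDay-7, endDay)
}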