memstore/common/vector_party_serializer.go (138 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"github.com/uber/aresdb/diskstore"
"github.com/uber/aresdb/utils"
"os"
)
// VectorPartyHeader is the magic header written into the beginning of each vector party file.
const VectorPartyHeader uint32 = 0xFADEFACE
// VectorPartyBaseSerializer is the base class contains basic data to read/write VectorParty
type vectorPartyBaseSerializer struct {
shard, columnID, batchID int
batchVersion uint32
seqNum uint32
table string
diskstore diskstore.DiskStore
hostMemoryManager HostMemoryManager
}
// CheckVectorPartySerializable check if the archive VectorParty is serializable
func (s *vectorPartyBaseSerializer) CheckVectorPartySerializable(vp VectorParty) error {
if cvp, ok := vp.(CVectorParty); ok {
passed := true
switch cvp.GetMode() {
case AllValuesDefault:
passed = vp.GetNonDefaultValueCount() == 0
default:
passed = vp.GetNonDefaultValueCount() > 0
}
if !passed {
return utils.StackError(nil,
"NonDefaultValueCount %d is not valid for mode %d vector with length %d",
vp.GetNonDefaultValueCount(), cvp.GetMode(), vp.GetLength())
}
}
return nil
}
// VectorPartyArchiveSerializer is the class to read/write archive VectorParty
type vectorPartyArchiveSerializer struct {
vectorPartyBaseSerializer
}
// VectorPartyArchiveSerializer is the class to read/write snapshot VectorParty
type vectorPartySnapshotSerializer struct {
vectorPartyBaseSerializer
redoLogFile int64
offset uint32
}
// NewVectorPartyArchiveSerializer returns a new VectorPartySerializer
func NewVectorPartyArchiveSerializer(hostMemManager HostMemoryManager, diskStore diskstore.DiskStore, table string, shardID int,
columnID int, batchID int, batchVersion uint32, seqNum uint32) VectorPartySerializer {
return &vectorPartyArchiveSerializer{
vectorPartyBaseSerializer{
table: table,
shard: shardID,
columnID: columnID,
batchID: batchID,
batchVersion: batchVersion,
seqNum: seqNum,
diskstore: diskStore,
hostMemoryManager: hostMemManager,
},
}
}
// NewVectorPartySnapshotSerializer returns a new VectorPartySerializer
func NewVectorPartySnapshotSerializer(hostMemeManager HostMemoryManager, diskStore diskstore.DiskStore, table string, shardID int,
columnID, batchID int, batchVersion uint32, seqNum uint32, redoLogFile int64, offset uint32) VectorPartySerializer {
return &vectorPartySnapshotSerializer{
vectorPartyBaseSerializer{
table: table,
shard: shardID,
columnID: columnID,
batchID: batchID,
batchVersion: batchVersion,
seqNum: seqNum,
diskstore: diskStore,
hostMemoryManager: hostMemeManager,
},
redoLogFile,
offset,
}
}
// ReadVectorParty reads vector party from disk and set fields in passed-in vp.
func (s *vectorPartyArchiveSerializer) ReadVectorParty(vp VectorParty) error {
if vp == nil {
return nil
}
readCloser, err := s.diskstore.OpenVectorPartyFileForRead(s.table, s.columnID, s.shard,
s.batchID, s.batchVersion, s.seqNum)
if err != nil {
if err == os.ErrNotExist {
return nil
}
return err
}
defer readCloser.Close()
return vp.Read(readCloser, s)
}
// WriteVectorParty writes vector party to disk
func (s *vectorPartyArchiveSerializer) WriteVectorParty(vp VectorParty) error {
if vp == nil {
return nil
}
writer, err := s.diskstore.OpenVectorPartyFileForWrite(s.table, s.columnID, s.shard,
s.batchID, s.batchVersion, s.seqNum)
if err != nil {
return err
}
defer writer.Close()
if err = vp.Write(writer); err != nil {
return err
}
return writer.Sync()
}
// ReportVectorPartyMemoryUsage report memory usage according to underneath VectorParty property
func (s *vectorPartyArchiveSerializer) ReportVectorPartyMemoryUsage(bytes int64) {
s.hostMemoryManager.ReportManagedObject(
s.table, s.shard, s.batchID, s.columnID, bytes)
}
// WriteVectorParty writes snapshot vector party to disk
func (s *vectorPartySnapshotSerializer) WriteVectorParty(vp VectorParty) error {
if vp == nil {
return nil
}
writer, err := s.diskstore.OpenSnapshotVectorPartyFileForWrite(s.table, s.shard, s.redoLogFile, s.offset, s.batchID, s.columnID)
if err != nil {
return err
}
defer writer.Close()
err = vp.Write(writer)
if err != nil {
return err
}
return writer.Sync()
}
// ReadVectorParty reads snapshot vector party from disk
func (s *vectorPartySnapshotSerializer) ReadVectorParty(vp VectorParty) error {
if vp == nil {
return nil
}
readCloser, err := s.diskstore.OpenSnapshotVectorPartyFileForRead(s.table, s.shard, s.redoLogFile, s.offset, s.batchID, s.columnID)
if err != nil {
return err
}
defer readCloser.Close()
return vp.Read(readCloser, s)
}
// CheckVectorPartySerializable check if the snapshot VectorParty is serializable, which is always true for now
func (s *vectorPartySnapshotSerializer) CheckVectorPartySerializable(vp VectorParty) error {
return nil
}
// ReportVectorPartyMemoryUsage report memory usage according to underneath VectorParty property
func (s *vectorPartySnapshotSerializer) ReportVectorPartyMemoryUsage(bytes int64) {
s.hostMemoryManager.ReportUnmanagedSpaceUsageChange(bytes)
}