sources/dynamodb/streaming_info.go (89 lines of code) (raw):

// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dynamodb

import (
	"fmt"
	"sync"

	sp "cloud.google.com/go/spanner"

	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
)

// StreamingInfo contains information related to processing of DynamoDB Streams.
type StreamingInfo struct {
	Records          map[string]map[string]int64 // Tablewise count of records received from DynamoDB Streams, broken down by record type i.e. INSERT, MODIFY & REMOVE.
	BadRecords       map[string]map[string]int64 // Tablewise count of records not converted successfully, broken down by record type.
	DroppedRecords   map[string]map[string]int64 // Tablewise count of records successfully converted but which failed to be written to Spanner, broken down by record type.
	recordsProcessed int64                       // Count of total records processed to Cloud Spanner (includes records which generated errors as well).
	ShardProcessed   map[string]bool             // Processing status of a shard (default false i.e. unprocessed).
	UserExit         bool                        // Flag confirming whether the customer wants to exit (false until the user presses Ctrl+C).
	Unexpecteds      map[string]int64            // Count of unexpected conditions, broken down by condition description.
	write            func(m *sp.Mutation) error  // Writes a given mutation to Cloud Spanner.
	SampleBadRecords []string                    // Records that generated errors during conversion.
	SampleBadWrites  []string                    // Records that faced errors while writing to Cloud Spanner.

	lock sync.Mutex
}

// MakeStreamingInfo returns a new StreamingInfo with all maps and counters initialized.
func MakeStreamingInfo() *StreamingInfo {
	return &StreamingInfo{
		Records:          make(map[string]map[string]int64),
		BadRecords:       make(map[string]map[string]int64),
		DroppedRecords:   make(map[string]map[string]int64),
		recordsProcessed: int64(0),
		ShardProcessed:   make(map[string]bool),
		Unexpecteds:      make(map[string]int64),
		UserExit:         false,
		lock:             sync.Mutex{},
	}
}

// makeRecordMaps initializes the maps used to store record counts for
// a given table.
func (info *StreamingInfo) makeRecordMaps(srcTable string) {
	info.Records[srcTable] = make(map[string]int64)
	info.BadRecords[srcTable] = make(map[string]int64)
	info.DroppedRecords[srcTable] = make(map[string]int64)
}

// SetShardStatus changes the processing status of a shard.
//
// true -> shard processed and vice versa.
func (info *StreamingInfo) SetShardStatus(shardId string, status bool) {
	info.lock.Lock()
	info.ShardProcessed[shardId] = status
	info.lock.Unlock()
}

// StatsAddRecord increases the count of records read from DynamoDB Streams
// based on the table name and record type.
func (info *StreamingInfo) StatsAddRecord(srcTable, recordType string) {
	info.lock.Lock()
	info.Records[srcTable][recordType]++
	info.lock.Unlock()
}

// StatsAddBadRecord increases the count of records which are not successfully converted to
// Cloud Spanner supported data types, based on the table name and record type.
func (info *StreamingInfo) StatsAddBadRecord(srcTable, recordType string) {
	info.lock.Lock()
	info.BadRecords[srcTable][recordType]++
	info.lock.Unlock()
}

// StatsAddDroppedRecord increases the count of records which failed while writing to Cloud Spanner,
// based on the table name and record type.
func (info *StreamingInfo) StatsAddDroppedRecord(srcTable, recordType string) {
	info.lock.Lock()
	info.DroppedRecords[srcTable][recordType]++
	info.lock.Unlock()
}

// StatsAddRecordProcessed increases the count of total records processed to Cloud Spanner.
func (info *StreamingInfo) StatsAddRecordProcessed() {
	info.lock.Lock()
	info.recordsProcessed++
	info.lock.Unlock()
}

// Unexpected records stats about corner cases and conditions
// that were not expected.
func (info *StreamingInfo) Unexpected(u string) {
	info.lock.Lock()
	internal.VerbosePrintf("Unexpected condition: %s\n", u)
	// Limit the size of the Unexpecteds map. If over the limit, only
	// update existing entries.
	if _, ok := info.Unexpecteds[u]; ok || len(info.Unexpecteds) < 1000 {
		info.Unexpecteds[u]++
	}
	info.lock.Unlock()
}

// TotalUnexpecteds returns the total number of distinct unexpected conditions
// encountered during processing of DynamoDB Streams.
func (info *StreamingInfo) TotalUnexpecteds() int64 {
	return int64(len(info.Unexpecteds))
}

// CollectBadRecord collects a record if it is not successfully converted to Cloud Spanner
// supported data types.
func (info *StreamingInfo) CollectBadRecord(recordType, srcTable string, srcCols []string, vals []string) {
	info.lock.Lock()
	badRecord := fmt.Sprintf("type=%s table=%s cols=%v data=%v", recordType, srcTable, srcCols, vals)
	// Cap storage used by SampleBadRecords. Keep at least one bad record and at most 100.
	if len(info.SampleBadRecords) < 100 {
		info.SampleBadRecords = append(info.SampleBadRecords, badRecord)
	}
	info.lock.Unlock()
}

// CollectDroppedRecord collects a record if it faces an error while writing to Cloud Spanner.
func (info *StreamingInfo) CollectDroppedRecord(recordType, spTable string, spCols []string, spVals []interface{}, err error) {
	info.lock.Lock()
	droppedRecord := fmt.Sprintf("type=%s table=%s cols=%v data=%v error=%v", recordType, spTable, spCols, spVals, err)
	// Cap storage used by SampleBadWrites. Keep at least one dropped record and at most 100.
	if len(info.SampleBadWrites) < 100 {
		info.SampleBadWrites = append(info.SampleBadWrites, droppedRecord)
	}
	info.lock.Unlock()
}
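
// The sketch below is not part of the original file; it is a hypothetical usage
// example (the function name exampleShardWorker, the table name "Orders", and the
// shard id "shard-0001" are assumed, not taken from the source) showing how a
// stream-processing loop might drive StreamingInfo: initialize the per-table maps
// once, bump Records for every record read, bump BadRecords plus the bad-record
// sampler on conversion failures, note unexpected conditions, and finally mark the
// shard processed.
func exampleShardWorker() {
	info := MakeStreamingInfo()
	srcTable := "Orders"
	info.makeRecordMaps(srcTable)

	// One INSERT record read from the stream.
	info.StatsAddRecord(srcTable, "INSERT")

	// Suppose conversion of that record failed: count it and keep a sample.
	info.StatsAddBadRecord(srcTable, "INSERT")
	info.CollectBadRecord("INSERT", srcTable, []string{"orderId"}, []string{"o-123"})

	// Record an unexpected condition and the fact that a record finished processing.
	info.Unexpected("missing attribute in stream record")
	info.StatsAddRecordProcessed()

	// Mark the shard as fully processed.
	info.SetShardStatus("shard-0001", true)

	fmt.Printf("records=%v badRecords=%v unexpected=%d\n",
		info.Records, info.BadRecords, info.TotalUnexpecteds())
}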