func DeaggregateRecords()

in go/deaggregator/deaggregator.go [25:79]


func DeaggregateRecords(records []*kinesis.Record) ([]*kinesis.Record, error) {
	var isAggregated bool
	allRecords := make([]*kinesis.Record, 0)
	for _, record := range records {
		isAggregated = true

		var dataMagic string
		var decodedDataNoMagic []byte
		// Check if record is long enough to have magic file header
		if len(record.Data) >= KplMagicLen {
			dataMagic = fmt.Sprintf("%q", record.Data[:KplMagicLen])
			decodedDataNoMagic = record.Data[KplMagicLen:]
		} else {
			isAggregated = false
		}

		// Check if record has KPL Aggregate Record Magic Header and data length
		// is correct size
		if KplMagicHeader != dataMagic || len(decodedDataNoMagic) <= DigestSize {
			isAggregated = false
		}

		if isAggregated {
			messageDigest := fmt.Sprintf("%x", decodedDataNoMagic[len(decodedDataNoMagic)-DigestSize:])
			messageData := decodedDataNoMagic[:len(decodedDataNoMagic)-DigestSize]

			calculatedDigest := fmt.Sprintf("%x", md5.Sum(messageData))

			// Check protobuf MD5 hash matches MD5 sum of record
			if messageDigest != calculatedDigest {
				isAggregated = false
			} else {
				aggRecord := &rec.AggregatedRecord{}
				err := proto.Unmarshal(messageData, aggRecord)

				if err != nil {
					return nil, err
				}

				partitionKeys := aggRecord.PartitionKeyTable

				for _, aggrec := range aggRecord.Records {
					newRecord := createUserRecord(partitionKeys, aggrec, record)
					allRecords = append(allRecords, newRecord)
				}
			}
		}

		if !isAggregated {
			allRecords = append(allRecords, record)
		}
	}

	return allRecords, nil
}