def add_user_record()

in python/aws_kinesis_agg/aggregator.py [0:0]


    def add_user_record(self, partition_key, data, explicit_hash_key=None):
        """Try to append one user record to this aggregated record.

        Text arguments are UTF-8 encoded before the record size is computed.
        The payload is stored in its encoded form, while the partition key and
        explicit hash key are stored exactly as they were passed in.

        Args:
            partition_key - Partition key of the user record (text or bytes)
            data - Payload of the user record (text or bytes)
            explicit_hash_key - Optional explicit hash key of the user record
                (text or bytes); a falsy value is not stored with the record
        Returns:
            True if the user record was added, or False if adding it would
            take this aggregated record past self.max_size.
        Raises:
            ValueError - if the user record on its own is larger than
                self.max_size."""

        def as_bytes(value):
            # Text is UTF-8 encoded; anything else (bytes, None) passes through untouched.
            return value.encode('utf-8') if isinstance(value, six.string_types) else value

        pk_bytes = as_bytes(partition_key)
        # A missing explicit hash key stays None; nothing is derived from the partition key.
        ehk_bytes = as_bytes(explicit_hash_key)
        payload = as_bytes(data)

        needed = self._calculate_record_size(pk_bytes, payload, ehk_bytes)

        # The record exceeds the limit on its own, so no agg record could ever hold it.
        if needed > self.max_size:
            message = 'Input record (PK=%s, EHK=%s, SizeBytes=%d) too big to fit inside a single agg record.'
            raise ValueError(message % (partition_key, explicit_hash_key, needed))

        # Not enough room left in this agg record: report it instead of raising.
        if self.get_size_bytes() + needed > self.max_size:
            return False

        new_record = self.agg_record.records.add()
        new_record.data = payload

        # add_key's result is used as (append-to-table flag, table index)
        # -- presumably (was_newly_added, index); confirm against add_key.
        pk_outcome = self.partition_keys.add_key(partition_key)
        if pk_outcome[0]:
            self.agg_record.partition_key_table.append(partition_key)
        new_record.partition_key_index = pk_outcome[1]

        # Falsy explicit hash keys (None, empty) are left off the record entirely.
        if explicit_hash_key:
            ehk_outcome = self.explicit_hash_keys.add_key(explicit_hash_key)
            if ehk_outcome[0]:
                self.agg_record.explicit_hash_key_table.append(explicit_hash_key)
            new_record.explicit_hash_key_index = ehk_outcome[1]

        self._agg_size_bytes += needed

        # The first user record supplies the keys for the whole agg record.
        is_first_record = len(self.agg_record.records) == 1
        if is_first_record:
            self._agg_partition_key = partition_key
            self._agg_explicit_hash_key = explicit_hash_key

        return True