tzrec/features/sequence_feature.py (529 lines of code) (raw):

# Copyright (c) 2024, Alibaba Group; # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os from typing import Any, Dict, List, Optional, Tuple import numpy as np import pyarrow as pa import pyfg from tzrec.datasets.utils import ( ParsedData, SequenceDenseData, SequenceSparseData, ) from tzrec.features.custom_feature import CustomFeature from tzrec.features.feature import MAX_HASH_BUCKET_SIZE from tzrec.features.id_feature import FgMode, IdFeature from tzrec.features.raw_feature import RawFeature from tzrec.protos import feature_pb2 from tzrec.protos.feature_pb2 import FeatureConfig from tzrec.utils.logging_util import logger def _parse_fg_encoded_sequence_sparse_feature_impl( name: str, feat: pa.Array, sequence_delim: str = ";", multival_sep: str = chr(3), default_value: Optional[List[int]] = None, ) -> SequenceSparseData: """Parse fg encoded sequence sparse feature. Args: name (str): feature name. feat (pa.Array): input feature data. sequence_delim (str): sequence delimiter. multival_sep (str): string separator for multi-val data. default_value (int): default value. Returns: an instance of SequenceSparseFeature. """ if pa.types.is_string(feat.type): # dtype = string is_empty = pa.compute.equal(feat, pa.scalar("")) feat = pa.compute.if_else(is_empty, pa.nulls(len(feat)), feat) if default_value is not None: feat = feat.fill_null(multival_sep.join(map(str, default_value))) list_seq_feat = pa.compute.split_pattern(feat, sequence_delim) list_feat = pa.compute.split_pattern(list_seq_feat.values, multival_sep) seq_offsets = list_seq_feat.offsets.to_numpy() seq_lengths = seq_offsets[1:] - seq_offsets[:-1] feat_values = list_feat.values.cast(pa.int64()).to_numpy() feat_offsets = list_feat.offsets.to_numpy() feat_lengths = feat_offsets[1:] - feat_offsets[:-1] elif pa.types.is_list(feat.type): if pa.types.is_list(feat.type.value_type): # dtype = list<list<int>> or others can cast to list<list<int>> feat = feat.cast(pa.list_(pa.list_(pa.int64())), safe=False) if default_value is not None: is_empty = pa.compute.equal(pa.compute.list_value_length(feat), 0) feat = pa.compute.if_else(is_empty, pa.nulls(len(feat)), feat) feat = feat.fill_null([default_value]) seq_offsets = feat.offsets.to_numpy() seq_lengths = seq_offsets[1:] - seq_offsets[:-1] feat_values = feat.values.values.to_numpy() feat_offsets = feat.values.offsets.to_numpy() feat_lengths = feat_offsets[1:] - feat_offsets[:-1] else: # dtype = list<int> or others can cast to list<int> feat = feat.cast(pa.list_(pa.int64()), safe=False) if default_value is not None: is_empty = pa.compute.equal(pa.compute.list_value_length(feat), 0) feat = pa.compute.if_else(is_empty, pa.nulls(len(feat)), feat) feat = feat.fill_null(default_value) seq_offsets = feat.offsets.to_numpy() seq_lengths = seq_offsets[1:] - seq_offsets[:-1] feat_values = feat.values.to_numpy() feat_lengths = np.ones_like(feat_values, dtype=np.int32) else: raise ValueError( f"{name} only support str|list<int>|list<list<int>> dtype input," f" but get {feat.dtype}." ) return SequenceSparseData(name, feat_values, feat_lengths, seq_lengths) def _parse_fg_encoded_sequence_dense_feature_impl( name: str, feat: pa.Array, sequence_delim: str = ";", multival_sep: str = chr(3), value_dim: int = 1, default_value: Optional[List[float]] = None, ) -> SequenceDenseData: """Parse fg encoded sequence dense feature. Args: name (str): feature name. feat (pa.Array): input feature data. sequence_delim (str): sequence delimiter. multival_sep (str): string separator for multi-val data. value_dtype (pa.DataType): value dtype. value_dim (int): value dimension. default_value (list): default value. Returns: an instance of SequenceSparseFeature. """ if pa.types.is_string(feat.type): is_empty = pa.compute.equal(feat, pa.scalar("")) feat = pa.compute.if_else(is_empty, pa.nulls(len(feat)), feat) if default_value is not None: feat = feat.fill_null(multival_sep.join(map(str, default_value))) list_seq_feat = pa.compute.split_pattern(feat, sequence_delim) list_feat = pa.compute.split_pattern(list_seq_feat.values, multival_sep) seq_offsets = list_seq_feat.offsets.to_numpy() seq_lengths = seq_offsets[1:] - seq_offsets[:-1] feat_values = ( list_feat.values.cast(pa.float32(), safe=False) .to_numpy() .reshape(-1, value_dim) ) elif pa.types.is_list(feat.type): if pa.types.is_list(feat.type.value_type): # dtype = list<list<float>> or others can cast to list<list<float>> feat = feat.cast(pa.list_(pa.list_(pa.float32())), safe=False) if default_value is not None: is_empty = pa.compute.equal(pa.compute.list_value_length(feat), 0) feat = pa.compute.if_else(is_empty, pa.nulls(len(feat)), feat) feat = feat.fill_null([default_value]) seq_offsets = feat.offsets.to_numpy() seq_lengths = seq_offsets[1:] - seq_offsets[:-1] feat_values = feat.values.values.to_numpy().reshape(-1, value_dim) else: # dtype = list<float> or others can cast to list<float> feat = feat.cast(pa.list_(pa.float32()), safe=False) if default_value is not None: is_empty = pa.compute.equal(pa.compute.list_value_length(feat), 0) feat = pa.compute.if_else(is_empty, pa.nulls(len(feat)), feat) feat = feat.fill_null(default_value) seq_offsets = feat.offsets.to_numpy() seq_lengths = seq_offsets[1:] - seq_offsets[:-1] feat_values = feat.values.to_numpy().reshape(-1, value_dim) else: raise ValueError( f"{name} only support str|list<float>|list<float<float>> dtype input," f" but get {feat.dtype}." ) return SequenceDenseData(name, feat_values, seq_lengths) class SequenceIdFeature(IdFeature): """SequenceIdFeature class. Args: feature_config (FeatureConfig): a instance of feature config. sequence_name (str): sequence group name. sequence_delim (str): separator for sequence feature. sequence_length (int): max sequence length. sequence_pk (str): sequence primary key name for serving. fg_mode (FgMode): input data fg mode. fg_encoded_multival_sep (str, optional): multival_sep when fg_mode=FG_NONE """ def __init__( self, feature_config: FeatureConfig, sequence_name: Optional[str] = None, sequence_delim: Optional[str] = None, sequence_length: Optional[int] = None, sequence_pk: Optional[str] = None, fg_mode: FgMode = FgMode.FG_NONE, fg_encoded_multival_sep: Optional[str] = None, ) -> None: fc_type = feature_config.WhichOneof("feature") config = getattr(feature_config, fc_type) self._is_grouped_seq = False self.sequence_name = None self.sequence_delim = None self.sequence_length = None self.sequence_pk = None if isinstance(config, feature_pb2.IdFeature): self._is_grouped_seq = True self.sequence_name = sequence_name self.sequence_delim = sequence_delim self.sequence_length = sequence_length if not sequence_pk: self.sequence_pk = f"user:{sequence_name}" else: self.sequence_pk = sequence_pk else: self.sequence_delim = config.sequence_delim self.sequence_length = config.sequence_length super().__init__(feature_config, fg_mode, fg_encoded_multival_sep) @property def name(self) -> str: """Feature name.""" if self._is_grouped_seq: return f"{self.sequence_name}__{self.config.feature_name}" else: return self.config.feature_name @property def value_dim(self) -> int: """Fg value dimension of the feature.""" if self.config.HasField("value_dim"): return self.config.value_dim else: return 1 @property def is_sequence(self) -> bool: """Feature is sequence or not.""" return True @property def is_grouped_sequence(self) -> bool: """Feature is grouped sequence or not.""" return self._is_grouped_seq def _build_side_inputs(self) -> Optional[List[Tuple[str, str]]]: """Input field names with side.""" if self.config.HasField("expression"): if self._is_grouped_seq: side, name = self.config.expression.split(":") return [(side, f"{self.sequence_name}__{name}")] else: return [tuple(self.config.expression.split(":"))] else: return None def _parse(self, input_data: Dict[str, pa.Array]) -> ParsedData: """Parse input data for the feature impl. Args: input_data (dict): raw input feature data. Return: parsed feature data. """ if self.fg_mode == FgMode.FG_NONE: feat = input_data[self.name] parsed_feat = _parse_fg_encoded_sequence_sparse_feature_impl( self.name, feat, sequence_delim=self.sequence_delim, **self._fg_encoded_kwargs, ) elif self.fg_mode == FgMode.FG_NORMAL: input_feat = input_data[self.inputs[0]] if pa.types.is_list(input_feat.type): input_feat = input_feat.fill_null([]) input_feat = input_feat.tolist() values, key_lengths, seq_lengths = self._fg_op.to_bucketized_jagged_tensor( input_feat ) parsed_feat = SequenceSparseData( name=self.name, values=values, key_lengths=key_lengths, seq_lengths=seq_lengths, ) else: raise ValueError( f"fg_mode: {self.fg_mode} is not supported without fg handler." ) return parsed_feat def init_fg(self) -> None: """Init fg op.""" cfgs = self.fg_json() is_rank_zero = os.environ.get("RANK", "0") == "0" if self._is_grouped_seq: # pyre-ignore [16] self._fg_op = pyfg.FeatureFactory.create( cfgs[0], self.sequence_name, self.sequence_delim, self.sequence_length, is_rank_zero, ) else: # pyre-ignore [16] self._fg_op = pyfg.FeatureFactory.create( cfgs[0], is_rank_zero, ) def fg_json(self) -> List[Dict[str, Any]]: """Get fg json config.""" if self.config.default_value == "": logger.warning( "SequenceIdFeature not support empty default value now. reset to zero." ) self.config.default_value = "0" fg_cfg = { "feature_type": "id_feature" if self._is_grouped_seq else "sequence_id_feature", "feature_name": self.config.feature_name, "default_value": self.config.default_value, "expression": self.config.expression, "value_type": "string", "need_prefix": False, } if not self._is_grouped_seq: fg_cfg["sequence_delim"] = self.config.sequence_delim fg_cfg["sequence_length"] = self.config.sequence_length if self.config.separator != "\x1d": fg_cfg["separator"] = self.config.separator if self.config.HasField("zch"): fg_cfg["hash_bucket_size"] = MAX_HASH_BUCKET_SIZE elif self.config.HasField("hash_bucket_size"): fg_cfg["hash_bucket_size"] = self.config.hash_bucket_size elif self.config.HasField("num_buckets"): fg_cfg["num_buckets"] = self.config.num_buckets elif len(self.vocab_list) > 0: fg_cfg["vocab_list"] = self.vocab_list fg_cfg["default_bucketize_value"] = self.default_bucketize_value elif len(self.config.vocab_dict) > 0: fg_cfg["vocab_dict"] = self.vocab_dict fg_cfg["default_bucketize_value"] = self.default_bucketize_value elif len(self.vocab_file) > 0: fg_cfg["vocab_file"] = self.vocab_file fg_cfg["default_bucketize_value"] = self.default_bucketize_value if self.config.HasField("value_dim"): fg_cfg["value_dim"] = self.config.value_dim else: fg_cfg["value_dim"] = 1 return [fg_cfg] class SequenceRawFeature(RawFeature): """SequenceIdFeature class. Args: feature_config (FeatureConfig): a instance of feature config. sequence_name (str): sequence group name. sequence_delim (str): separator for sequence feature. sequence_length (int): max sequence length. sequence_pk (str): sequence primary key name for serving. fg_mode (FgMode): input data fg mode. fg_encoded_multival_sep (str, optional): multival_sep when fg_mode=FG_NONE """ def __init__( self, feature_config: FeatureConfig, sequence_name: Optional[str] = None, sequence_delim: Optional[str] = None, sequence_length: Optional[int] = None, sequence_pk: Optional[str] = None, fg_mode: FgMode = FgMode.FG_NONE, fg_encoded_multival_sep: Optional[str] = None, ) -> None: fc_type = feature_config.WhichOneof("feature") config = getattr(feature_config, fc_type) self._is_grouped_seq = False self.sequence_name = None self.sequence_delim = None self.sequence_length = None self.sequence_pk = None if isinstance(config, feature_pb2.RawFeature): self._is_grouped_seq = True self.sequence_name = sequence_name self.sequence_delim = sequence_delim self.sequence_length = sequence_length if not sequence_pk: self.sequence_pk = f"user:{sequence_name}" else: self.sequence_pk = sequence_pk else: self.sequence_delim = config.sequence_delim self.sequence_length = config.sequence_length super().__init__(feature_config, fg_mode, fg_encoded_multival_sep) @property def name(self) -> str: """Feature name.""" if self._is_grouped_seq: return f"{self.sequence_name}__{self.config.feature_name}" else: return self.config.feature_name @property def is_sequence(self) -> bool: """Feature is sequence or not.""" return True @property def is_grouped_sequence(self) -> bool: """Feature is grouped sequence or not.""" return self._is_grouped_seq @property def _dense_emb_type(self) -> Optional[str]: # TODO: support dense embedding for sequence raw feature. return None def _build_side_inputs(self) -> Optional[List[Tuple[str, str]]]: """Input field names with side.""" if self.config.HasField("expression"): if self._is_grouped_seq: side, name = self.config.expression.split(":") return [(side, f"{self.sequence_name}__{name}")] else: return [tuple(self.config.expression.split(":"))] else: return None def _parse(self, input_data: Dict[str, pa.Array]) -> ParsedData: """Parse input data for the feature impl. Args: input_data (dict): raw input feature data. Return: parsed feature data. """ if self.fg_mode == FgMode.FG_NONE: feat = input_data[self.name] if self.is_sparse: parsed_feat = _parse_fg_encoded_sequence_sparse_feature_impl( self.name, feat, sequence_delim=self.sequence_delim, **self._fg_encoded_kwargs, ) else: parsed_feat = _parse_fg_encoded_sequence_dense_feature_impl( self.name, feat, sequence_delim=self.sequence_delim, value_dim=self.config.value_dim, **self._fg_encoded_kwargs, ) elif self.fg_mode == FgMode.FG_NORMAL: input_feat = input_data[self.inputs[0]] if pa.types.is_list(input_feat.type): input_feat = input_feat.fill_null([]) input_feat = input_feat.tolist() if self._fg_op.is_sparse: values, lengths = self._fg_op.to_bucketized_jagged_tensor(input_feat) parsed_feat = SequenceSparseData( name=self.name, values=values, key_lengths=np.array( [self._fg_op.value_dimension()] * sum(lengths) ), seq_lengths=lengths, ) else: values, lengths = self._fg_op.to_jagged_tensor(input_feat) parsed_feat = SequenceDenseData( name=self.name, values=values, seq_lengths=lengths, ) else: raise ValueError( f"fg_mode: {self.fg_mode} is not supported without fg handler." ) return parsed_feat def init_fg(self) -> None: """Init fg op.""" cfgs = self.fg_json() is_rank_zero = os.environ.get("RANK", "0") == "0" if self._is_grouped_seq: # pyre-ignore [16] self._fg_op = pyfg.FeatureFactory.create( cfgs[0], self.sequence_name, self.sequence_delim, self.sequence_length, is_rank_zero, ) else: # pyre-ignore [16] self._fg_op = pyfg.FeatureFactory.create( cfgs[0], is_rank_zero, ) def fg_json(self) -> List[Dict[str, Any]]: """Get fg json config.""" fg_cfg = { "feature_type": "raw_feature" if self._is_grouped_seq else "sequence_raw_feature", "feature_name": self.config.feature_name, "default_value": self.config.default_value, "expression": self.config.expression, "value_type": "float", } if not self._is_grouped_seq: fg_cfg["sequence_delim"] = self.config.sequence_delim fg_cfg["sequence_length"] = self.config.sequence_length if self.config.value_dim > 1: if self.config.separator != "\x1d": fg_cfg["separator"] = self.config.separator fg_cfg["value_dim"] = self.config.value_dim if self.config.normalizer != "": fg_cfg["normalizer"] = self.config.normalizer if len(self.config.boundaries) > 0: fg_cfg["boundaries"] = list(self.config.boundaries) return [fg_cfg] class SequenceCustomFeature(CustomFeature): """SequenceCustomFeature class. Args: feature_config (FeatureConfig): a instance of feature config. sequence_name (str): sequence group name. sequence_delim (str): separator for sequence feature. sequence_length (int): max sequence length. sequence_pk (str): sequence primary key name for serving. fg_mode (FgMode): input data fg mode. fg_encoded_multival_sep (str, optional): multival_sep when fg_mode=FG_NONE """ def __init__( self, feature_config: FeatureConfig, sequence_name: Optional[str] = None, sequence_delim: Optional[str] = None, sequence_length: Optional[int] = None, sequence_pk: Optional[str] = None, fg_mode: FgMode = FgMode.FG_NONE, fg_encoded_multival_sep: Optional[str] = None, ) -> None: fc_type = feature_config.WhichOneof("feature") config = getattr(feature_config, fc_type) self._is_grouped_seq = False self.sequence_name = None self.sequence_delim = None self.sequence_length = None self.sequence_pk = None if isinstance(config, feature_pb2.CustomFeature): self._is_grouped_seq = True self.sequence_name = sequence_name self.sequence_delim = sequence_delim self.sequence_length = sequence_length if not sequence_pk: self.sequence_pk = f"user:{sequence_name}" else: self.sequence_pk = sequence_pk else: self.sequence_delim = config.sequence_delim self.sequence_length = config.sequence_length super().__init__(feature_config, fg_mode, fg_encoded_multival_sep) @property def name(self) -> str: """Feature name.""" if self._is_grouped_seq: return f"{self.sequence_name}__{self.config.feature_name}" else: return self.config.feature_name @property def is_sequence(self) -> bool: """Feature is sequence or not.""" return True @property def is_grouped_sequence(self) -> bool: """Feature is grouped sequence or not.""" return self._is_grouped_seq def _build_side_inputs(self) -> Optional[List[Tuple[str, str]]]: """Input field names with side.""" if len(self.config.expression) > 0: side_inputs = [] if self._is_grouped_seq: for expression in self.config.expression: side, name = expression.split(":") side_inputs.append((side, f"{self.sequence_name}__{name}")) else: for expression in self.config.expression: side_inputs.append(tuple(expression.split(":"))) return side_inputs else: return None @property def _dense_emb_type(self) -> Optional[str]: return None def _parse(self, input_data: Dict[str, pa.Array]) -> ParsedData: """Parse input data for the feature impl. Args: input_data (dict): raw input feature data. Return: parsed feature data. """ if self.fg_mode == FgMode.FG_NONE: feat = input_data[self.name] if self.is_sparse: parsed_feat = _parse_fg_encoded_sequence_sparse_feature_impl( self.name, feat, sequence_delim=self.sequence_delim, **self._fg_encoded_kwargs, ) else: parsed_feat = _parse_fg_encoded_sequence_dense_feature_impl( self.name, feat, sequence_delim=self.sequence_delim, value_dim=self.config.value_dim, **self._fg_encoded_kwargs, ) elif self.fg_mode == FgMode.FG_NORMAL: input_feats = [] for name in self.inputs: x = input_data[name] if pa.types.is_list(x.type): x = x.fill_null([]) x = x.tolist() input_feats.append(x) if self._fg_op.is_sparse: values, key_lengths, seq_lengths = ( self._fg_op.to_bucketized_jagged_tensor(input_feats) ) parsed_feat = SequenceSparseData( name=self.name, values=values, key_lengths=key_lengths, seq_lengths=seq_lengths, ) else: values, lengths = self._fg_op.to_jagged_tensor(input_feats) parsed_feat = SequenceDenseData( name=self.name, values=values, seq_lengths=lengths, ) else: raise ValueError( f"fg_mode: {self.fg_mode} is not supported without fg handler." ) return parsed_feat def init_fg(self) -> None: """Init fg op.""" cfgs = self.fg_json() is_rank_zero = os.environ.get("RANK", "0") == "0" if self._is_grouped_seq: # pyre-ignore [16] self._fg_op = pyfg.FeatureFactory.create( cfgs[0], self.sequence_name, self.sequence_delim, self.sequence_length, is_rank_zero, ) else: # pyre-ignore [16] self._fg_op = pyfg.FeatureFactory.create( cfgs[0], is_rank_zero, ) def fg_json(self) -> List[Dict[str, Any]]: """Get fg json config.""" if self.config.default_value == "": logger.warning( "SequenceCustomFeature not support empty default value now. " "reset to zero." ) self.config.default_value = "0" fg_cfg = super().fg_json()[0] fg_cfg["feature_name"] = self.config.feature_name fg_cfg["is_sequence"] = True if not self._is_grouped_seq: fg_cfg["sequence_delim"] = self.config.sequence_delim fg_cfg["sequence_length"] = self.config.sequence_length return [fg_cfg]