tzrec/datasets/data_parser.py (609 lines of code) (raw):

# Copyright (c) 2025, Alibaba Group; # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # http://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os from collections import defaultdict from typing import Dict, List, Optional, Tuple import numpy as np import numpy.typing as npt import pyarrow as pa import pyfg import torch from torchrec import JaggedTensor, KeyedJaggedTensor, KeyedTensor from tzrec.acc.utils import is_cuda_export, is_input_tile, is_input_tile_emb from tzrec.datasets.utils import ( Batch, DenseData, SequenceDenseData, SequenceSparseData, SparseData, ) from tzrec.features.feature import BaseFeature, FgMode, create_fg_json from tzrec.utils.logging_util import logger def _to_tensor(x: npt.NDArray) -> torch.Tensor: if not x.flags.writeable: x = np.array(x) return torch.from_numpy(x) class DataParser: """Input Data Parser. Args: features (list): a list of features. labels (list, optional): a list of label names. is_training (bool): is training or not. fg_threads (int): fg thread number. force_base_data_group (bool): force padding data into same data group with same batch_size. """ def __init__( self, features: List[BaseFeature], labels: Optional[List[str]] = None, sample_weights: Optional[List[str]] = None, is_training: bool = False, fg_threads: int = 1, force_base_data_group: bool = False, ) -> None: self._features = features self._labels = labels or [] self._sample_weights = sample_weights or [] self._is_training = is_training self._force_base_data_group = force_base_data_group self._fg_mode = features[0].fg_mode self._fg_threads = fg_threads self._fg_handler = None self.dense_keys = defaultdict(list) self.dense_length_per_key = defaultdict(list) self.sparse_keys = defaultdict(list) self.sequence_mulval_sparse_keys = defaultdict(list) self.sequence_dense_keys = [] self.has_weight_keys = defaultdict(list) for feature in self._features: if feature.is_sequence: if feature.is_sparse: self.sparse_keys[feature.data_group].append(feature.name) if feature.value_dim != 1: self.sequence_mulval_sparse_keys[feature.data_group].append( feature.name ) else: self.sequence_dense_keys.append(feature.name) elif feature.is_sparse: self.sparse_keys[feature.data_group].append(feature.name) else: self.dense_keys[feature.data_group].append(feature.name) self.dense_length_per_key[feature.data_group].append(feature.value_dim) if feature.is_weighted: self.has_weight_keys[feature.data_group].append(feature.name) if self._fg_mode in [FgMode.FG_DAG, FgMode.FG_BUCKETIZE]: self._init_fg_hander() self.feature_input_names = set() if self._fg_mode == FgMode.FG_DAG: self.feature_input_names = ( self._fg_handler.user_inputs() | self._fg_handler.item_inputs() | self._fg_handler.context_inputs() ) - set(self._fg_handler.sequence_feature_pks().values()) else: for feature in features: self.feature_input_names |= set(feature.inputs) self.user_inputs = set() self.user_feats = set() if is_input_tile(): self._init_fg_hander() self.user_inputs = self._fg_handler.user_inputs() | set( self._fg_handler.sequence_input_to_name().keys() ) self.user_feats = set([x.name for x in self._features if x.is_user_feat]) is_rank_zero = os.environ.get("RANK", "0") == "0" if is_rank_zero: logger.info(f"self.user_feats: {self.user_feats}") logger.info(f"self.user_inputs: {self.user_inputs}") if is_cuda_export(): logger.info(f"self.sparse_keys: {self.sparse_keys}") logger.info(f"self.dense_keys: {self.dense_keys}") logger.info(f"self.sequence_dense_keys: {self.sequence_dense_keys}") # get dense keys list self.dense_keys_list = [] for _, keys in self.dense_keys.items(): for key in keys: self.dense_keys_list.append(f"{key}") def _init_fg_hander(self) -> None: """Init pyfg dag handler.""" if not self._fg_handler: fg_json = create_fg_json(self._features) bucketize_only = self._fg_mode == FgMode.FG_BUCKETIZE # pyre-ignore [16] self._fg_handler = pyfg.FgArrowHandler( fg_json, self._fg_threads, bucketize_only=bucketize_only ) def parse(self, input_data: Dict[str, pa.Array]) -> Dict[str, torch.Tensor]: """Parse input data dict and build batch. Args: input_data (dict): raw input data. Return: output_data (dict): parsed feature data. """ output_data = {} if is_input_tile(): flag = False for k, v in input_data.items(): if self._fg_mode in (FgMode.FG_NONE, FgMode.FG_BUCKETIZE): if k in self.user_feats: input_data[k] = v.take([0]) else: if k in self.user_inputs: input_data[k] = v.take([0]) if not flag: output_data["batch_size"] = torch.tensor(v.__len__()) flag = True if self._fg_mode in (FgMode.FG_DAG, FgMode.FG_BUCKETIZE): self._parse_feature_fg_handler(input_data, output_data) else: self._parse_feature_normal(input_data, output_data) for label_name in self._labels: label = input_data[label_name] if pa.types.is_floating(label.type): output_data[label_name] = _to_tensor( label.cast(pa.float32(), safe=False).to_numpy() ) elif pa.types.is_integer(label.type): output_data[label_name] = _to_tensor( label.cast(pa.int64(), safe=False).to_numpy() ) else: raise ValueError( f"label column [{label_name}] only support int or float dtype now." ) for weight_name in self._sample_weights: weight = input_data[weight_name] if pa.types.is_floating(weight.type): output_data[weight_name] = _to_tensor( weight.cast(pa.float32(), safe=False).to_numpy() ) else: raise ValueError( f"sample weight column [{weight_name}] should be float dtype." ) return output_data def _parse_feature_normal( self, input_data: Dict[str, pa.Array], output_data: Dict[str, torch.Tensor] ) -> None: max_batch_size = ( max([len(v) for v in input_data.values()]) if self._force_base_data_group else 0 ) for feature in self._features: feat_data = feature.parse(input_data, is_training=self._is_training) if isinstance(feat_data, SequenceSparseData): output_data[f"{feature.name}.values"] = _to_tensor(feat_data.values) if self._force_base_data_group: feat_data.seq_lengths = np.pad( feat_data.seq_lengths, (0, max_batch_size - len(feat_data.seq_lengths)), ) output_data[f"{feature.name}.lengths"] = _to_tensor( feat_data.seq_lengths ) if feature.value_dim != 1: output_data[f"{feature.name}.key_lengths"] = _to_tensor( feat_data.key_lengths ) elif isinstance(feat_data, SequenceDenseData): output_data[f"{feature.name}.values"] = _to_tensor(feat_data.values) if self._force_base_data_group: feat_data.seq_lengths = np.pad( feat_data.seq_lengths, (0, max_batch_size - len(feat_data.seq_lengths)), ) output_data[f"{feature.name}.lengths"] = _to_tensor( feat_data.seq_lengths ) elif isinstance(feat_data, SparseData): output_data[f"{feature.name}.values"] = _to_tensor(feat_data.values) if self._force_base_data_group: feat_data.lengths = np.pad( feat_data.lengths, (0, max_batch_size - len(feat_data.lengths)) ) output_data[f"{feature.name}.lengths"] = _to_tensor(feat_data.lengths) if feat_data.weights is not None: output_data[f"{feature.name}.weights"] = _to_tensor( feat_data.weights ) elif isinstance(feat_data, DenseData): if self._force_base_data_group: feat_data.values = np.pad( feat_data.values, ((0, max_batch_size - len(feat_data.values)), (0, 0)), ) output_data[f"{feature.name}.values"] = _to_tensor(feat_data.values) def _parse_feature_fg_handler( self, input_data: Dict[str, pa.Array], output_data: Dict[str, torch.Tensor] ) -> None: max_batch_size = ( max([len(v) for v in input_data.values()]) if self._force_base_data_group else 0 ) input_data = { k: v for k, v in input_data.items() if k in self.feature_input_names } fg_output, status = self._fg_handler.process_arrow(input_data) assert status.ok(), status.message() for feature in self._features: feat_name = feature.name feat_data = fg_output[feat_name] if feature.is_sequence: if feature.is_sparse: output_data[f"{feat_name}.values"] = _to_tensor(feat_data.np_values) feat_lengths = feat_data.np_lengths if self._force_base_data_group: feat_lengths = np.pad( feat_lengths, (0, max_batch_size - len(feat_lengths)) ) output_data[f"{feat_name}.lengths"] = _to_tensor(feat_lengths) if feature.value_dim != 1: output_data[f"{feature.name}.key_lengths"] = _to_tensor( feat_data.np_key_lengths ) else: output_data[f"{feat_name}.values"] = _to_tensor( feat_data.dense_values ) feat_lengths = feat_data.np_lengths if self._force_base_data_group: feat_lengths = np.pad( feat_lengths, (0, max_batch_size - len(feat_lengths)) ) output_data[f"{feat_name}.lengths"] = _to_tensor(feat_lengths) else: if feature.is_sparse: output_data[f"{feat_name}.values"] = _to_tensor(feat_data.np_values) feat_lengths = feat_data.np_lengths if self._force_base_data_group: feat_lengths = np.pad( feat_lengths, (0, max_batch_size - len(feat_lengths)) ) output_data[f"{feat_name}.lengths"] = _to_tensor(feat_lengths) if feature.is_weighted: output_data[f"{feat_name}.weights"] = _to_tensor( feat_data.np_weights ) else: dense_values = feat_data.dense_values if self._force_base_data_group: dense_values = np.pad( dense_values, ((0, max_batch_size - len(dense_values)), (0, 0)), ) output_data[f"{feat_name}.values"] = _to_tensor(dense_values) def to_batch( self, input_data: Dict[str, torch.Tensor], force_no_tile: bool = False ) -> Batch: """Convert input data dict to Batch. Args: input_data (dict): input tensor dict. force_no_tile (bool): force no tile sparse features when INPUT_TILE=2. Returns: an instance of Batch. """ input_tile = is_input_tile() input_tile_emb = is_input_tile_emb() tile_size = -1 if input_tile: tile_size = input_data["batch_size"].item() if input_tile_emb: # For INPUT_TILE = 3 mode, batch_size of user features for sparse and dense # are all equal to 1, we tile it after embedding lookup in EmbeddingGroup dense_features = self._to_dense_features_user1_itemb(input_data) sparse_features, sequence_mulval_lengths = ( self._to_sparse_features_user1_itemb(input_data) ) elif input_tile: # For INPUT_TILE = 2 mode, # batch_size of user features for sparse are all equal to 1, we tile it # here to B and do not tile in EmbeddingGroup # batch_size of item features for dense are all equal to 1, we tile it # in EmbeddingGroup dense_features = self._to_dense_features_user1_itemb(input_data) if force_no_tile: # we prevent tile twice in PREDICT mode. sparse_features, sequence_mulval_lengths = ( self._to_sparse_features_user1_itemb(input_data) ) else: sparse_features, sequence_mulval_lengths = ( self._to_sparse_features_user1tile_itemb(input_data) ) else: dense_features = self._to_dense_features(input_data) sparse_features, sequence_mulval_lengths = self._to_sparse_features( input_data ) sequence_dense_features = {} for key in self.sequence_dense_keys: sequence_dense_feature = JaggedTensor( values=input_data[f"{key}.values"], lengths=input_data[f"{key}.lengths"], ) sequence_dense_features[key] = sequence_dense_feature labels = {} for label_name in self._labels: labels[label_name] = input_data[label_name] sample_weights = {} for weight in self._sample_weights: sample_weights[weight] = input_data[weight] batch = Batch( dense_features=dense_features, sparse_features=sparse_features, sequence_mulval_lengths=sequence_mulval_lengths, sequence_dense_features=sequence_dense_features, labels=labels, sample_weights=sample_weights, # pyre-ignore [6] tile_size=tile_size, ) return batch def _to_dense_features( self, input_data: Dict[str, torch.Tensor] ) -> Dict[str, KeyedTensor]: """Convert input data dict to batch dense features. Args: input_data (dict): input tensor dict. Returns: a dict of KeyedTensor. """ dense_features = {} for dg, keys in self.dense_keys.items(): values = [] for key in keys: values.append(input_data[f"{key}.values"]) dense_feature = KeyedTensor( keys=keys, length_per_key=self.dense_length_per_key[dg], values=torch.cat(values, dim=-1), ) dense_features[dg] = dense_feature return dense_features def _to_sparse_features( self, input_data: Dict[str, torch.Tensor] ) -> Tuple[Dict[str, KeyedJaggedTensor], Dict[str, KeyedJaggedTensor]]: """Convert to batch sparse features. Args: input_data (dict): input tensor dict. Returns: sparse_features: a dict of KeyedJaggedTensor . sequence_mulval_lengths: a dict of KeyedJaggedTensor, kjt.values is key_lengths, kjt.lengths is seq_lengths. """ sparse_features = {} sequence_mulval_lengths = {} for dg, keys in self.sparse_keys.items(): values = [] lengths = [] weights = [] mulval_keys = [] mulval_seq_lengths = [] mulval_key_lengths = [] dg_has_weight_keys = self.has_weight_keys[dg] dg_sequence_mulval_sparse_keys = self.sequence_mulval_sparse_keys[dg] for key in keys: values.append(input_data[f"{key}.values"]) length = input_data[f"{key}.lengths"] if key in dg_sequence_mulval_sparse_keys: # for multi-value sequence we flatten the sequence first seq_length = length key_length = input_data[f"{key}.key_lengths"] # TODO: remove to_float when segment_reduce support int values length = torch.segment_reduce( key_length.float(), "sum", lengths=seq_length ).to(length.dtype) mulval_keys.append(key) mulval_seq_lengths.append(seq_length) mulval_key_lengths.append(key_length) lengths.append(length) if len(dg_has_weight_keys) > 0: if key in dg_has_weight_keys: weights.append(input_data[f"{key}.weights"]) else: weights.append( torch.ones_like( input_data[f"{key}.values"], dtype=torch.float32 ) ) sparse_feature = KeyedJaggedTensor( keys=keys, values=torch.cat(values, dim=-1), lengths=torch.cat(lengths, dim=-1), weights=torch.cat(weights, dim=-1) if len(dg_has_weight_keys) > 0 else None, stride=lengths[0].size(0), # input_data['input_batch_size'] length_per_key=[x.numel() for x in values], ) sparse_features[dg] = sparse_feature if len(mulval_keys) > 0: sequence_mulval_length = KeyedJaggedTensor( keys=mulval_keys, values=torch.cat(mulval_key_lengths, dim=-1), lengths=torch.cat(mulval_seq_lengths, dim=-1), ) sequence_mulval_lengths[dg] = sequence_mulval_length return sparse_features, sequence_mulval_lengths def _to_dense_features_user1_itemb( self, input_data: Dict[str, torch.Tensor] ) -> Dict[str, KeyedTensor]: """Convert to batch dense features user_bs = 1 and item_bs = B. User feature and item feature use separate KeyedTensor because user and item with different batch_size. User feature use batch_size = 1 with suffix _user. Item feature use batch_size = actual batch size with suffix _item. Args: input_data (dict): input tensor dict. Returns: a dict of KeyedTensor. """ dense_features = {} for dg, keys in self.dense_keys.items(): values_item = [] keys_item = [] length_per_key_item = [] values_user = [] keys_user = [] length_per_key_user = [] for index, key in enumerate(keys): if key in self.user_feats: values_user.append(input_data[f"{key}.values"]) keys_user.append(key) length_per_key_user.append(self.dense_length_per_key[dg][index]) else: values_item.append(input_data[f"{key}.values"]) keys_item.append(key) length_per_key_item.append(self.dense_length_per_key[dg][index]) if len(keys_user) > 0: dense_feature_user = KeyedTensor( keys=keys_user, length_per_key=length_per_key_user, values=torch.cat(values_user, dim=-1), ) dense_features[dg + "_user"] = dense_feature_user if len(keys_item) > 0: dense_feature_item = KeyedTensor( keys=keys_item, length_per_key=length_per_key_item, values=torch.cat(values_item, dim=-1), ) dense_features[dg] = dense_feature_item return dense_features def _to_sparse_features_user1tile_itemb( self, input_data: Dict[str, torch.Tensor] ) -> Tuple[Dict[str, KeyedJaggedTensor], Dict[str, KeyedJaggedTensor]]: """Convert batch sparse features user_bs = 1 then tile and item_bs = B. User feature and item feature use one KeyedJaggedTensor. In input data, batch_size of **user** features = **1**, we **tile** it with batch_size and combine it with item features into one KeyedJaggedTensor. Args: input_data (dict): input tensor dict. Returns: sparse_features: a dict of KeyedJaggedTensor . sequence_mulval_lengths: a dict of KeyedJaggedTensor, kjt.values is key_lengths, kjt.lengths is seq_lengths. """ sparse_features = {} sequence_mulval_lengths = {} tile_size = input_data["batch_size"].item() for dg, keys in self.sparse_keys.items(): values = [] lengths = [] weights = [] mulval_keys = [] mulval_seq_lengths = [] mulval_key_lengths = [] dg_has_weight_keys = self.has_weight_keys[dg] dg_sequence_mulval_sparse_keys = self.sequence_mulval_sparse_keys[dg] for key in keys: value = input_data[f"{key}.values"] length = input_data[f"{key}.lengths"] if key in dg_sequence_mulval_sparse_keys: seq_length = length key_length = input_data[f"{key}.key_lengths"] # TODO: remove to_float when segment_reduce support int values length = torch.segment_reduce( key_length.float(), "sum", lengths=seq_length ).to(length.dtype) if key in self.user_feats: # pyre-ignore [6] seq_length = seq_length.tile(tile_size) # pyre-ignore [6] key_length = key_length.tile(tile_size) mulval_keys.append(key) mulval_seq_lengths.append(seq_length) mulval_key_lengths.append(key_length) if key in self.user_feats: # pyre-ignore [6] value = value.tile(tile_size) # pyre-ignore [6] length = length.tile(tile_size) values.append(value) lengths.append(length) if len(dg_has_weight_keys) > 0: if key in dg_has_weight_keys: weight = input_data[f"{key}.weights"] else: weight = torch.ones_like( input_data[f"{key}.values"], dtype=torch.float32 ) if key in self.user_feats: # pyre-ignore [6] weight = weight.tile(tile_size) weights.append(weight) sparse_feature = KeyedJaggedTensor( keys=keys, values=torch.cat(values, dim=-1), lengths=torch.cat(lengths, dim=-1), weights=torch.cat(weights, dim=-1) if len(dg_has_weight_keys) > 0 else None, stride=lengths[0].size(0), # input_data['input_batch_size'] length_per_key=[x.numel() for x in values], ) sparse_features[dg] = sparse_feature if len(mulval_keys) > 0: sequence_mulval_length = KeyedJaggedTensor( keys=mulval_keys, values=torch.cat(mulval_key_lengths, dim=-1), lengths=torch.cat(mulval_seq_lengths, dim=-1), ) sequence_mulval_lengths[dg] = sequence_mulval_length return sparse_features, sequence_mulval_lengths def _to_sparse_features_user1_itemb( self, input_data: Dict[str, torch.Tensor] ) -> Tuple[Dict[str, KeyedJaggedTensor], Dict[str, KeyedJaggedTensor]]: """Convert to batch sparse features user_bs = 1 and item_bs = B. User feature and item feature use separate KeyedJaggedTensor. because user and item with different batch_size. User feature use batch_size = 1 with suffix _user. Item feature use batch_size = actual batch size with suffix _item. Args: input_data (dict): input tensor dict. Returns: sparse_features: a dict of KeyedJaggedTensor . sequence_mulval_lengths: a dict of KeyedJaggedTensor, kjt.values is key_lengths, kjt.lengths is seq_lengths. """ sparse_features = {} sequence_mulval_lengths = {} for dg, keys in self.sparse_keys.items(): values_item = [] lengths_item = [] weights_item = [] keys_item = [] mulval_keys_item = [] mulval_seq_lengths_item = [] mulval_key_lengths_item = [] values_user = [] lengths_user = [] weights_user = [] keys_user = [] mulval_keys_user = [] mulval_seq_lengths_user = [] mulval_key_lengths_user = [] dg_has_weight_keys = self.has_weight_keys[dg] dg_sequence_mulval_sparse_keys = self.sequence_mulval_sparse_keys[dg] for key in keys: value = input_data[f"{key}.values"] length = input_data[f"{key}.lengths"] if key in dg_sequence_mulval_sparse_keys: # for multi-value sequence we flatten the sequence first seq_length = length key_length = input_data[f"{key}.key_lengths"] # TODO: remove to_float when segment_reduce support int values length = torch.segment_reduce( key_length.float(), "sum", lengths=seq_length ).to(length.dtype) if key in self.user_feats: values_user.append(value) lengths_user.append(length) keys_user.append(key) if key in dg_sequence_mulval_sparse_keys: mulval_keys_user.append(key) # pyre-ignore [61] mulval_seq_lengths_user.append(seq_length) # pyre-ignore [61] mulval_key_lengths_user.append(key_length) else: values_item.append(value) lengths_item.append(length) keys_item.append(key) if key in dg_sequence_mulval_sparse_keys: mulval_keys_item.append(key) # pyre-ignore [61] mulval_seq_lengths_item.append(seq_length) # pyre-ignore [61] mulval_key_lengths_item.append(key_length) if len(dg_has_weight_keys) > 0: if key in dg_has_weight_keys: weight = input_data[f"{key}.weights"] else: weight = torch.ones_like( input_data[f"{key}.values"], dtype=torch.float32 ) if key in self.user_feats: weights_user.append(weight) else: weights_item.append(weight) if len(keys_user) > 0: sparse_feature_user = KeyedJaggedTensor( keys=keys_user, values=torch.cat(values_user, dim=-1), lengths=torch.cat(lengths_user, dim=-1), weights=torch.cat(weights_user, dim=-1) if len(dg_has_weight_keys) > 0 else None, stride=lengths_user[0].size(0), # input_data['input_batch_size'] length_per_key=[x.numel() for x in values_user], ) sparse_features[dg + "_user"] = sparse_feature_user if len(keys_item) > 0: sparse_feature_item = KeyedJaggedTensor( keys=keys_item, values=torch.cat(values_item, dim=-1), lengths=torch.cat(lengths_item, dim=-1), weights=torch.cat(weights_item, dim=-1) if len(dg_has_weight_keys) > 0 else None, stride=lengths_item[0].size(0), # input_data['input_batch_size'] length_per_key=[x.numel() for x in values_item], ) sparse_features[dg] = sparse_feature_item if len(mulval_keys_user) > 0: sequence_mulval_length_user = KeyedJaggedTensor( keys=mulval_keys_user, values=torch.cat(mulval_key_lengths_user, dim=-1), lengths=torch.cat(mulval_seq_lengths_user, dim=-1), ) sequence_mulval_lengths[dg + "_user"] = sequence_mulval_length_user if len(mulval_keys_item) > 0: sequence_mulval_length_item = KeyedJaggedTensor( keys=mulval_keys_item, values=torch.cat(mulval_key_lengths_item, dim=-1), lengths=torch.cat(mulval_seq_lengths_item, dim=-1), ) sequence_mulval_lengths[dg] = sequence_mulval_length_item return sparse_features, sequence_mulval_lengths def dump_parsed_inputs(self, input_data: Dict[str, torch.Tensor]) -> pa.Array: """Dump parsed inputs for debug.""" feature_rows = defaultdict(dict) for f in self._features: if f.is_sparse: lengths = input_data[f"{f.name}.lengths"] values = input_data[f"{f.name}.values"].cpu().numpy() cnt = 0 # pyre-ignore [16] sep = f.sequence_delim if f.is_sequence else "," for i, ll in enumerate(lengths): cur_v = values[cnt : cnt + ll] cnt += ll feature_rows[i][f.name] = sep.join(cur_v.astype(str)) else: if f.is_sequence: lengths = input_data[f"{f.name}.lengths"] values = input_data[f"{f.name}.values"].cpu().numpy() cnt = 0 for i, ll in enumerate(lengths): cur_v = values[cnt : cnt + ll] cnt += ll feature_rows[i][f.name] = f.sequence_delim.join( map(",".join, cur_v.astype(str)) ) else: values = input_data[f"{f.name}.values"].cpu().numpy() for i, cur_v in enumerate(values): feature_rows[i][f.name] = ",".join(cur_v.astype(str)) result = [] for i in range(len(feature_rows)): result.append(" | ".join([f"{k}:{v}" for k, v in feature_rows[i].items()])) return pa.array(result) def __del__(self) -> None: if self._fg_handler: self._fg_handler.reset_executor()