API Reference: Torch Choice

Source code in torch_choice/data/choice_dataset.py
class ChoiceDataset(torch.utils.data.Dataset):
    def __init__(self,
                 item_index: torch.LongTensor,
                 num_items: int = None,
                 num_users: int = None,
                 num_sessions: int = None,
                 label: Optional[torch.LongTensor] = None,
                 user_index: Optional[torch.LongTensor] = None,
                 session_index: Optional[torch.LongTensor] = None,
                 item_availability: Optional[torch.BoolTensor] = None,
                 **kwargs) -> None:
        """
        Initialization method for the dataset object. Researchers should supply all information about the dataset
        using this method.

        The number of choice instances is called `batch_size` in the documentation. The `batch_size` corresponds to the
        file length in a wide-format dataset and is often denoted by `N`. We call it `batch_size` to follow the convention
        in the machine learning literature.
        A `choice instance` is a row of the dataset, so there are `batch_size` choice instances in each `ChoiceDataset`.

        The dataset consists of:
        (1) a collection of `batch_size` tuples (item_id, user_id, session_id, label), where each tuple is a choice instance.
        (2) a collection of `observables` associated with item, user, session, etc.

        Args:
            item_index (torch.LongTensor): a tensor of shape (batch_size) indicating the relevant item in each row
                of the dataset. The relevant item can be:
                (1) the item bought in this choice instance,
                (2) or the item reviewed by the user. In the latter case, we need the `label` tensor to specify the rating score.
                NOTE: support for the second case is under development; currently, we only support binary labels.

            num_items (Optional[int]): the number of items in the dataset. If `None` is provided (default), the number of items will be inferred from the number of unique numbers in `item_index`.

            num_users (Optional[int]): the number of users in the dataset. If `None` is provided (default), the number of users will be inferred from the number of unique numbers in `user_index`.

            num_sessions (Optional[int]): the number of sessions in the dataset. If `None` is provided (default), the number of sessions will be inferred from the number of unique numbers in `session_index`.

            label (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating the label for prediction in
                each choice instance. When you want to predict the item bought, you can leave the `label` argument
                as `None` in the initialization method, and the model will use `item_index` as the object to be predicted.
                But if you are, for example, predicting the rating a user gave an item, `label` must be provided.
                Defaults to None.

            user_index (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating
                the ID of the user involved in each choice instance. If `None` is provided, all choice instances
                are assumed to come from the same user.
                `user_index` is required if and only if there are multiple users in the dataset, for example:
                    (1) user observables are involved in the utility form,
                    (2) and/or the coefficients are user-specific.
                This tensor is used to select the corresponding user observables and coefficients assigned to the
                user (like theta_user) while making the prediction for that purchase.
                Defaults to None.

            session_index (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating
                the ID of the session in which each choice instance occurred. This tensor is used to select the correct
                session observables or price observables while making the prediction for that choice instance. Therefore, if
                there are no session/price observables, you can leave this argument as `None`. In this case, the `ChoiceDataset`
                object will assume each choice instance to be in its own session.
                Defaults to None.

            item_availability (Optional[torch.BoolTensor], optional): a boolean tensor of shape (num_sessions, num_items)
                indicating the availability of each item in each session. Utilities of unavailable items are set to -infinity,
                and hence the predicted probabilities of unavailable items are zero while making predictions.
                All items are assumed to be available if set to None.
                Defaults to None.

        Other Kwargs (Observables):
            One can specify the following types of observables, where * in shape denotes any positive
                integer. Typically * represents the number of observables.
            Please refer to the documentation for a detailed guide to use observables.
            1. user observables must start with 'user_' and have shape (num_users, *)
            2. item observables must start with 'item_' and have shape (num_items, *)
            3. session observables must start with 'session_' and have shape (num_sessions, *)
            4. taste observables (those varying by user and item) must start with `taste_` and have shape
                (num_users, num_items, *).
            NOTE: we don't recommend using taste observables, because num_users * num_items is potentially large.
            5. price observables (those varying by session and item) must start with `price_` and have
                shape (num_sessions, num_items, *)
            6. itemsession observables start with `itemsession_`; this is a more intuitive alias for the price
                observable.
        """
        # ENHANCEMENT(Tianyu): add item_names for summary.
        super(ChoiceDataset, self).__init__()
        self.label = label
        self.item_index = item_index
        self._num_items = num_items
        self._num_users = num_users
        self._num_sessions = num_sessions

        self.user_index = user_index
        self.session_index = session_index

        if self.session_index is None:
            # if any([x.startswith('session_') or x.startswith('price_') for x in kwargs.keys()]):
            # if any session sensitive observable is provided, but session index is not,
            # infer each row in the dataset to be a session.
            # TODO: (design choice) should we assign unique session index to each choice instance or the same session index.
            print('No `session_index` is provided, assuming each choice instance is in its own session.')
            self.session_index = torch.arange(len(self.item_index)).long()

        self.item_availability = item_availability

        for key, item in kwargs.items():
            if self._is_attribute(key):
                # all observables should be floats.
                item = item.float()
            setattr(self, key, item)

        # TODO: add a validation procedure to check the consistency of the dataset.

    def __getitem__(self, indices: Union[int, torch.LongTensor]) -> "ChoiceDataset":
        """Retrieves samples corresponding to the provided index or list of indices.

        Args:
            indices (Union[int, torch.LongTensor]): a single integer index or a tensor of indices.

        Returns:
            ChoiceDataset: a subset of the dataset.
        """
        if isinstance(indices, int):
            # convert single integer index to an array of indices.
            indices = torch.LongTensor([indices])
        new_dict = dict()
        new_dict['item_index'] = self.item_index[indices].clone()

        # copy optional attributes.
        new_dict['label'] = self.label[indices].clone() if self.label is not None else None
        new_dict['user_index'] = self.user_index[indices].clone() if self.user_index is not None else None
        new_dict['session_index'] = self.session_index[indices].clone() if self.session_index is not None else None
        # item_availability has shape (num_sessions, num_items), no need to re-index it.
        new_dict['item_availability'] = self.item_availability

        # copy other attributes.
        for key, val in self.__dict__.items():
            if key not in new_dict.keys():
                if torch.is_tensor(val):
                    new_dict[key] = val.clone()
                else:
                    new_dict[key] = copy.deepcopy(val)

        subset = self._from_dict(new_dict)
        # make sure the new dataset inherits the num_sessions, num_items, and num_users from parent.
        subset._num_users = self.num_users
        subset._num_items = self.num_items
        subset._num_sessions = self.num_sessions
        return subset

    def __len__(self) -> int:
        """Returns number of samples in this dataset.

        Returns:
            int: length of the dataset.
        """
        return len(self.item_index)

    def __contains__(self, key: str) -> bool:
        return key in self.keys

    def __eq__(self, other: "ChoiceDataset") -> bool:
        """Returns whether all tensor attributes of both ChoiceDatasets are equal."""
        if not isinstance(other, ChoiceDataset):
            raise TypeError('You can only compare with ChoiceDataset objects.')
        else:
            flag = True
            for key, val in self.__dict__.items():
                if torch.is_tensor(val):
                    # ignore NaNs while comparing.
                    if not torch.equal(torch.nan_to_num(val), torch.nan_to_num(other.__dict__[key])):
                        print('Attribute {} is not equal.'.format(key))
                        flag = False
            return flag

    @property
    def device(self) -> str:
        """Returns the device of the dataset.

        Returns:
            str: the device of the dataset.
        """
        for attr in self.__dict__.values():
            if torch.is_tensor(attr):
                return attr.device

    @property
    def num_users(self) -> int:
        """Returns number of users involved in this dataset, returns 1 if there is no user identity.

        Returns:
            int: the number of users involved in this dataset.
        """
        if self._num_users is not None:
            return self._num_users
        elif self.user_index is not None:
            num_unique = len(torch.unique(self.user_index))
            expected_num_users = int(self.user_index.max()) + 1
            if num_unique != expected_num_users:
                warnings.warn(f"The number of users is inferred from the number of unique users in the user_index tensor. The user_index tensor in the ChoiceDataset ranges from {int(self.user_index.min())} to {int(self.user_index.max())}. The ChoiceDataset assumes user_index to be 0-indexed and encoded using consecutive integers. There are {expected_num_users} users expected given max(user_index). However, there are {num_unique} unique values in the user_index . This could be caused by missing users in the dataset (i.e., some users are not in user_index at all). If this is not expected, please check the user_index tensor. For a safer behavior, please provide the number of users explicitly by using the num_users keyword while initializing the ChoiceDataset class.")
            else:
                warnings.warn(f"The number of users is inferred from the number of unique users in the user_index tensor. This might lead to unexpected behaviors if some users never appeared in the user_index tensor. For a safer behavior, please provide the number of users explicitly by using the num_users keyword while initializing the ChoiceDataset class.")

            # infer from the number of unique users using the user_index.
            return len(torch.unique(self.user_index))
        else:
            return 1

    @property
    def num_items(self) -> int:
        """Returns the number of items involved in this dataset.

        Returns:
            int: the number of items involved in this dataset.
        """
        if self._num_items is not None:
            # return the _num_items provided in the constructor.
            return self._num_items
        else:
            # infer the number of items from item_index.
            # the -1 is an optional special symbol for outside option, do not count it towards the number of items.
            num_unique = len(torch.unique(self.item_index[self.item_index != -1]))
            expected_num_items = int(self.item_index[self.item_index != -1].max()) + 1
            if num_unique != expected_num_items:
                warnings.warn(f"The number of items is inferred from the number of unique items, excluding -1's denoting outside options, in the item_index tensor. The item_index tensor in the ChoiceDataset ranges from {int(self.item_index[self.item_index != -1].min())} to {int(self.item_index[self.item_index != -1].max())}, excluding -1's. The ChoiceDataset assumes item_index to be 0-indexed and encoded using consecutive integers. There are {expected_num_items} items expected given max(item_index). However, there are {num_unique} unique values in item_index. This could be caused by missing items in the dataset (i.e., some items are not in item_index at all). If this is not expected, please check the item_index tensor. For a safer behavior, please provide the number of items explicitly by using the num_items keyword while initializing the ChoiceDataset class.")
            else:
                warnings.warn(f"The number of items is inferred from the number of unique items, excluding -1's denoting outside options, in the item_index tensor. This might lead to unexpected behaviors if some items never appeared in the item_index tensor. For a safer behavior, please provide the number of items explicitly by using the num_items keyword while initializing the ChoiceDataset class.")

            return len(torch.unique(self.item_index[self.item_index != -1]))

    @property
    def num_sessions(self) -> int:
        """Returns the number of sessions involved in this dataset.

        Returns:
            int: the number of sessions involved in this dataset.
        """
        if self._num_sessions is not None:
            # return the _num_sessions provided in the constructor.
            return self._num_sessions
        else:
            num_unique = len(torch.unique(self.session_index))
            expected_num_sessions = int(self.session_index.max()) + 1
            if num_unique != expected_num_sessions:
                warnings.warn(f"The number of sessions is inferred from the number of unique sessions in the session_index tensor. The session_index tensor in the ChoiceDataset ranges from {int(self.session_index.min())} to {int(self.session_index.max())}. The ChoiceDataset assumes session_index to be 0-indexed and encoded using consecutive integers. There are {expected_num_sessions} sessions expected given max(session_index). However, there are {num_unique} unique values in the session_index . This could be caused by missing sessions in the dataset (i.e., some sessions are not in session_index at all). If this is not expected, please check the session_index tensor. For a safer behavior, please provide the number of sessions explicitly by using the num_sessions keyword while initializing the ChoiceDataset class.")
            else:
                warnings.warn(f"The number of sessions is inferred from the number of unique sessions in the session_index tensor. This might lead to unexpected behaviors if some sessions never appeared in the session_index tensor. For a safer behavior, please provide the number of sessions explicitly by using the num_sessions keyword while initializing the ChoiceDataset class.")
            # infer the number of sessions from session_index.
            return len(torch.unique(self.session_index))

    @property
    def x_dict(self) -> Dict[object, torch.Tensor]:
        """Formats attributes of in this dataset into shape (num_sessions, num_items, num_params) and returns in a dictionary format.
        Models in this package are expecting this dictionary based data format.

        Returns:
            Dict[object, torch.Tensor]: a dictionary with attribute names in the dataset as keys, and reshaped attribute
                tensors as values.
        """
        out = dict()
        for key, val in self.__dict__.items():
            if self._is_attribute(key):  # only include attributes.
                out[key] = self._expand_tensor(key, val)  # reshape to (len(self), num_items, num_params).
        return out

    @classmethod
    def _from_dict(cls, dictionary: Dict[str, torch.tensor]) -> "ChoiceDataset":
        """Creates an instance of ChoiceDataset from a dictionary of arguments.

        Args:
            dictionary (Dict[str, torch.tensor]): a dictionary with keys as argument names and values as arguments.

        Returns:
            ChoiceDataset: the created copy of dataset.
        """
        dataset = cls(**dictionary)
        for key, item in dictionary.items():
            setattr(dataset, key, item)
        return dataset

    def apply_tensor(self, func: callable) -> "ChoiceDataset":
        """This s a helper method to apply the provided function to all tensors and tensor values of all dictionaries.

        Args:
            func (callable): a callable function to be applied on tensors and tensor-values of dictionaries.

        Returns:
            ChoiceDataset: the modified dataset.
        """
        for key, item in self.__dict__.items():
            if torch.is_tensor(item):
                setattr(self, key, func(item))
            # broadcast func to dictionaries of tensors as well.
            elif isinstance(getattr(self, key), dict):
                for obj_key, obj_item in getattr(self, key).items():
                    if torch.is_tensor(obj_item):
                        # dictionaries require item assignment rather than setattr.
                        getattr(self, key)[obj_key] = func(obj_item)
        return self

    def to(self, device: Union[str, torch.device]) -> "ChoiceDataset":
        """Moves all tensors in this dataset to the specified PyTorch device.

        Args:
            device (Union[str, torch.device]): the destination device.

        Returns:
            ChoiceDataset: the modified dataset on the new device.
        """
        return self.apply_tensor(lambda x: x.to(device))

    def clone(self) -> "ChoiceDataset":
        """Creates a copy of self.

        Returns:
            ChoiceDataset: a copy of self.
        """
        dictionary = {}
        for k, v in self.__dict__.items():
            if torch.is_tensor(v):
                dictionary[k] = v.clone()
            else:
                dictionary[k] = copy.deepcopy(v)
        new = self.__class__._from_dict(dictionary)
        new._num_users = self.num_users
        new._num_items = self.num_items
        new._num_sessions = self.num_sessions
        return new

    def _check_device_consistency(self) -> None:
        """Checks if all tensors in this dataset are on the same device.

        Raises:
            Exception: an exception is raised if not all tensors are on the same device.
        """
        # assert all tensors are on the same device.
        devices = list()
        for val in self.__dict__.values():
            if torch.is_tensor(val):
                devices.append(val.device)
        if len(set(devices)) > 1:
            raise Exception(f'Found tensors on different devices: {set(devices)}.',
                            'Use dataset.to() method to align devices.')

    def _size_repr(self, value: object) -> List[int]:
        """A helper method to get the string-representation of object sizes, this is helpful while constructing the
        string representation of the dataset.

        Args:
            value (object): an object to examine its size.

        Returns:
            List[int]: list of integers representing the size of the object, length of the list is equal to dimension of `value`.
        """
        if torch.is_tensor(value):
            return list(value.size())
        elif isinstance(value, int) or isinstance(value, float):
            return [1]
        elif isinstance(value, list) or isinstance(value, tuple):
            return [len(value)]
        else:
            return []

    def __repr__(self) -> str:
        """A method to get a string representation of the dataset.

        Returns:
            str: the string representation of the dataset.
        """
        # don't print shapes of internal attributes like _num_users and _num_items.
        info = [f'{key}={self._size_repr(item)}' for key, item in self.__dict__.items() if not key.startswith('_')]
        return f"{self.__class__.__name__}(num_items={self.num_items}, num_users={self.num_users}, num_sessions={self.num_sessions}, {', '.join(info)}, device={self.device})"

    # ==================================================================================================================
    # methods for checking attribute categories.
    # ==================================================================================================================
    @staticmethod
    def _is_item_attribute(key: str) -> bool:
        return key.startswith('item_') and (key != 'item_availability') and (key != 'item_index')

    @staticmethod
    def _is_user_attribute(key: str) -> bool:
        return key.startswith('user_') and (key != 'user_index')

    @staticmethod
    def _is_session_attribute(key: str) -> bool:
        return key.startswith('session_') and (key != 'session_index')

    @staticmethod
    def _is_useritem_attribute(key: str) -> bool:
        return key.startswith('useritem_') or key.startswith('itemuser_')

    @staticmethod
    def _is_price_attribute(key: str) -> bool:
        return key.startswith('price_') or key.startswith('itemsession_') or key.startswith('sessionitem_')

    @staticmethod
    def _is_usersession_attribute(key: str) -> bool:
        return key.startswith('usersession_') or key.startswith('sessionuser_')

    @staticmethod
    def _is_usersessionitem_attribute(key: str) -> bool:
        return key.startswith('usersessionitem_') or key.startswith('useritemsession_') \
            or key.startswith('itemusersession_') or key.startswith('itemsessionuser_') \
            or key.startswith('sessionuseritem_') or key.startswith('sessionitemuser_')

    def _is_attribute(self, key: str) -> bool:
        return self._is_item_attribute(key) \
            or self._is_user_attribute(key) \
            or self._is_session_attribute(key) \
            or self._is_useritem_attribute(key) \
            or self._is_price_attribute(key) \
            or self._is_usersession_attribute(key) \
            or self._is_usersessionitem_attribute(key)

    def _expand_tensor(self, key: str, val: torch.Tensor) -> torch.Tensor:
        """Expands attribute tensor to (len(self), num_items, num_params) shape for prediction tasks, this method
        won't reshape the tensor at all if the `key` (i.e., name of the tensor) suggests its not an attribute of any kind.

        Args:
            key (str): name of the attribute used to determine the raw shape of the tensor. For example, 'item_obs' means
                the raw tensor is in shape (num_items, num_params).
            val (torch.Tensor): the attribute tensor to be reshaped.

        Returns:
            torch.Tensor: the reshaped tensor with shape (num_sessions, num_items, num_params).
        """
        if not self._is_attribute(key):
            # this is a sanity check.
            raise ValueError(f'The input key {key} is not an attribute of the dataset.')

        num_params = val.shape[-1]  # the number of parameters/coefficients/observables.

        # convert attribute tensors to (len(self), num_items, num_params) shape.
        if self._is_user_attribute(key):
            # user_attribute (num_users, *)
            out = val[self.user_index, :].view(
                len(self), 1, num_params).expand(-1, self.num_items, -1)
        elif self._is_item_attribute(key):
            # item_attribute (num_items, *)
            out = val.view(1, self.num_items, num_params).expand(
                len(self), -1, -1)
        elif self._is_useritem_attribute(key):
            # useritem_attribute (num_users, num_items, *)
            out = val[self.user_index, :, :]
        elif self._is_session_attribute(key):
            # session_attribute (num_sessions, *)
            out = val[self.session_index, :].view(
                len(self), 1, num_params).expand(-1, self.num_items, -1)
        elif self._is_price_attribute(key):
            # price_attribute (num_sessions, num_items, *)
            out = val[self.session_index, :, :]
        elif self._is_usersession_attribute(key):
            # user-session (num_users, num_sessions, *)
            out = val[self.user_index, self.session_index, :]  # (len(self), *)
            out = out.view(len(self), 1, num_params).expand(-1, self.num_items, -1)  # (len(self), num_items, *)
        elif self._is_usersessionitem_attribute(key):
            # usersessionitem_attribute has shape (num_users, num_sessions, num_items, *)
            out = val[self.user_index, self.session_index, :, :]  # (len(self), num_items, *)

        else:
            raise ValueError(f'The input key {key} is not an attribute of the dataset.')

        assert out.shape == (len(self), self.num_items, num_params), f'Error: the output shape {out.shape} is not correct, expected: {(len(self), self.num_items, num_params)}.'
        return out

    @staticmethod
    def unique(tensor: torch.Tensor) -> Tuple[np.ndarray]:
        arr = tensor.cpu().numpy()
        unique, counts = np.unique(arr, return_counts=True)
        count_sort_ind = np.argsort(-counts)
        unique = unique[count_sort_ind]
        counts = counts[count_sort_ind]
        return unique, counts

    def summary(self) -> None:
        """A method to summarize the dataset.

        Returns:
            str: the string representation of the dataset.
        """
        summary = ['ChoiceDataset with {} sessions, {} items, {} users, {} purchase records (observations).'.format(
            self.num_sessions, self.num_items, self.num_users if self.user_index is not None else 'single', len(self))]

        # summarize users.
        if self.user_index is not None:
            unique, counts = self.unique(self.user_index)
            summary.append(f"The most frequent user is {unique[0]} with {counts[0]} observations; the least frequent user is {unique[-1]} with {counts[-1]} observations; on average, there are {counts.astype(float).mean():.2f} observations per user.")

            N = len(unique)
            K = min(5, N)
            string = f'{K} most frequent users are: ' + ', '.join([f'{unique[i]}({counts[i]} times)' for i in range(K)]) + '.'
            summary.append(string)
            string = f'{K} least frequent users are: ' + ', '.join([f'{unique[N-i]}({counts[N-i]} times)' for i in range(1, K+1)]) + '.'
            summary.append(string)

        # summarize items.
        unique, counts = self.unique(self.item_index)
        N = len(unique)
        K = min(5, N)
        summary.append(f"The most frequent item is {unique[0]}, it was chosen {counts[0]} times; the least frequent item is {unique[-1]} it was {counts[-1]} times; on average, each item was purchased {counts.astype(float).mean():.2f} times.")

        string = f'{K} most frequent items are: ' + ', '.join([f'{unique[i]}({counts[i]} times)' for i in range(K)]) + '.'
        summary.append(string)
        string = f'{K} least frequent items are: ' + ', '.join([f'{unique[N-i]}({counts[N-i]} times)' for i in range(1, K+1)]) + '.'
        summary.append(string)

        summary.append('Attribute Summaries:')
        for key, item in self.__dict__.items():
            if self._is_attribute(key) and torch.is_tensor(item):
                summary.append("Observable Tensor '{}' with shape {}".format(key, item.shape))
                # price attributes are 3-dimensional tensors, omitted here for brevity.
                if (not self._is_price_attribute(key)) and (not self._is_usersessionitem_attribute(key)) and (not self._is_useritem_attribute(key)) and (not self._is_usersession_attribute(key)):
                    summary.append(str(pd.DataFrame(item.to('cpu').float().numpy()).describe()))
        print('\n'.join(summary) + f"\ndevice={self.device}")
        return None
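Below is a minimal, hypothetical sketch of constructing a `ChoiceDataset` with each kind of observable. The observable names (`user_obs`, `price_obs`, etc.) and all sizes are illustrative; only the name prefixes (`user_`, `item_`, `session_`, `price_`/`itemsession_`) matter for how the dataset treats them.

```python
import torch
from torch_choice.data import ChoiceDataset

# all sizes below are arbitrary, chosen only for illustration.
num_users, num_items, num_sessions, batch_size = 10, 4, 500, 500

dataset = ChoiceDataset(
    # which item was chosen in each of the `batch_size` choice instances.
    item_index=torch.randint(num_items, (batch_size,)),
    # passing sizes explicitly avoids the inference warnings described below.
    num_items=num_items, num_users=num_users, num_sessions=num_sessions,
    user_index=torch.randint(num_users, (batch_size,)),
    session_index=torch.randint(num_sessions, (batch_size,)),
    # observables are recognized by their name prefixes.
    user_obs=torch.randn(num_users, 3),                  # (num_users, *)
    item_obs=torch.randn(num_items, 5),                  # (num_items, *)
    session_obs=torch.randn(num_sessions, 2),            # (num_sessions, *)
    price_obs=torch.randn(num_sessions, num_items, 7))   # (num_sessions, num_items, *)

print(dataset)  # the repr lists each tensor's shape and the device.
```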

device: str property readonly

Returns the device of the dataset.

Returns:

Type Description
str

the device of the dataset.

num_items: int property readonly

Returns the number of items involved in this dataset.

Returns:

Type Description
int

the number of items involved in this dataset.

num_sessions: int property readonly

Returns the number of sessions involved in this dataset.

Returns:

Type Description
int

the number of sessions involved in this dataset.

num_users: int property readonly

Returns the number of users involved in this dataset; returns 1 if there is no user identity.

Returns:

Type Description
int

the number of users involved in this dataset.

x_dict: Dict[object, torch.Tensor] property readonly

Formats attributes of this dataset into shape (len(self), num_items, num_params) and returns them in a dictionary. Models in this package expect this dictionary-based data format.

Returns:

Type Description
Dict[object, torch.Tensor]

a dictionary with attribute names in the dataset as keys, and reshaped attribute tensors as values.
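Continuing the hypothetical dataset sketched above, the snippet below illustrates the reshaping that `x_dict` performs: every observable, whatever its raw shape, is broadcast to `(len(dataset), num_items, num_params)`.

```python
for name, tensor in dataset.x_dict.items():
    print(name, tuple(tensor.shape))
# with the sizes used above, this prints:
# user_obs (500, 4, 3)
# item_obs (500, 4, 5)
# session_obs (500, 4, 2)
# price_obs (500, 4, 7)
```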

__eq__(self, other) special

Returns whether all tensor attributes of both ChoiceDatasets are equal.

Source code in torch_choice/data/choice_dataset.py
def __eq__(self, other: "ChoiceDataset") -> bool:
    """Returns whether all tensor attributes of both ChoiceDatasets are equal."""
    if not isinstance(other, ChoiceDataset):
        raise TypeError('You can only compare with ChoiceDataset objects.')
    else:
        flag = True
        for key, val in self.__dict__.items():
            if torch.is_tensor(val):
                # ignore NaNs while comparing.
                if not torch.equal(torch.nan_to_num(val), torch.nan_to_num(other.__dict__[key])):
                    print('Attribute {} is not equal.'.format(key))
                    flag = False
        return flag

__getitem__(self, indices) special

Retrieves samples corresponding to the provided index or list of indices.

Parameters:

Name Type Description Default
indices Union[int, torch.LongTensor]

a single integer index or a tensor of indices.

required

Returns:

Type Description
ChoiceDataset

a subset of the dataset.

Source code in torch_choice/data/choice_dataset.py
def __getitem__(self, indices: Union[int, torch.LongTensor]) -> "ChoiceDataset":
    """Retrieves samples corresponding to the provided index or list of indices.

    Args:
        indices (Union[int, torch.LongTensor]): a single integer index or a tensor of indices.

    Returns:
        ChoiceDataset: a subset of the dataset.
    """
    if isinstance(indices, int):
        # convert single integer index to an array of indices.
        indices = torch.LongTensor([indices])
    new_dict = dict()
    new_dict['item_index'] = self.item_index[indices].clone()

    # copy optional attributes.
    new_dict['label'] = self.label[indices].clone() if self.label is not None else None
    new_dict['user_index'] = self.user_index[indices].clone() if self.user_index is not None else None
    new_dict['session_index'] = self.session_index[indices].clone() if self.session_index is not None else None
    # item_availability has shape (num_sessions, num_items), no need to re-index it.
    new_dict['item_availability'] = self.item_availability

    # copy other attributes.
    for key, val in self.__dict__.items():
        if key not in new_dict.keys():
            if torch.is_tensor(val):
                new_dict[key] = val.clone()
            else:
                new_dict[key] = copy.deepcopy(val)

    subset = self._from_dict(new_dict)
    # make sure the new dataset inherits the num_sessions, num_items, and num_users from parent.
    subset._num_users = self.num_users
    subset._num_items = self.num_items
    subset._num_sessions = self.num_sessions
    return subset
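A short, hypothetical indexing example (reusing the `dataset` sketched earlier): slicing with a tensor of row indices returns a new `ChoiceDataset` restricted to those choice instances.

```python
indices = torch.LongTensor([0, 1, 5])
subset = dataset[indices]  # a ChoiceDataset with len(subset) == 3
single = dataset[0]        # an int index also yields a length-1 subset
# per-instance tensors are sliced; observables are carried over unchanged.
assert torch.equal(subset.item_index, dataset.item_index[indices])
```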

__init__(self, item_index, num_items=None, num_users=None, num_sessions=None, label=None, user_index=None, session_index=None, item_availability=None, **kwargs) special

Initialization method for the dataset object. Researchers should supply all information about the dataset using this method.

The number of choice instances is called batch_size in the documentation. The batch_size corresponds to the file length in a wide-format dataset and is often denoted by N. We call it batch_size to follow the convention in the machine learning literature. A choice instance is a row of the dataset, so there are batch_size choice instances in each ChoiceDataset.

The dataset consists of: (1) a collection of batch_size tuples (item_id, user_id, session_id, label), where each tuple is a choice instance. (2) a collection of observables associated with item, user, session, etc.

Parameters:

Name Type Description Default
item_index torch.LongTensor

a tensor of shape (batch_size) indicating the relevant item in each row of the dataset. The relevant item can be: (1) the item bought in this choice instance, (2) or the item reviewed by the user. In the latter case, we need the label tensor to specify the rating score. NOTE: support for the second case is under development; currently, we only support binary labels.

required
num_items Optional[int]

the number of items in the dataset. If None is provided (default), the number of items will be inferred from the number of unique numbers in item_index.

None
num_users Optional[int]

the number of users in the dataset. If None is provided (default), the number of users will be inferred from the number of unique numbers in user_index.

None
num_sessions Optional[int]

the number of sessions in the dataset. If None is provided (default), the number of sessions will be inferred from the number of unique numbers in session_index.

None
label Optional[torch.LongTensor]

a tensor of shape (batch_size) indicating the label for prediction in each choice instance. When you want to predict the item bought, you can leave the label argument as None in the initialization method, and the model will use item_index as the object to be predicted. But if you are, for example, predicting the rating a user gave an item, label must be provided. Defaults to None.

None
user_index Optional[torch.LongTensor]

a tensor of shape (batch_size) indicating the ID of the user involved in each choice instance. If None is provided, all choice instances are assumed to come from the same user. user_index is required if and only if there are multiple users in the dataset, for example: (1) user observables are involved in the utility form, (2) and/or the coefficients are user-specific. This tensor is used to select the corresponding user observables and coefficients assigned to the user (like theta_user) while making the prediction for that purchase. Defaults to None.

None
session_index Optional[torch.LongTensor]

a tensor of shape (batch_size) indicating the ID of the session in which each choice instance occurred. This tensor is used to select the correct session observables or price observables while making the prediction for that choice instance. Therefore, if there are no session/price observables, you can leave this argument as None. In this case, the ChoiceDataset object will assume each choice instance to be in its own session. Defaults to None.

None
item_availability Optional[torch.BoolTensor]

A boolean tensor of shape (num_sessions, num_items) indicating the availability of each item in each session. Utilities of unavailable items are set to -infinity, and hence the predicted probabilities of unavailable items are zero while making predictions. All items are assumed to be available if set to None. Defaults to None.

None

Other Kwargs (Observables): One can specify the following types of observables, where * in a shape denotes any positive integer (typically the number of observables). Please refer to the documentation for a detailed guide to using observables.

1. user observables must start with 'user_' and have shape (num_users, *)
2. item observables must start with 'item_' and have shape (num_items, *)
3. session observables must start with 'session_' and have shape (num_sessions, *)
4. taste observables (those varying by user and item) must start with 'taste_' and have shape (num_users, num_items, *). NOTE: we don't recommend using taste observables, because num_users * num_items is potentially large.
5. price observables (those varying by session and item) must start with 'price_' and have shape (num_sessions, num_items, *)
6. itemsession observables start with 'itemsession_'; this is a more intuitive alias for the price observable.

Source code in torch_choice/data/choice_dataset.py
def __init__(self,
             item_index: torch.LongTensor,
             num_items: int = None,
             num_users: int = None,
             num_sessions: int = None,
             label: Optional[torch.LongTensor] = None,
             user_index: Optional[torch.LongTensor] = None,
             session_index: Optional[torch.LongTensor] = None,
             item_availability: Optional[torch.BoolTensor] = None,
             **kwargs) -> None:
    """
    Initialization method for the dataset object. Researchers should supply all information about the dataset
    using this method.

    The number of choice instances is called `batch_size` in the documentation. The `batch_size` corresponds to the
    file length in a wide-format dataset and is often denoted by `N`. We call it `batch_size` to follow the convention
    in the machine learning literature.
    A `choice instance` is a row of the dataset, so there are `batch_size` choice instances in each `ChoiceDataset`.

    The dataset consists of:
    (1) a collection of `batch_size` tuples (item_id, user_id, session_id, label), where each tuple is a choice instance.
    (2) a collection of `observables` associated with item, user, session, etc.

    Args:
        item_index (torch.LongTensor): a tensor of shape (batch_size) indicating the relevant item in each row
            of the dataset. The relevant item can be:
            (1) the item bought in this choice instance,
            (2) or the item reviewed by the user. In the latter case, we need the `label` tensor to specify the rating score.
            NOTE: support for the second case is under development; currently, we only support binary labels.

        num_items (Optional[int]): the number of items in the dataset. If `None` is provided (default), the number of items will be inferred from the number of unique numbers in `item_index`.

        num_users (Optional[int]): the number of users in the dataset. If `None` is provided (default), the number of users will be inferred from the number of unique numbers in `user_index`.

        num_sessions (Optional[int]): the number of sessions in the dataset. If `None` is provided (default), the number of sessions will be inferred from the number of unique numbers in `session_index`.

        label (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating the label for prediction in
            each choice instance. When you want to predict the item bought, you can leave the `label` argument
            as `None` in the initialization method, and the model will use `item_index` as the object to be predicted.
            But if you are, for example, predicting the rating a user gave an item, `label` must be provided.
            Defaults to None.

        user_index (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating
            the ID of the user involved in each choice instance. If `None` is provided, all choice instances
            are assumed to come from the same user.
            `user_index` is required if and only if there are multiple users in the dataset, for example:
                (1) user observables are involved in the utility form,
                (2) and/or the coefficients are user-specific.
            This tensor is used to select the corresponding user observables and coefficients assigned to the
            user (like theta_user) while making the prediction for that purchase.
            Defaults to None.

        session_index (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating
            the ID of the session in which each choice instance occurred. This tensor is used to select the correct
            session observables or price observables while making the prediction for that choice instance. Therefore, if
            there are no session/price observables, you can leave this argument as `None`. In this case, the `ChoiceDataset`
            object will assume each choice instance to be in its own session.
            Defaults to None.

        item_availability (Optional[torch.BoolTensor], optional): a boolean tensor of shape (num_sessions, num_items)
            indicating the availability of each item in each session. Utilities of unavailable items are set to -infinity,
            and hence the predicted probabilities of unavailable items are zero while making predictions.
            All items are assumed to be available if set to None.
            Defaults to None.

    Other Kwargs (Observables):
        One can specify the following types of observables, where * in shape denotes any positive
            integer. Typically * represents the number of observables.
        Please refer to the documentation for a detailed guide to use observables.
        1. user observables must start with 'user_' and have shape (num_users, *)
        2. item observables must start with 'item_' and have shape (num_items, *)
        3. session observables must start with 'session_' and have shape (num_sessions, *)
        4. taste observables (those varying by user and item) must start with `taste_` and have shape
            (num_users, num_items, *).
        NOTE: we don't recommend using taste observables, because num_users * num_items is potentially large.
        5. price observables (those varying by session and item) must start with `price_` and have
            shape (num_sessions, num_items, *)
        6. itemsession observables start with `itemsession_`; this is a more intuitive alias for the price
            observable.
    """
    # ENHANCEMENT(Tianyu): add item_names for summary.
    super(ChoiceDataset, self).__init__()
    self.label = label
    self.item_index = item_index
    self._num_items = num_items
    self._num_users = num_users
    self._num_sessions = num_sessions

    self.user_index = user_index
    self.session_index = session_index

    if self.session_index is None:
        # if any([x.startswith('session_') or x.startswith('price_') for x in kwargs.keys()]):
        # if any session sensitive observable is provided, but session index is not,
        # infer each row in the dataset to be a session.
        # TODO: (design choice) should we assign unique session index to each choice instance or the same session index.
        print('No `session_index` is provided, assuming each choice instance is in its own session.')
        self.session_index = torch.arange(len(self.item_index)).long()

    self.item_availability = item_availability

    for key, item in kwargs.items():
        if self._is_attribute(key):
            # all observables should be floats.
            item = item.float()
        setattr(self, key, item)

    # TODO: add a validation procedure to check the consistency of the dataset.

__len__(self) special

Returns number of samples in this dataset.

Returns:

Type Description
int

length of the dataset.

Source code in torch_choice/data/choice_dataset.py
def __len__(self) -> int:
    """Returns number of samples in this dataset.

    Returns:
        int: length of the dataset.
    """
    return len(self.item_index)

__repr__(self) special

A method to get a string representation of the dataset.

Returns:

Type Description
str

the string representation of the dataset.

Source code in torch_choice/data/choice_dataset.py
def __repr__(self) -> str:
    """A method to get a string representation of the dataset.

    Returns:
        str: the string representation of the dataset.
    """
    # don't print shapes of internal attributes like _num_users and _num_items.
    info = [f'{key}={self._size_repr(item)}' for key, item in self.__dict__.items() if not key.startswith('_')]
    return f"{self.__class__.__name__}(num_items={self.num_items}, num_users={self.num_users}, num_sessions={self.num_sessions}, {', '.join(info)}, device={self.device})"

apply_tensor(self, func)

This is a helper method that applies the provided function to all tensor attributes and to tensor values of all dictionary attributes.

Parameters:

Name Type Description Default
func callable

a callable function to be applied on tensors and tensor-values of dictionaries.

required

Returns:

Type Description
ChoiceDataset

the modified dataset.

Source code in torch_choice/data/choice_dataset.py
def apply_tensor(self, func: callable) -> "ChoiceDataset":
    """This s a helper method to apply the provided function to all tensors and tensor values of all dictionaries.

    Args:
        func (callable): a callable function to be applied on tensors and tensor-values of dictionaries.

    Returns:
        ChoiceDataset: the modified dataset.
    """
    for key, item in self.__dict__.items():
        if torch.is_tensor(item):
            setattr(self, key, func(item))
        # broadcast func to dictionaries of tensors as well.
        elif isinstance(getattr(self, key), dict):
            for obj_key, obj_item in getattr(self, key).items():
                if torch.is_tensor(obj_item):
                    # dictionaries require item assignment rather than setattr.
                    getattr(self, key)[obj_key] = func(obj_item)
    return self
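As a sketch of what `apply_tensor` enables: the `to()` method documented next is implemented exactly this way, with `func = lambda x: x.to(device)`. A hypothetical direct call:

```python
# apply a function to every tensor attribute (and to tensor values inside
# dictionary attributes) in place; here, a no-op move to CPU.
dataset = dataset.apply_tensor(lambda x: x.to('cpu'))
```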

clone(self)

Creates a copy of self.

Returns:

Type Description
ChoiceDataset

a copy of self.

Source code in torch_choice/data/choice_dataset.py
def clone(self) -> "ChoiceDataset":
    """Creates a copy of self.

    Returns:
        ChoiceDataset: a copy of self.
    """
    dictionary = {}
    for k, v in self.__dict__.items():
        if torch.is_tensor(v):
            dictionary[k] = v.clone()
        else:
            dictionary[k] = copy.deepcopy(v)
    new = self.__class__._from_dict(dictionary)
    new._num_users = self.num_users
    new._num_items = self.num_items
    new._num_sessions = self.num_sessions
    return new

summary(self)

A method to print a summary of the dataset.

Returns:

Type Description
None

this method prints the summary and returns None.

Source code in torch_choice/data/choice_dataset.py
def summary(self) -> None:
    """A method to summarize the dataset.

    Returns:
        str: the string representation of the dataset.
    """
    summary = ['ChoiceDataset with {} sessions, {} items, {} users, {} purchase records (observations).'.format(
        self.num_sessions, self.num_items, self.num_users if self.user_index is not None else 'single', len(self))]

    # summarize users.
    if self.user_index is not None:
        unique, counts = self.unique(self.user_index)
        summary.append(f"The most frequent user is {unique[0]} with {counts[0]} observations; the least frequent user is {unique[-1]} with {counts[-1]} observations; on average, there are {counts.astype(float).mean():.2f} observations per user.")

        N = len(unique)
        K = min(5, N)
        string = f'{K} most frequent users are: ' + ', '.join([f'{unique[i]}({counts[i]} times)' for i in range(K)]) + '.'
        summary.append(string)
        string = f'{K} least frequent users are: ' + ', '.join([f'{unique[N-i]}({counts[N-i]} times)' for i in range(1, K+1)]) + '.'
        summary.append(string)

    # summarize items.
    unique, counts = self.unique(self.item_index)
    N = len(unique)
    K = min(5, N)
    summary.append(f"The most frequent item is {unique[0]}, it was chosen {counts[0]} times; the least frequent item is {unique[-1]} it was {counts[-1]} times; on average, each item was purchased {counts.astype(float).mean():.2f} times.")

    string = f'{K} most frequent items are: ' + ', '.join([f'{unique[i]}({counts[i]} times)' for i in range(K)]) + '.'
    summary.append(string)
    string = f'{K} least frequent items are: ' + ', '.join([f'{unique[N-i]}({counts[N-i]} times)' for i in range(1, K+1)]) + '.'
    summary.append(string)

    summary.append('Attribute Summaries:')
    for key, item in self.__dict__.items():
        if self._is_attribute(key) and torch.is_tensor(item):
            summary.append("Observable Tensor '{}' with shape {}".format(key, item.shape))
            # price attributes are 3-dimensional tensors, omitted here for brevity.
            if (not self._is_price_attribute(key)) and (not self._is_usersessionitem_attribute(key)) and (not self._is_useritem_attribute(key)) and (not self._is_usersession_attribute(key)):
                summary.append(str(pd.DataFrame(item.to('cpu').float().numpy()).describe()))
    print('\n'.join(summary) + f"\ndevice={self.device}")
    return None

to(self, device)

Moves all tensors in this dataset to the specified PyTorch device.

Parameters:

Name Type Description Default
device Union[str, torch.device]

the destination device.

required

Returns:

Type Description
ChoiceDataset

the modified dataset on the new device.

Source code in torch_choice/data/choice_dataset.py
def to(self, device: Union[str, torch.device]) -> "ChoiceDataset":
    """Moves all tensors in this dataset to the specified PyTorch device.

    Args:
        device (Union[str, torch.device]): the destination device.

    Returns:
        ChoiceDataset: the modified dataset on the new device.
    """
    return self.apply_tensor(lambda x: x.to(device))
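A hypothetical device round-trip using `to()`:

```python
# move the dataset to GPU when one is available; otherwise stay on CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
dataset = dataset.to(device)
print(dataset.device)  # all tensor attributes now live on `device`
```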

A helper class for joining several PyTorch datasets; using JointDataset with a PyTorch data loader allows for sampling the same batch index from several datasets.

The JointDataset class wraps several ChoiceDataset objects; it is particularly useful when we need to make predictions from multiple datasets. For example, suppose you have data on consumer purchase records in a fast food store, and every customer purchases exactly one main food and one drink. In this case, you have two separate datasets: FoodDataset and DrinkDataset. You may want to use a PyTorch sampler to sample them in a dependent manner: you want to take the i-th sample from both datasets, so that you know which (food, drink) combo the i-th customer purchased. You can do this with the JointDataset class, as sketched below.
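A hedged sketch of the food/drink example above, assuming `food_dataset` and `drink_dataset` are two equal-length `ChoiceDataset` objects:

```python
import torch
from torch_choice.data import JointDataset

joint = JointDataset(food=food_dataset, drink=drink_dataset)
# slicing with the same indices pairs rows across datasets: `batch` is a dict
# {'food': ChoiceDataset, 'drink': ChoiceDataset}, each sliced to rows 0..7,
# so batch['food'] and batch['drink'] describe the same 8 customers.
batch = joint[torch.arange(8)]
```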

Source code in torch_choice/data/joint_dataset.py
class JointDataset(torch.utils.data.Dataset):
    """A helper class for joining several pytorch datasets, using JointDataset
    and pytorch data loader allows for sampling the same batch index from several
    datasets.

    The JointDataset class is a wrapper for the torch.utils.data.ChoiceDataset class, it is particularly useful when we
    need to make prediction from multiple datasets. For example, you have data on consumer purchase records in a fast food
    store, and suppose every customer will purchase exactly a single main food and a single drink. In this case, you have
    two separate datasets: FoodDataset and DrinkDataset. You may want to use PyTorch sampler to sample them in a dependent
    manner: you want to take the i-th sample from both datasets, so that you know what (food, drink) combo the i-th customer
    purchased. You can do this by using the JointDataset class.
    """
    def __init__(self, **datasets) -> None:
        """The initialize methods.

        Args:
            Arbitrarily many datasets with arbitrary names as keys. In the example above, you can construct
            ```
            dataset = JointDataset(food=FoodDataset, drink=DrinkDataset)
            ```
            All datasets should have the same length.

        """
        super(JointDataset, self).__init__()
        self.datasets = datasets
        # check that all sub-datasets have the same length.
        assert len(set([len(d) for d in self.datasets.values()])) == 1

    def __len__(self) -> int:
        """Get the number of samples in the joint dataset.

        Returns:
            int: the number of samples in the joint dataset, which is the same as the number of samples in each dataset contained.
        """
        for d in self.datasets.values():
            return len(d)

    def __getitem__(self, indices: Union[int, torch.LongTensor]) -> Dict[str, ChoiceDataset]:
        """Queries samples from the dataset by index.

        Args:
            indices (Union[int, torch.LongTensor]): an integer or a 1D tensor of multiple indices.

        Returns:
            Dict[str, ChoiceDataset]: the subset of the dataset. Keys of the dictionary will be names of each dataset
                contained (the same as the keys of the ``datasets`` argument in the constructor). Values will be subsets
                of contained datasets, sliced using the provided indices.
        """
        return dict((name, d[indices]) for (name, d) in self.datasets.items())

    def __repr__(self) -> str:
        """A method to get a string representation of the dataset.

        Returns:
            str: the string representation of the dataset.
        """
        out = [f'JointDataset with {len(self.datasets)} sub-datasets: (']
        for name, dataset in self.datasets.items():
            out.append(f'\t{name}: {str(dataset)}')
        out.append(')')
        return '\n'.join(out)

    @property
    def device(self) -> str:
        """Returns the device of datasets contained in the joint dataset.

        Returns:
            str: the device of the dataset.
        """
        for d in self.datasets.values():
            return d.device

    def to(self, device: Union[str, torch.device]) -> "JointDataset":
        """Moves all datasets in this dataset to the specified PyTorch device.

        Args:
            device (Union[str, torch.device]): the destination device.

        Returns:
            JointDataset: the modified dataset on the new device.
        """
        for d in self.datasets.values():
            d = d.to(device)
        return self

    def clone(self) -> "JointDataset":
        """Returns a copy of the dataset.

        Returns:
            JointDataset: a copy of the dataset.
        """
        return JointDataset(**{name: d.clone() for (name, d) in self.datasets.items()})

    @property
    def item_index(self) -> torch.LongTensor:
        """Returns the current index of each dataset.

        Returns:
            torch.LongTensor: the indices of items chosen.
        """
        return self.datasets["item"].item_index

device: str property readonly

Returns the device of datasets contained in the joint dataset.

Returns:

Type Description
str

the device of the dataset.

item_index: LongTensor property readonly

Returns the item_index tensor of the sub-dataset named 'item'.

Returns:

Type Description
torch.LongTensor

the indices of items chosen.

__getitem__(self, indices) special

Queries samples from the dataset by index.

Parameters:

Name Type Description Default
indices Union[int, torch.LongTensor]

an integer or a 1D tensor of multiple indices.

required

Returns:

Type Description
Dict[str, ChoiceDataset]

the subset of the dataset. Keys of the dictionary will be names of each dataset contained (the same as the keys of the datasets argument in the constructor). Values will be subsets of contained datasets, sliced using the provided indices.

Source code in torch_choice/data/joint_dataset.py
def __getitem__(self, indices: Union[int, torch.LongTensor]) -> Dict[str, ChoiceDataset]:
    """Queries samples from the dataset by index.

    Args:
        indices (Union[int, torch.LongTensor]): an integer or a 1D tensor of multiple indices.

    Returns:
        Dict[str, ChoiceDataset]: the subset of the dataset. Keys of the dictionary will be names of each dataset
            contained (the same as the keys of the ``datasets`` argument in the constructor). Values will be subsets
            of contained datasets, sliced using the provided indices.
    """
    return dict((name, d[indices]) for (name, d) in self.datasets.items())
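
As a small usage sketch (hedged: `joint_dataset` is an illustrative JointDataset whose sub-datasets were registered under the names 'item' and 'nest'; it is not defined on this page):

import torch

subset = joint_dataset[torch.LongTensor([0, 1, 2])]
# The result is a plain dict, not a JointDataset:
print(subset["item"])  # the first three choice instances of the item-level dataset.
print(subset["nest"])  # the matching slice of the nest-level dataset.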

__init__(self, **datasets) special

The initialization method.

Source code in torch_choice/data/joint_dataset.py
def __init__(self, **datasets) -> None:
    """The initialize methods.

    Args:
        Arbitrarily many datasets with arbitrary names as keys. In the example above, you can construct
        ```
        dataset = JointDataset(food=FoodDataset, drink=DrinkDataset)
        ```
        All datasets should have the same length.

    """
    super(JointDataset, self).__init__()
    self.datasets = datasets
    # check the length of sub-datasets are the same.
    assert len(set([len(d) for d in self.datasets.values()])) == 1
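
A minimal construction sketch (hedged: `item_level_dataset` and `nest_level_dataset` stand for two equal-length ChoiceDataset objects built elsewhere):

# Register each sub-dataset under a name of your choice; the assert above
# enforces that all sub-datasets have the same length.
joint_dataset = JointDataset(item=item_level_dataset, nest=nest_level_dataset)
print(len(joint_dataset))  # the shared number of choice instances.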

__len__(self) special

Get the number of samples in the joint dataset.

Returns:

    int: the number of samples in the joint dataset, which is the same as the number of samples in each dataset contained.

Source code in torch_choice/data/joint_dataset.py
def __len__(self) -> int:
    """Get the number of samples in the joint dataset.

    Returns:
        int: the number of samples in the joint dataset, which is the same as the number of samples in each dataset contained.
    """
    for d in self.datasets.values():
        return len(d)

__repr__(self) special

A method to get a string representation of the dataset.

Returns:

    str: the string representation of the dataset.

Source code in torch_choice/data/joint_dataset.py
def __repr__(self) -> str:
    """A method to get a string representation of the dataset.

    Returns:
        str: the string representation of the dataset.
    """
    out = [f'JointDataset with {len(self.datasets)} sub-datasets: (']
    for name, dataset in self.datasets.items():
        out.append(f'\t{name}: {str(dataset)}')
    out.append(')')
    return '\n'.join(out)

clone(self)

Returns a copy of the dataset.

Returns:

    JointDataset: a copy of the dataset.

Source code in torch_choice/data/joint_dataset.py
def clone(self) -> "JointDataset":
    """Returns a copy of the dataset.

    Returns:
        JointDataset: a copy of the dataset.
    """
    return JointDataset(**{name: d.clone() for (name, d) in self.datasets.items()})

to(self, device)

Moves all datasets in this dataset to the specified PyTorch device.

Parameters:

    device (Union[str, torch.device], required): the destination device.

Returns:

    JointDataset: the modified dataset on the new device.

Source code in torch_choice/data/joint_dataset.py
def to(self, device: Union[str, torch.device]) -> "JointDataset":
    """Moves all datasets in this dataset to the specified PyTorch device.

    Args:
        device (Union[str, torch.device]): the destination device.

    Returns:
        JointDataset: the modified dataset on the new device.
    """
    for name, d in self.datasets.items():
        # reassign so this works whether `to()` mutates in place or returns a new dataset.
        self.datasets[name] = d.to(device)
    return self
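
A short usage sketch (hedged: `joint_dataset` is an illustrative instance):

import torch

# Move every contained ChoiceDataset to the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
joint_dataset = joint_dataset.to(device)
print(joint_dataset.device)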

The more generalized version of the conditional logit model: it allows for researcher-specified variable types (groups) and different levels of variation for coefficients.

The model allows the following levels of variable variation (a construction sketch follows the list):

Note: unless the -full flag is specified (which means we want to explicitly model coefficients for all items), for all variation levels related to item (item-specific and user-item-specific), the model forces coefficients for the first item to be zero. This design follows standard econometric practice.

  • constant: constant over all users and items,

  • user: user-specific parameters but constant across all items,

  • item: item-specific parameters but constant across all users; parameters for the first item are forced to be zero.

  • item-full: item-specific parameters but constant across all users, explicitly modeled for all items.

  • user-item: parameters that are specific to both user and item; parameters for the first item for all users are forced to be zero.

  • user-item-full: parameters that are specific to both user and item, explicitly modeled for all items.
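
As a hedged sketch of these variation levels (the observable names `price_cost` and `user_income` are illustrative, not fixed library names):

model = ConditionalLogitModel(
    coef_variation_dict={"price_cost": "constant",   # one coefficient shared by all users and items.
                         "user_income": "item"},     # one coefficient per item; the first item is pinned to zero.
    num_param_dict={"price_cost": 1, "user_income": 1},
    num_items=4,
    num_users=100)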
Source code in torch_choice/model/conditional_logit_model.py
class ConditionalLogitModel(nn.Module):
    """The more generalized version of conditional logit model, the model allows for research specific
    variable types(groups) and different levels of variations for coefficient.

    The model allows for the following levels for variable variations:
    NOTE: unless the `-full` flag is specified (which means we want to explicitly model coefficients
        for all items), for all variation levels related to item (item specific and user-item specific),
        the model force coefficients for the first item to be zero. This design follows standard
        econometric practice.

    - constant: constant over all users and items,

    - user: user-specific parameters but constant across all items,

    - item: item-specific parameters but constant across all users, parameters for the first item are
        forced to be zero.
    - item-full: item-specific parameters but constant across all users, explicitly modeled for all items.

    - user-item: parameters that are specific to both user and item, parameters for the first item
        for all users are forced to be zero.
    - user-item-full: parameters that are specific to both user and item, explicitly modeled for all items.
    """

    def __init__(self,
                 formula: Optional[str]=None,
                 dataset: Optional[ChoiceDataset]=None,
                 coef_variation_dict: Optional[Dict[str, str]]=None,
                 num_param_dict: Optional[Dict[str, int]]=None,
                 num_items: Optional[int]=None,
                 num_users: Optional[int]=None,
                 regularization: Optional[str]=None,
                 regularization_weight: Optional[float]=None,
                 weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
                 model_outside_option: Optional[bool]=False
                 ) -> None:
        """
        Args:
            formula (str): a string representing the utility formula.
                The formula consists of '(variable_name|variation)'s separated by '+', for example:
                "(var1|item) + (var2|user) + (var3|constant)"
                where the first part of each term is the name of the variable
                and the second part is the variation of the coefficient.
                The variation can be one of the following:
                'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
                All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.
            dataset (ChoiceDataset): a ChoiceDataset object for training the model; the parser will infer dimensions of variables
                and sizes of coefficients from the ChoiceDataset.
            coef_variation_dict (Dict[str, str]): variable type to variation level dictionary. Keys of this dictionary
                should be variable names in the dataset (i.e., those starting with `itemsession_`, `price_`, `user_`, etc.), or `intercept`
                if the researcher requires an intercept term.
                For each variable name X_var (e.g., `user_income`) or `intercept`, the corresponding dictionary value should
                be one of the following; this value specifies the "level of variation" of the coefficient.

                - `constant`: the coefficient constant over all users and items: $X \beta$.

                - `user`: user-specific parameters but constant across all items: $X \beta_{u}$.

                - `item`: item-specific parameters but constant across all users, $X \beta_{i}$.
                    Note that the coefficients for the first item are forced to be zero following the standard practice
                    in econometrics.

                - `item-full`: the same configuration as `item`, but does not force the coefficients of the first item to
                    be zeros.

                The following configurations are supported by the package, but we don't recommend using them due to the
                    large number of parameters.
                - `user-item`: parameters that are specific to both user and item; parameters for the first item
                    for all users are forced to be zero.

                - `user-item-full`: parameters that are specific to both user and item, explicitly modeled for all items.
            num_param_dict (Optional[Dict[str, int]]): variable type to number of parameters dictionary with keys exactly the same
                as the `coef_variation_dict`. Values of `num_param_dict` record the number of features in each kind of variable.
                If None is supplied, num_param_dict will be a dictionary with the same keys as the `coef_variation_dict` dictionary
                and values of all ones. Defaults to None.
            num_items (int): number of items in the dataset.
            num_users (int): number of users in the dataset.
            regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
                regularization added to the log-likelihood.
                - 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
                - 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
                - None does not modify the log-likelihood.
                Defaults to None.
            regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
                This term controls the strength of regularization. This argument is required if and only if regularization
                is not None.
                Defaults to None.
            weight_initialization (Optional[Union[str, Dict[str, str]]]): controls for how coefficients are initialized;
                users can pass a string from {'normal', 'uniform', 'zero'} to initialize all coefficients in the same way.
                Alternatively, users can pass a dictionary with keys exactly the same as the `coef_variation_dict` dictionary,
                and values from {'normal', 'uniform', 'zero'} to initialize coefficients of different types of variables differently.
                By default, all coefficients are initialized following a standard normal distribution.
            model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
                To enable modeling outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
                In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
                Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
                The utility of the outside option is always set to 0 while computing the probability.
                By default, model_outside_option is set to False and the model does not model the outside option.
        """
        # ==============================================================================================================
        # Check that the model received a valid combination of inputs so that it can be initialized.
        # ==============================================================================================================
        if coef_variation_dict is None and formula is None:
            raise ValueError("Either coef_variation_dict or formula should be provided to specify the model.")

        if (coef_variation_dict is not None) and (formula is not None):
            raise ValueError("Only one of coef_variation_dict or formula should be provided to specify the model.")

        if (formula is not None) and (dataset is None):
            raise ValueError("If formula is provided, data should be provided to specify the model.")


        # ==============================================================================================================
        # Build necessary dictionaries for model initialization.
        # ==============================================================================================================
        if formula is None:
            # Use dictionaries to initialize the model.
            if num_param_dict is None:
                warnings.warn("`num_param_dict` is not provided, all variables will be treated as having one parameter.")
                num_param_dict = {key:1 for key in coef_variation_dict.keys()}

            assert coef_variation_dict.keys() == num_param_dict.keys()

            # variable `var` with variation `spec` to variable `var[spec]`.
            rename = dict()  # old variable name --> new variable name.
            for variable, specificity in coef_variation_dict.items():
                rename[variable] = f"{variable}[{specificity}]"

            for old_name, new_name in rename.items():
                coef_variation_dict[new_name] = coef_variation_dict.pop(old_name)
                num_param_dict[new_name] = num_param_dict.pop(old_name)
        else:
            # Use the formula to infer model.
            coef_variation_dict, num_param_dict = parse_formula(formula, dataset)

        # ==============================================================================================================
        # Model Initialization.
        # ==============================================================================================================
        super(ConditionalLogitModel, self).__init__()

        self.coef_variation_dict = deepcopy(coef_variation_dict)
        self.num_param_dict = deepcopy(num_param_dict)

        self.num_items = num_items
        self.num_users = num_users

        self.regularization = deepcopy(regularization)
        assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
        self.regularization_weight = regularization_weight
        if (self.regularization is not None) and (self.regularization_weight is None):
            raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
        if (self.regularization is None) and (self.regularization_weight is not None):
            raise ValueError(f'You specified no regularization but provided regularization_weight={self.regularization_weight}; leave regularization_weight as None if you do not want to regularize the model.')

        # check number of parameters specified are all positive.
        for var_type, num_params in self.num_param_dict.items():
            assert num_params > 0, f'num_params needs to be positive, got: {num_params}.'

        # infer the number of parameters for intercept if the researcher forgets.
        for variable in self.coef_variation_dict.keys():
            if self.is_intercept_term(variable) and variable not in self.num_param_dict.keys():
                warnings.warn(f"`{variable}` key found in coef_variation_dict but not in num_param_dict, num_param_dict['{variable}'] has been set to 1.")
                self.num_param_dict[variable] = 1

        # inform coefficients their ways of being initialized.
        self.weight_initialization = deepcopy(weight_initialization)

        # construct trainable parameters.
        coef_dict = dict()
        for var_type, variation in self.coef_variation_dict.items():
            if isinstance(self.weight_initialization, dict):
                if var_type.split('[')[0] in self.weight_initialization.keys():
                    # use the variable-specific initialization if provided.
                    init = self.weight_initialization[var_type.split('[')[0]]
                else:
                    # use default initialization.
                    init = None
            else:
                # initialize all coefficients in the same way.
                init = self.weight_initialization

            coef_dict[var_type] = Coefficient(variation=variation,
                                              num_items=self.num_items,
                                              num_users=self.num_users,
                                              num_params=self.num_param_dict[var_type],
                                              init=init)
        # A ModuleDict is required to properly register all trainable parameters.
        # self.parameter() will fail if a python dictionary is used instead.
        self.coef_dict = nn.ModuleDict(coef_dict)
        self.model_outside_option = model_outside_option

    def __repr__(self) -> str:
        """Return a string representation of the model.

        Returns:
            str: the string representation of the model.
        """
        out_str_lst = ['Conditional logistic discrete choice model, expects input features:\n']
        for var_type, num_params in self.num_param_dict.items():
            out_str_lst.append(f'X[{var_type}] with {num_params} parameters, with {self.coef_variation_dict[var_type]} level variation.')
        return super().__repr__() + '\n' + '\n'.join(out_str_lst) + '\n' + f'device={self.device}'

    @property
    def num_params(self) -> int:
        """Get the total number of parameters. For example, if there is only an user-specific coefficient to be multiplied
        with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no
        intercept is involved.

        Returns:
            int: the total number of learnable parameters.
        """
        return sum(w.numel() for w in self.parameters())

    def summary(self):
        """Print out the current model parameter."""
        for var_type, coefficient in self.coef_dict.items():
            if coefficient is not None:
                print('Variable Type: ', var_type)
                print(coefficient.coef)

    def forward(self,
                batch: ChoiceDataset,
                manual_coef_value_dict: Optional[Dict[str, torch.Tensor]] = None
                ) -> torch.Tensor:
        """
        Forward pass of the model.

        Args:
            batch: a `ChoiceDataset` object.

            manual_coef_value_dict (Optional[Dict[str, torch.Tensor]], optional): a dictionary with
                keys in {'u', 'i'} etc. and tensors as values. If provided, the model will force
                coefficients to be the provided values and compute utility conditioned on the provided
                coefficient values. This feature is useful when the researcher wishes to plug in particular
                values of coefficients and examine the utility values. If not provided, the model will
                use the learned coefficient values in self.coef_dict.
                Defaults to None.

        Returns:
            torch.Tensor: a tensor of shape (num_trips, num_items) whose (t, i) entry represents
                the utility from item i in trip t for the user involved in that trip.
        """
        x_dict = batch.x_dict

        for variable in self.coef_variation_dict.keys():
            if self.is_intercept_term(variable):
                # intercept term has no input tensor from the ChoiceDataset data structure.
                # the tensor for intercept has only 1 feature, every entry is 1.
                x_dict['intercept'] = torch.ones((len(batch), self.num_items, 1), device=batch.device)
                break

        # compute the utility from each item in each choice session.
        total_utility = torch.zeros((len(batch), self.num_items), device=batch.device)
        # for each type of variables, apply the corresponding coefficient to input x.

        for var_type, coef in self.coef_dict.items():
            # variable type is named as "observable_name[variation]", retrieve the corresponding observable name.
            corresponding_observable = var_type.split("[")[0]
            total_utility += coef(
                x_dict[corresponding_observable],
                batch.user_index,
                manual_coef_value=None if manual_coef_value_dict is None else manual_coef_value_dict[var_type])

        assert total_utility.shape == (len(batch), self.num_items)

        if batch.item_availability is not None:
            # mask out unavailable items.
            total_utility[~batch.item_availability[batch.session_index, :]] = torch.finfo(total_utility.dtype).min / 2

        # accommodate the outside option.
        if self.model_outside_option:
            # the outside option has zero utility.
            util_zero = torch.zeros(total_utility.size(0), 1, device=batch.device)  # (len(batch), 1)  zero tensor.
            # outside option is indicated by item_index == -1, we put it at the end.
            total_utility = torch.cat((total_utility, util_zero), dim=1)  # (len(batch), num_items+1)
        return total_utility


    def negative_log_likelihood(self, batch: ChoiceDataset, y: torch.Tensor, is_train: bool=True) -> torch.Tensor:
        """Computes the log-likelihood for the batch and label.
        TODO: consider remove y, change to label.
        TODO: consider move this method outside the model, the role of the model is to compute the utility.

        Args:
            batch (ChoiceDataset): a ChoiceDataset object containing the data.
            y (torch.Tensor): the label.
            is_train (bool, optional): whether to trace the gradient. Defaults to True.

        Returns:
            torch.Tensor: the negative log-likelihood.
        """
        if is_train:
            self.train()
        else:
            self.eval()
        # (num_trips, num_items)
        total_utility = self.forward(batch)
        # check shapes.
        if self.model_outside_option:
            assert total_utility.shape == (len(batch), self.num_items+1)
            assert torch.all(total_utility[:, -1] == 0), "The last column of total_utility should be all zeros, which corresponds to the outside option."
        else:
            assert total_utility.shape == (len(batch), self.num_items)
        logP = torch.log_softmax(total_utility, dim=1)
        # since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
        # indexing should correctly retrieve the log-likelihood even for outside options.
        nll = - logP[torch.arange(len(y)), y].sum()
        return nll

    def loss(self, *args, **kwargs):
        """The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
        nll = self.negative_log_likelihood(*args, **kwargs)
        if self.regularization is not None:
            L = {'L1': 1, 'L2': 2}[self.regularization]
            for param in self.parameters():
                nll += self.regularization_weight * torch.norm(param, p=L)
        return nll

    @property
    def device(self) -> torch.device:
        """Returns the device of the coefficient.

        Returns:
            torch.device: the device of the model.
        """
        return next(iter(self.coef_dict.values())).device

    @staticmethod
    def is_intercept_term(variable: str):
        # check if the given variable is an intercept (fixed effect) term.
        # intercept (fixed effect) terms are defined as 'intercept[*]' and look like 'intercept[user]', 'intercept[item]', etc.
        return (variable.startswith('intercept[') and variable.endswith(']'))

    def get_coefficient(self, variable: str) -> torch.Tensor:
        """Retrieve the coefficient tensor for the given variable.

        Args:
            variable (str): the variable name.

        Returns:
            torch.Tensor: the corresponding coefficient tensor of the requested variable.
        """
        return self.state_dict()[f"coef_dict.{variable}.coef"].detach().clone()

device: device property readonly

Returns the device of the coefficient.

Returns:

    torch.device: the device of the model.

num_params: int property readonly

Get the total number of parameters. For example, if there is only a user-specific coefficient to be multiplied with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no intercept is involved.

Returns:

    int: the total number of learnable parameters.
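
As a hedged numerical check (the observable name is illustrative):

# A 5-dimensional observable with 'user'-level variation over 100 users
# contributes 5 x 100 = 500 learnable parameters (no intercept involved).
model = ConditionalLogitModel(
    coef_variation_dict={"user_income": "user"},
    num_param_dict={"user_income": 5},
    num_items=4,
    num_users=100)
assert model.num_params == 500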

__init__(self, formula=None, dataset=None, coef_variation_dict=None, num_param_dict=None, num_items=None, num_users=None, regularization=None, regularization_weight=None, weight_initialization=None, model_outside_option=False) special

Parameters:

    formula (str, default None): a string representing the utility formula. The formula consists of '(variable_name|variation)'s separated by '+', for example: "(var1|item) + (var2|user) + (var3|constant)", where the first part of each term is the name of the variable and the second part is the variation of the coefficient. The variation can be one of the following: 'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'. All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.

    dataset (ChoiceDataset, default None): a ChoiceDataset object for training the model; the parser will infer dimensions of variables and sizes of coefficients from the ChoiceDataset.

    coef_variation_dict (Dict[str, str], default None): variable type to variation level dictionary. Keys of this dictionary should be variable names in the dataset (i.e., those starting with itemsession_, price_, user_, etc.), or intercept if the researcher requires an intercept term. For each variable name X_var (e.g., user_income) or intercept, the corresponding dictionary value should be one of the following; this value specifies the "level of variation" of the coefficient.

      • constant: the coefficient is constant over all users and items: $X \beta$.

      • user: user-specific parameters but constant across all items: $X \beta_{u}$.

      • item: item-specific parameters but constant across all users: $X \beta_{i}$. Note that the coefficients for the first item are forced to be zero following the standard practice in econometrics.

      • item-full: the same configuration as item, but does not force the coefficients of the first item to be zeros.

    The following configurations are supported by the package, but we don't recommend using them due to the large number of parameters.

      • user-item: parameters that are specific to both user and item; parameters for the first item for all users are forced to be zero.

      • user-item-full: parameters that are specific to both user and item, explicitly modeled for all items.

    num_param_dict (Optional[Dict[str, int]], default None): variable type to number of parameters dictionary with keys exactly the same as the coef_variation_dict. Values of num_param_dict record the number of features in each kind of variable. If None is supplied, num_param_dict will be a dictionary with the same keys as the coef_variation_dict dictionary and values of all ones.

    num_items (int, default None): number of items in the dataset.

    num_users (int, default None): number of users in the dataset.

    regularization (Optional[str], default None): takes values from {'L1', 'L2', None} and specifies the type of regularization added to the log-likelihood. 'L1' subtracts regularization_weight * 1-norm of parameters from the log-likelihood; 'L2' subtracts regularization_weight * 2-norm of parameters from the log-likelihood; None does not modify the log-likelihood.

    regularization_weight (Optional[float], default None): the weight of the parameter norm subtracted from the log-likelihood. This term controls the strength of regularization. This argument is required if and only if regularization is not None.

    weight_initialization (Optional[Union[str, Dict[str, str]]], default None): controls how coefficients are initialized; users can pass a string from {'normal', 'uniform', 'zero'} to initialize all coefficients in the same way. Alternatively, users can pass a dictionary with keys exactly the same as the coef_variation_dict dictionary, and values from {'normal', 'uniform', 'zero'} to initialize coefficients of different types of variables differently. By default, all coefficients are initialized following a standard normal distribution.

    model_outside_option (Optional[bool], default False): whether to explicitly model the outside option (i.e., the consumer did not buy anything). To enable modeling the outside option, the outside option is indicated by item_index[n] == -1 in the item-index-tensor. In this case, the item-index-tensor can contain values in {-1, 0, 1, ..., num_items-1}. Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in {0, 1, ..., num_items-1}. The utility of the outside option is always set to 0 while computing the probability.
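
A hedged sketch of the formula-based interface described above (the observable names are assumptions; `dataset` stands for a ChoiceDataset containing them):

model = ConditionalLogitModel(
    formula="(price_cost|constant) + (user_income|item)",
    dataset=dataset,   # the parser infers variable dimensions and coefficient sizes from this ChoiceDataset.
    num_items=4,
    num_users=100)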
Source code in torch_choice/model/conditional_logit_model.py
def __init__(self,
             formula: Optional[str]=None,
             dataset: Optional[ChoiceDataset]=None,
             coef_variation_dict: Optional[Dict[str, str]]=None,
             num_param_dict: Optional[Dict[str, int]]=None,
             num_items: Optional[int]=None,
             num_users: Optional[int]=None,
             regularization: Optional[str]=None,
             regularization_weight: Optional[float]=None,
             weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
             model_outside_option: Optional[bool]=False
             ) -> None:
    """
    Args:
        formula (str): a string representing the utility formula.
            The formula consists of '(variable_name|variation)'s separated by '+', for example:
            "(var1|item) + (var2|user) + (var3|constant)"
            where the first part of each term is the name of the variable
            and the second part is the variation of the coefficient.
            The variation can be one of the following:
            'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
            All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.
        dataset (ChoiceDataset): a ChoiceDataset object for training the model; the parser will infer dimensions of variables
            and sizes of coefficients from the ChoiceDataset.
        coef_variation_dict (Dict[str, str]): variable type to variation level dictionary. Keys of this dictionary
            should be variable names in the dataset (i.e., those starting with `itemsession_`, `price_`, `user_`, etc.), or `intercept`
            if the researcher requires an intercept term.
            For each variable name X_var (e.g., `user_income`) or `intercept`, the corresponding dictionary value should
            be one of the following; this value specifies the "level of variation" of the coefficient.

            - `constant`: the coefficient constant over all users and items: $X \beta$.

            - `user`: user-specific parameters but constant across all items: $X \beta_{u}$.

            - `item`: item-specific parameters but constant across all users, $X \beta_{i}$.
                Note that the coefficients for the first item are forced to be zero following the standard practice
                in econometrics.

            - `item-full`: the same configuration as `item`, but does not force the coefficients of the first item to
                be zeros.

            The following configurations are supported by the package, but we don't recommend using them due to the
                large number of parameters.
            - `user-item`: parameters that are specific to both user and item; parameters for the first item
                for all users are forced to be zero.

            - `user-item-full`: parameters that are specific to both user and item, explicitly modeled for all items.
        num_param_dict (Optional[Dict[str, int]]): variable type to number of parameters dictionary with keys exactly the same
            as the `coef_variation_dict`. Values of `num_param_dict` record the number of features in each kind of variable.
            If None is supplied, num_param_dict will be a dictionary with the same keys as the `coef_variation_dict` dictionary
            and values of all ones. Defaults to None.
        num_items (int): number of items in the dataset.
        num_users (int): number of users in the dataset.
        regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
            regularization added to the log-likelihood.
            - 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
            - 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
            - None does not modify the log-likelihood.
            Defaults to None.
        regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
            This term controls the strength of regularization. This argument is required if and only if regularization
            is not None.
            Defaults to None.
        weight_initialization (Optional[Union[str, Dict[str, str]]]): controls for how coefficients are initialized;
            users can pass a string from {'normal', 'uniform', 'zero'} to initialize all coefficients in the same way.
            Alternatively, users can pass a dictionary with keys exactly the same as the `coef_variation_dict` dictionary,
            and values from {'normal', 'uniform', 'zero'} to initialize coefficients of different types of variables differently.
            By default, all coefficients are initialized following a standard normal distribution.
        model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
            To enable modeling outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
            In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
            Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
            The utility of the outside option is always set to 0 while computing the probability.
            By default, model_outside_option is set to False and the model does not model the outside option.
    """
    # ==============================================================================================================
    # Check that the model received a valid combination of inputs so that it can be initialized.
    # ==============================================================================================================
    if coef_variation_dict is None and formula is None:
        raise ValueError("Either coef_variation_dict or formula should be provided to specify the model.")

    if (coef_variation_dict is not None) and (formula is not None):
        raise ValueError("Only one of coef_variation_dict or formula should be provided to specify the model.")

    if (formula is not None) and (dataset is None):
        raise ValueError("If formula is provided, data should be provided to specify the model.")


    # ==============================================================================================================
    # Build necessary dictionaries for model initialization.
    # ==============================================================================================================
    if formula is None:
        # Use dictionaries to initialize the model.
        if num_param_dict is None:
            warnings.warn("`num_param_dict` is not provided, all variables will be treated as having one parameter.")
            num_param_dict = {key:1 for key in coef_variation_dict.keys()}

        assert coef_variation_dict.keys() == num_param_dict.keys()

        # variable `var` with variation `spec` to variable `var[spec]`.
        rename = dict()  # old variable name --> new variable name.
        for variable, specificity in coef_variation_dict.items():
            rename[variable] = f"{variable}[{specificity}]"

        for old_name, new_name in rename.items():
            coef_variation_dict[new_name] = coef_variation_dict.pop(old_name)
            num_param_dict[new_name] = num_param_dict.pop(old_name)
    else:
        # Use the formula to infer model.
        coef_variation_dict, num_param_dict = parse_formula(formula, dataset)

    # ==============================================================================================================
    # Model Initialization.
    # ==============================================================================================================
    super(ConditionalLogitModel, self).__init__()

    self.coef_variation_dict = deepcopy(coef_variation_dict)
    self.num_param_dict = deepcopy(num_param_dict)

    self.num_items = num_items
    self.num_users = num_users

    self.regularization = deepcopy(regularization)
    assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
    self.regularization_weight = regularization_weight
    if (self.regularization is not None) and (self.regularization_weight is None):
        raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
    if (self.regularization is None) and (self.regularization_weight is not None):
        raise ValueError(f'You specified no regularization but provided regularization_weight={self.regularization_weight}; leave regularization_weight as None if you do not want to regularize the model.')

    # check number of parameters specified are all positive.
    for var_type, num_params in self.num_param_dict.items():
        assert num_params > 0, f'num_params needs to be positive, got: {num_params}.'

    # infer the number of parameters for intercept if the researcher forgets.
    for variable in self.coef_variation_dict.keys():
        if self.is_intercept_term(variable) and variable not in self.num_param_dict.keys():
            warnings.warn(f"`{variable}` key found in coef_variation_dict but not in num_param_dict, num_param_dict['{variable}'] has been set to 1.")
            self.num_param_dict[variable] = 1

    # inform coefficients their ways of being initialized.
    self.weight_initialization = deepcopy(weight_initialization)

    # construct trainable parameters.
    coef_dict = dict()
    for var_type, variation in self.coef_variation_dict.items():
        if isinstance(self.weight_initialization, dict):
            if var_type.split('[')[0] in self.weight_initialization.keys():
                # use the variable-specific initialization if provided.
                init = self.weight_initialization[var_type.split('[')[0]]
            else:
                # use default initialization.
                init = None
        else:
            # initialize all coefficients in the same way.
            init = self.weight_initialization

        coef_dict[var_type] = Coefficient(variation=variation,
                                          num_items=self.num_items,
                                          num_users=self.num_users,
                                          num_params=self.num_param_dict[var_type],
                                          init=init)
    # A ModuleDict is required to properly register all trainable parameters.
    # self.parameter() will fail if a python dictionary is used instead.
    self.coef_dict = nn.ModuleDict(coef_dict)
    self.model_outside_option = model_outside_option

__repr__(self) special

Return a string representation of the model.

Returns:

    str: the string representation of the model.

Source code in torch_choice/model/conditional_logit_model.py
def __repr__(self) -> str:
    """Return a string representation of the model.

    Returns:
        str: the string representation of the model.
    """
    out_str_lst = ['Conditional logistic discrete choice model, expects input features:\n']
    for var_type, num_params in self.num_param_dict.items():
        out_str_lst.append(f'X[{var_type}] with {num_params} parameters, with {self.coef_variation_dict[var_type]} level variation.')
    return super().__repr__() + '\n' + '\n'.join(out_str_lst) + '\n' + f'device={self.device}'

forward(self, batch, manual_coef_value_dict=None)

Forward pass of the model.

Parameters:

    batch (ChoiceDataset, required): a ChoiceDataset object.

    manual_coef_value_dict (Optional[Dict[str, torch.Tensor]], default None): a dictionary with keys in {'u', 'i'} etc. and tensors as values. If provided, the model will force coefficients to be the provided values and compute utility conditioned on the provided coefficient values. This feature is useful when the researcher wishes to plug in particular values of coefficients and examine the utility values. If not provided, the model will use the learned coefficient values in self.coef_dict.

Returns:

    torch.Tensor: a tensor of shape (num_trips, num_items) whose (t, i) entry represents the utility from item i in trip t for the user involved in that trip.
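
A hedged usage sketch (assuming a constructed `model` and a ChoiceDataset `batch`):

import torch

utilities = model(batch)                         # shape (num_trips, num_items).
probabilities = torch.softmax(utilities, dim=1)  # per-trip choice probabilities.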

Source code in torch_choice/model/conditional_logit_model.py
def forward(self,
            batch: ChoiceDataset,
            manual_coef_value_dict: Optional[Dict[str, torch.Tensor]] = None
            ) -> torch.Tensor:
    """
    Forward pass of the model.

    Args:
        batch: a `ChoiceDataset` object.

        manual_coef_value_dict (Optional[Dict[str, torch.Tensor]], optional): a dictionary with
            keys in {'u', 'i'} etc. and tensors as values. If provided, the model will force
            coefficients to be the provided values and compute utility conditioned on the provided
            coefficient values. This feature is useful when the researcher wishes to plug in particular
            values of coefficients and examine the utility values. If not provided, the model will
            use the learned coefficient values in self.coef_dict.
            Defaults to None.

    Returns:
        torch.Tensor: a tensor of shape (num_trips, num_items) whose (t, i) entry represents
            the utility from item i in trip t for the user involved in that trip.
    """
    x_dict = batch.x_dict

    for variable in self.coef_variation_dict.keys():
        if self.is_intercept_term(variable):
            # intercept term has no input tensor from the ChoiceDataset data structure.
            # the tensor for intercept has only 1 feature, every entry is 1.
            x_dict['intercept'] = torch.ones((len(batch), self.num_items, 1), device=batch.device)
            break

    # compute the utility from each item in each choice session.
    total_utility = torch.zeros((len(batch), self.num_items), device=batch.device)
    # for each type of variables, apply the corresponding coefficient to input x.

    for var_type, coef in self.coef_dict.items():
        # variable type is named as "observable_name[variation]", retrieve the corresponding observable name.
        corresponding_observable = var_type.split("[")[0]
        total_utility += coef(
            x_dict[corresponding_observable],
            batch.user_index,
            manual_coef_value=None if manual_coef_value_dict is None else manual_coef_value_dict[var_type])

    assert total_utility.shape == (len(batch), self.num_items)

    if batch.item_availability is not None:
        # mask out unavailable items.
        total_utility[~batch.item_availability[batch.session_index, :]] = torch.finfo(total_utility.dtype).min / 2

    # accommodate the outside option.
    if self.model_outside_option:
        # the outside option has zero utility.
        util_zero = torch.zeros(total_utility.size(0), 1, device=batch.device)  # (len(batch), 1)  zero tensor.
        # outside option is indicated by item_index == -1, we put it at the end.
        total_utility = torch.cat((total_utility, util_zero), dim=1)  # (len(batch), num_items+1)
    return total_utility

get_coefficient(self, variable)

Retrieve the coefficient tensor for the given variable.

Parameters:

    variable (str, required): the variable name.

Returns:

    torch.Tensor: the corresponding coefficient tensor of the requested variable.
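
A hedged retrieval sketch; coefficient names follow the internal 'observable[variation]' convention noted in forward():

# For the illustrative model above, the item-level coefficient of `user_income`
# is stored under the key 'user_income[item]'.
beta = model.get_coefficient("user_income[item]")
print(beta.shape)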

Source code in torch_choice/model/conditional_logit_model.py
def get_coefficient(self, variable: str) -> torch.Tensor:
    """Retrieve the coefficient tensor for the given variable.

    Args:
        variable (str): the variable name.

    Returns:
        torch.Tensor: the corresponding coefficient tensor of the requested variable.
    """
    return self.state_dict()[f"coef_dict.{variable}.coef"].detach().clone()

loss(self, *args, **kwargs)

The loss function to be optimized. This is a wrapper of negative_log_likelihood + regularization loss if required.
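
In symbols, the objective restates the code below: with $p = 1$ for 'L1' and $p = 2$ for 'L2', the loss is $-\log L(\theta) + \lambda \sum_{w \in \theta} \lVert w \rVert_p$, where $\lambda$ is the regularization_weight; with regularization=None, the loss is the plain negative log-likelihood.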

Source code in torch_choice/model/conditional_logit_model.py
def loss(self, *args, **kwargs):
    """The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
    nll = self.negative_log_likelihood(*args, **kwargs)
    if self.regularization is not None:
        L = {'L1': 1, 'L2': 2}[self.regularization]
        for param in self.parameters():
            nll += self.regularization_weight * torch.norm(param, p=L)
    return nll

negative_log_likelihood(self, batch, y, is_train=True)

Computes the negative log-likelihood for the batch and label. TODO: consider remove y, change to label. TODO: consider move this method outside the model, the role of the model is to compute the utility.

Parameters:

    batch (ChoiceDataset, required): a ChoiceDataset object containing the data.

    y (torch.Tensor, required): the label.

    is_train (bool, default True): whether to trace the gradient.

Returns:

    torch.Tensor: the negative log-likelihood.
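
A self-contained sketch of why the `y == -1` indexing works when the outside option is modelled (illustrative tensors, not library code):

import torch

# Three trips over three items, plus an appended outside-option column of zeros.
total_utility = torch.tensor([[1.0, 0.5, 2.0, 0.0],
                              [0.3, 1.2, 0.1, 0.0],
                              [0.9, 0.4, 1.1, 0.0]])
logP = torch.log_softmax(total_utility, dim=1)
y = torch.tensor([0, 2, -1])                # -1 marks the outside option.
nll = -logP[torch.arange(len(y)), y].sum()  # index -1 reads the appended last column.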

Source code in torch_choice/model/conditional_logit_model.py
def negative_log_likelihood(self, batch: ChoiceDataset, y: torch.Tensor, is_train: bool=True) -> torch.Tensor:
    """Computes the log-likelihood for the batch and label.
    TODO: consider remove y, change to label.
    TODO: consider move this method outside the model, the role of the model is to compute the utility.

    Args:
        batch (ChoiceDataset): a ChoiceDataset object containing the data.
        y (torch.Tensor): the label.
        is_train (bool, optional): whether to trace the gradient. Defaults to True.

    Returns:
        torch.Tensor: the negative log-likelihood.
    """
    if is_train:
        self.train()
    else:
        self.eval()
    # (num_trips, num_items)
    total_utility = self.forward(batch)
    # check shapes.
    if self.model_outside_option:
        assert total_utility.shape == (len(batch), self.num_items+1)
        assert torch.all(total_utility[:, -1] == 0), "The last column of total_utility should be all zeros, which corresponds to the outside option."
    else:
        assert total_utility.shape == (len(batch), self.num_items)
    logP = torch.log_softmax(total_utility, dim=1)
    # since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
    # indexing should correctly retrieve the log-likelihood even for outside options.
    nll = - logP[torch.arange(len(y)), y].sum()
    return nll

summary(self)

Print out the current model parameter.

Source code in torch_choice/model/conditional_logit_model.py
def summary(self):
    """Print out the current model parameter."""
    for var_type, coefficient in self.coef_dict.items():
        if coefficient is not None:
            print('Variable Type: ', var_type)
            print(coefficient.coef)
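
Before the NestedLogitModel source below, a hedged construction sketch of its central `nest_to_item` argument (nest/item IDs and observable names are illustrative; `joint_dataset` stands for a JointDataset with 'nest' and 'item' keys):

# Two nests over five items: nest 0 holds items {0, 1}, nest 1 holds items {2, 3, 4}.
nest_to_item = {0: [0, 1], 1: [2, 3, 4]}
model = NestedLogitModel(nest_to_item=nest_to_item,
                         nest_formula="(nest_size|constant)",
                         item_formula="(price_cost|constant)",
                         dataset=joint_dataset,
                         shared_lambda=True)  # learn a single elasticity lambda shared by all nests.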
Source code in torch_choice/model/nested_logit_model.py
class NestedLogitModel(nn.Module):
    def __init__(self,
                 nest_to_item: Dict[object, List[int]],
                 # method 1: specify variation and num param. dictionary.
                 nest_coef_variation_dict: Optional[Dict[str, str]]=None,
                 nest_num_param_dict: Optional[Dict[str, int]]=None,
                 item_coef_variation_dict: Optional[Dict[str, str]]=None,
                 item_num_param_dict: Optional[Dict[str, int]]=None,
                 # method 2: specify formula and dataset.
                 item_formula: Optional[str]=None,
                 nest_formula: Optional[str]=None,
                 dataset: Optional[JointDataset]=None,
                 num_users: Optional[int]=None,
                 shared_lambda: bool=False,
                 regularization: Optional[str]=None,
                 regularization_weight: Optional[float]=None,
                 nest_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
                 item_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
                 model_outside_option: Optional[bool]=False
                 ) -> None:
        """Initialization method of the nested logit model.

        Args:
            nest_to_item (Dict[object, List[int]]): a dictionary maps a nest ID to a list
                of items IDs of the queried nest.

            nest_coef_variation_dict (Dict[str, str]): a dictionary maps a variable type
                (i.e., variable group) to the level of variation for the coefficient of this type
                of variables.
            nest_num_param_dict (Dict[str, int]): a dictionary maps a variable type name to
                the number of parameters in this variable group.

            item_coef_variation_dict (Dict[str, str]): the same as nest_coef_variation_dict but
                for item features.
            item_num_param_dict (Dict[str, int]): the same as nest_num_param_dict but for item
                features.

            {nest, item}_formula (str): a string representing the utility formula for the {nest, item} level logit model.
                The formula consists of '(variable_name|variation)'s separated by '+', for example:
                "(var1|item) + (var2|user) + (var3|constant)"
                where the first part of each term is the name of the variable
                and the second part is the variation of the coefficient.
                The variation can be one of the following:
                'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
                All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.
            dataset (JointDataset): a JointDataset object for training the model, the parser will infer dimensions of variables
                and sizes of coefficients for the nest level model from dataset.datasets['nest']. The parser will infer dimensions of variables and sizes of coefficients for the item level model from dataset.datasets['item'].

            num_users (Optional[int], optional): number of users to be modelled, this is only
                required if any of variable type requires user-specific variations.
                Defaults to None.

            shared_lambda (bool): a boolean indicating whether to enforce the elasticity lambda, which
                is the coefficient for inclusive values, to be constant for all nests.
                The lambda enters the nest-level selection as the following
                Utility of choosing nest k = lambda * inclusive value of nest k
                                               + linear combination of some other nest level features
                If set to True, a single lambda will be learned for all nests, otherwise, the
                model learns an individual lambda for each nest.
                Defaults to False.

            regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
                regularization added to the log-likelihood.
                - 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
                - 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
                - None does not modify the log-likelihood.
                Defaults to None.

            regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
                This term controls the strength of regularization. This argument is required if and only if regularization
                is not None.
                Defaults to None.

            {nest, item}_weight_initialization (Optional[Union[str, Dict[str, str]]]): methods to initialize the weights of
                coefficients for {nest, item} level model. Please refer to the `weight_initialization` keyword in ConditionalLogitModel's documentation for more details.

            model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
                To enable modeling outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
                In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
                Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
                The utility of the outside option is always set to 0 while computing the probability.
                By default, model_outside_option is set to False and the model does not model the outside option.
        """
        # handle nest level model.
        using_formula_to_initiate = (item_formula is not None) and (nest_formula is not None)
        if using_formula_to_initiate:
            # make sure that the researcher does not specify duplicated information, which might cause a conflict.
            if (nest_coef_variation_dict is not None) or (item_coef_variation_dict is not None):
                raise ValueError('You specify the {item, nest}_formula to initiate the model, you should not specify the {item, nest}_coef_variation_dict at the same time.')
            if (nest_num_param_dict is not None) or (item_num_param_dict is not None):
                raise ValueError('You specify the {item, nest}_formula to initiate the model, you should not specify the {item, nest}_num_param_dict at the same time.')
            if dataset is None:
                raise ValueError('Dataset is required if {item, nest}_formula is specified to initiate the model.')

            nest_coef_variation_dict, nest_num_param_dict = parse_formula(nest_formula, dataset.datasets['nest'])
            item_coef_variation_dict, item_num_param_dict = parse_formula(item_formula, dataset.datasets['item'])

        else:
            # check for conflicting information.
            if (nest_formula is not None) or (item_formula is not None):
                raise ValueError('You should not specify {item, nest}_formula and {item, nest}_coef_variation_dict at the same time.')
            # make sure that the researcher specifies all the required information.
            if (nest_coef_variation_dict is None) or (item_coef_variation_dict is None):
                raise ValueError('You should specify the {item, nest}_coef_variation_dict to initiate the model.')
            if (nest_num_param_dict is None) or (item_num_param_dict is None):
                raise ValueError('You should specify the {item, nest}_num_param_dict to initiate the model.')

        super(NestedLogitModel, self).__init__()
        self.nest_to_item = nest_to_item
        self.nest_coef_variation_dict = nest_coef_variation_dict
        self.nest_num_param_dict = nest_num_param_dict
        self.item_coef_variation_dict = item_coef_variation_dict
        self.item_num_param_dict = item_num_param_dict
        self.num_users = num_users

        self.nests = list(nest_to_item.keys())
        self.num_nests = len(self.nests)
        self.num_items = sum(len(items) for items in nest_to_item.values())

        # nest coefficients.
        self.nest_coef_dict = self._build_coef_dict(self.nest_coef_variation_dict,
                                                    self.nest_num_param_dict,
                                                    self.num_nests,
                                                    weight_initialization=deepcopy(nest_weight_initialization))

        # item coefficients.
        self.item_coef_dict = self._build_coef_dict(self.item_coef_variation_dict,
                                                    self.item_num_param_dict,
                                                    self.num_items,
                                                    weight_initialization=deepcopy(item_weight_initialization))

        self.shared_lambda = shared_lambda
        if self.shared_lambda:
            self.lambda_weight = nn.Parameter(torch.ones(1), requires_grad=True)
        else:
            self.lambda_weight = nn.Parameter(torch.ones(self.num_nests) / 2, requires_grad=True)
        # used to warn users if they forgot to call clamp_lambdas().
        self._clamp_called_flag = True

        self.regularization = regularization
        assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
        self.regularization_weight = regularization_weight
        if (self.regularization is not None) and (self.regularization_weight is None):
            raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
        if (self.regularization is None) and (self.regularization_weight is not None):
            raise ValueError(f'You specified no regularization but provided regularization_weight={self.regularization_weight}; leave regularization_weight as None if you do not want to regularize the model.')

        self.model_outside_option = model_outside_option

    @property
    def num_params(self) -> int:
        """Get the total number of parameters. For example, if there is only an user-specific coefficient to be multiplied
        with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no
        intercept is involved.

        Returns:
            int: the total number of learnable parameters.
        """
        return sum(w.numel() for w in self.parameters())

    def _build_coef_dict(self,
                         coef_variation_dict: Dict[str, str],
                         num_param_dict: Dict[str, int],
                         num_items: int,
                         weight_initialization: Optional[Union[str, Dict[str, str]]]=None
                         ) -> nn.ModuleDict:
        """Builds a coefficient dictionary containing all trainable components of the model, mapping coefficient names
            to the corresponding Coefficient Module.
            num_items could be the actual number of items or the number of nests depends on the use case.
            NOTE: torch-choice users don't directly interact with this method.

        Args:
            coef_variation_dict (Dict[str, str]): a dictionary mapping coefficient names (e.g., theta_user) to the level
                of variation (e.g., 'user').
            num_param_dict (Dict[str, int]): a dictionary mapping coefficient names to the number of parameters in this
                coefficient. Be aware that, for example, if there is one K-dimensional coefficient for every user, then
                the `num_param` should be K instead of K x number of users.
            num_items (int): the total number of items in the prediction problem. `num_items` should be the number of nests if _build_coef_dict() is used for nest-level prediction.

        Returns:
            nn.ModuleDict: a PyTorch ModuleDict object mapping from coefficient names to training Coefficient.
        """
        coef_dict = dict()
        for var_type, variation in coef_variation_dict.items():
            num_params = num_param_dict[var_type]

            if isinstance(weight_initialization, dict):
                if var_type.split('[')[0] in weight_initialization.keys():
                    # use the variable-specific initialization if provided.
                    init = weight_initialization[var_type.split('[')[0]]
                else:
                    # use default initialization.
                    init = None
            else:
                # initialize all coefficients in the same way.
                init = weight_initialization

            coef_dict[var_type] = Coefficient(variation=variation,
                                              num_items=num_items,
                                              num_users=self.num_users,
                                              num_params=num_params,
                                              init=init)
        return nn.ModuleDict(coef_dict)


    def forward(self, batch: ChoiceDataset) -> torch.Tensor:
        """An standard forward method for the model, the user feeds a ChoiceDataset batch and the model returns the
            predicted log-likelihood tensor. The main forward passing happens in the _forward() method, but we provide
            this wrapper forward() method for a cleaner API, as forward() only requires a single batch argument.
            For more details about the forward passing, please refer to the _forward() method.

        # TODO: the ConditionalLogitModel returns predicted utility, the NestedLogitModel behaves the same?

        Args:
            batch (ChoiceDataset): a ChoiceDataset object containing the data batch.

        Returns:
            torch.Tensor: a tensor of shape (num_trips, num_items) including the log probability
            of choosing item i in trip t.
        """
        return self._forward(batch['nest'].x_dict,
                             batch['item'].x_dict,
                             batch['item'].user_index,
                             batch['item'].item_availability)

    def _forward(self,
                 nest_x_dict: Dict[str, torch.Tensor],
                 item_x_dict: Dict[str, torch.Tensor],
                 user_index: Optional[torch.LongTensor] = None,
                 item_availability: Optional[torch.BoolTensor] = None
                 ) -> torch.Tensor:
        """"Computes log P[t, i] = the log probability for the user involved in trip t to choose item i.
        Let n denote the ID of the user involved in trip t, then P[t, i] = P_{ni} on page 86 of the
        book "discrete choice methods with simulation" by Train.

        The `_forward` method is an internal API, users should refer to the `forward` method.

        Args:
            nest_x_dict (Dict[str, torch.Tensor]): a dictionary mapping from nest-level feature names to the corresponding feature tensor.

            item_x_dict (Dict[str, torch.Tensor]): a dictionary mapping from item-level feature names to the corresponding feature tensor.

                More details on the shape of the tensors can be found in the docstring of the `x_dict` method of `ChoiceDataset`.

            user_index (torch.LongTensor): a tensor of shape (num_trips,) indicating which user is
                making decision in each trip. Setting user_index = None assumes the same user is
                making decisions in all trips.
            item_availability (torch.BoolTensor): a boolean tensor with shape (num_trips, num_items)
                indicating the availability of items in each trip. If item_availability[t, i] = False,
                the utility of choosing item i in trip t, V[t, i], will be set to -inf.
                Given the decomposition V[t, i] = W[t, k(i)] + Y[t, i] + eps, V[t, i] is set to -inf
                by setting Y[t, i] = -inf for unavailable items.

        Returns:
            torch.Tensor: a tensor of shape (num_trips, num_items) including the log probability
            of choosing item i in trip t.
        """
        if self.shared_lambda:
            self.lambdas = self.lambda_weight.expand(self.num_nests)
        else:
            self.lambdas = self.lambda_weight

        # if not self._clamp_called_flag:
        #     warnings.warn('Did you forget to call clamp_lambdas() after optimizer.step()?')

        # The overall utility of item can be decomposed into V[item] = W[nest] + Y[item] + eps.
        T = list(item_x_dict.values())[0].shape[0]
        device = list(item_x_dict.values())[0].device
        # compute nest-specific utility with shape (T, num_nests).
        W = torch.zeros(T, self.num_nests).to(device)

        for variable in self.nest_coef_variation_dict.keys():
            if self.is_intercept_term(variable):
                nest_x_dict['intercept'] = torch.ones((T, self.num_nests, 1)).to(device)
                break

        for variable in self.item_coef_variation_dict.keys():
            if self.is_intercept_term(variable):
                item_x_dict['intercept'] = torch.ones((T, self.num_items, 1)).to(device)
                break

        for var_type, coef in self.nest_coef_dict.items():
            corresponding_observable = var_type.split("[")[0]
            W += coef(nest_x_dict[corresponding_observable], user_index)

        # compute item-specific utility (T, num_items).
        Y = torch.zeros(T, self.num_items).to(device)
        for var_type, coef in self.item_coef_dict.items():
            corresponding_observable = var_type.split("[")[0]
            Y += coef(item_x_dict[corresponding_observable], user_index)

        if item_availability is not None:
            Y[~item_availability] = torch.finfo(Y.dtype).min / 2

        # =============================================================================
        # compute the inclusive value of each nest.
        inclusive_value = dict()
        for k, Bk in self.nest_to_item.items():
            # for nest k, divide the Y of all items in Bk by lambda_k.
            Y[:, Bk] /= self.lambdas[k]
            # compute inclusive value for nest k.
            # mask out unavailable items.
            inclusive_value[k] = torch.logsumexp(Y[:, Bk], dim=1, keepdim=False)  # (T,)
        # broadcast inclusive value from (T, num_nests) to (T, num_items).
        # for trip t, I[t, i] is the inclusive value of the nest item i belongs to.
        I = torch.zeros(T, self.num_items).to(device)
        for k, Bk in self.nest_to_item.items():
            I[:, Bk] = inclusive_value[k].view(-1, 1)  # (T, |Bk|)

        # logP_item[t, i] = log P(ni|Bk), where Bk is the nest item i is in, n is the user in trip t.
        logP_item = Y - I  # (T, num_items)

        if self.model_outside_option:
            # if the model explicitly models the outside option, we need to add a column of zeros to logP_item.
            # log P(ni|Bk) = 0 for the outside option since Y = 0 and the outside option has its own nest.
            logP_item = torch.cat((logP_item, torch.zeros(T, 1).to(device)), dim=1)
            assert logP_item.shape == (T, self.num_items+1)
            assert torch.all(logP_item[:, -1] == 0)

        # =============================================================================
        # logP_nest[t, i] = log P(Bk), for item i in trip t, the probability of choosing the nest/bucket
        # item i belongs to. logP_nest has shape (T, num_items)
        # logit[t, i] = W[n, k] + lambda[k] I[n, k], where n is the user involved in trip t, k is
        # the nest item i belongs to.
        logit = torch.zeros(T, self.num_items).to(device)
        for k, Bk in self.nest_to_item.items():
            logit[:, Bk] = (W[:, k] + self.lambdas[k] * inclusive_value[k]).view(-1, 1)  # (T, |Bk|)
        # only count each nest once in the logsumexp within the nest level model.
        cols = [x[0] for x in self.nest_to_item.values()]
        if self.model_outside_option:
            # the last column corresponds to the outside option, which has W+lambda*I = 0 since W = I = Y = 0 for the outside option.
            logit = torch.cat((logit, torch.zeros(T, 1).to(device)), dim=1)
            assert logit.shape == (T, self.num_items+1)
            # we have already added W+lambda*I for each "actual" nest, now we add the "fake" nest for the outside option.
            cols.append(-1)
        logP_nest = logit - torch.logsumexp(logit[:, cols], dim=1, keepdim=True)

        # =============================================================================
        # compute the joint log P_{ni} as in the textbook.
        logP = logP_item + logP_nest
        self._clamp_called_flag = False
        return logP

    def log_likelihood(self, *args):
        """Computes the log likelihood of the model, please refer to the negative_log_likelihood() method.

        Returns:
            torch.Tensor: the log likelihood of the model.
        """
        return - self.negative_log_likelihood(*args)

    def negative_log_likelihood(self,
                                batch: ChoiceDataset,
                                y: torch.LongTensor,
                                is_train: bool=True) -> torch.scalar_tensor:
        """Computes the negative log likelihood of the model. Please note the log-likelihood is summed over all samples
            in batch instead of the average.

        Args:
            batch (ChoiceDataset): the ChoiceDataset object containing the data.
            y (torch.LongTensor): the label.
            is_train (bool, optional): which mode of the model to use for the forward pass. If we need the Hessian
                of the NLL through auto-grad, `is_train` should be set to True. If we merely need a performance metric,
                then `is_train` can be set to False for better performance.
                Defaults to True.

        Returns:
            torch.scalar_tensor: the negative log likelihood of the model.
        """
        # compute the negative log-likelihood loss directly.
        if is_train:
            self.train()
        else:
            self.eval()
        # (num_trips, num_items)
        logP = self.forward(batch)
        # check shapes
        if self.model_outside_option:
            assert logP.shape == (len(batch['item']), self.num_items+1)
        else:
            assert logP.shape == (len(batch['item']), self.num_items)
        # since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
        # indexing should correctly retrieve the log-likelihood even for outside options.
        nll = - logP[torch.arange(len(y)), y].sum()
        return nll

    def loss(self, *args, **kwargs):
        """The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
        nll = self.negative_log_likelihood(*args, **kwargs)
        if self.regularization is not None:
            L = {'L1': 1, 'L2': 2}[self.regularization]
            for name, param in self.named_parameters():
                if name == 'lambda_weight':
                    # we don't regularize the lambda term, we only regularize coefficients.
                    continue
                nll += self.regularization_weight * torch.norm(param, p=L)
        return nll

    @property
    def device(self) -> torch.device:
        """Returns the device of the coefficient.

        Returns:
            torch.device: the device of the model.
        """
        return next(iter(self.item_coef_dict.values())).device

    @staticmethod
    def is_intercept_term(variable: str):
        # check if the given variable is an intercept (fixed effect) term.
        # intercept (fixed effect) terms are defined as 'intercept[*]' and look like 'intercept[user]', 'intercept[item]', etc.
        return (variable.startswith('intercept[') and variable.endswith(']'))

    def get_coefficient(self, variable: str, level: Optional[str] = None) -> torch.Tensor:
        """Retrieve the coefficient tensor for the given variable.

        Args:
            variable (str): the variable name.
            level (str): from which level of the model to extract the coefficient; can be 'item' or 'nest'. The `level` argument is ignored if `variable` is `lambda`.

        Returns:
            torch.Tensor: the corresponding coefficient tensor of the requested variable.
        """
        if variable == 'lambda':
            return self.lambda_weight.detach().clone()

        if level not in ['item', 'nest']:
            raise ValueError(f"Level should be either 'item' or 'nest', got {level}.")

        return self.state_dict()[f'{level}_coef_dict.{variable}.coef'].detach().clone()

    # def clamp_lambdas(self):
    #     """
    #     Restrict values of lambdas to 0 < lambda <= 1 to guarantee the utility maximization property
    #     of the model.
    #     This method should be called every time after optimizer.step().
    #     We add a self._clamp_called_flag to remind researchers if this method is not called.
    #     """
    #     for k in range(len(self.lambdas)):
    #         self.lambdas[k] = torch.clamp(self.lambdas[k], 1e-5, 1)
    #     self._clamp_called_flag = True

    # @staticmethod
    # def add_constant(x: torch.Tensor, where: str='prepend') -> torch.Tensor:
    #     """A helper function used to add constant to feature tensor,
    #     x has shape (batch_size, num_classes, num_parameters),
    #     returns a tensor of shape (*, num_parameters+1).
    #     """
    #     batch_size, num_classes, num_parameters = x.shape
    #     ones = torch.ones((batch_size, num_classes, 1))
    #     if where == 'prepend':
    #         new = torch.cat((ones, x), dim=-1)
    #     elif where == 'append':
    #         new = torch.cat((x, ones), dim=-1)
    #     else:
    #         raise Exception
    #     return new
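For reference, the `_forward` method above implements the standard two-level nested logit decomposition (Train, "Discrete Choice Methods with Simulation", page 86). Writing Y for item-level utility, W for nest-level utility, and lambda_k for the elasticity of nest B_k, the quantities computed in the code are:

$$ I_k = \log \sum_{j \in B_k} \exp\left( \frac{Y_j}{\lambda_k} \right) $$

$$ \log P(i \mid B_k) = \frac{Y_i}{\lambda_k} - I_k, \qquad \log P(B_k) = W_k + \lambda_k I_k - \log \sum_{\ell} \exp\left( W_\ell + \lambda_\ell I_\ell \right) $$

$$ \log P_i = \log P(i \mid B_k) + \log P(B_k) $$

These correspond to `inclusive_value[k]`, `logP_item`, `logP_nest`, and the returned `logP`, respectively.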

device: torch.device property readonly

Returns the device of the model's coefficients.

Returns:

    torch.device: the device of the model.

num_params: int property readonly

Get the total number of parameters. For example, if there is only a user-specific coefficient to be multiplied with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no intercept is involved.

Returns:

    int: the total number of learnable parameters.
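For intuition, a minimal counting sketch; the nest structure, observable name, and sizes below are hypothetical:

from torch_choice.model.nested_logit_model import NestedLogitModel

# 2 nests over 5 items; one user-specific item-level coefficient on a
# 5-dimensional observable; no nest-level observables; a single shared lambda.
model = NestedLogitModel(
    nest_to_item={0: [0, 1], 1: [2, 3, 4]},
    nest_coef_variation_dict={}, nest_num_param_dict={},
    item_coef_variation_dict={'price_obs': 'user'},
    item_num_param_dict={'price_obs': 5},
    num_users=10,
    shared_lambda=True,
)
# num_params counts every learnable entry, including lambda_weight:
# 10 users x 5 params + 1 shared lambda = 51 under these assumptions.
print(model.num_params)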

__init__(self, nest_to_item, nest_coef_variation_dict=None, nest_num_param_dict=None, item_coef_variation_dict=None, item_num_param_dict=None, item_formula=None, nest_formula=None, dataset=None, num_users=None, shared_lambda=False, regularization=None, regularization_weight=None, nest_weight_initialization=None, item_weight_initialization=None, model_outside_option=False) special

Initialization method of the nested logit model.

Parameters:

    nest_to_item (Dict[object, List[int]], required): a dictionary mapping a nest ID to the list of item IDs in the queried nest.

    nest_coef_variation_dict (Dict[str, str], default None): a dictionary mapping a variable type (i.e., variable group) to the level of variation for the coefficient of this type of variables.

    nest_num_param_dict (Dict[str, int], default None): a dictionary mapping a variable type name to the number of parameters in this variable group.

    item_coef_variation_dict (Dict[str, str], default None): the same as nest_coef_variation_dict but for item features.

    item_num_param_dict (Dict[str, int], default None): the same as nest_num_param_dict but for item features.

    {nest, item}_formula (str, default None): a string representing the utility formula for the {nest, item} level logit model. The formula consists of '(variable_name|variation)' terms separated by '+', for example: "(var1|item) + (var2|user) + (var3|constant)", where the first part of each term is the name of the variable and the second part is the variation of the coefficient. The variation can be one of the following: 'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'. All spaces in the formula are ignored, so please do not use spaces in variable/observable names.

    dataset (JointDataset, default None): a JointDataset object for training the model; the parser infers dimensions of variables and sizes of coefficients for the nest level model from dataset.datasets['nest'], and for the item level model from dataset.datasets['item'].

    num_users (Optional[int], default None): number of users to be modelled; this is only required if any variable type requires user-specific variation.

    shared_lambda (bool, default False): whether to enforce the elasticity lambda, which is the coefficient of the inclusive value, to be constant across nests. The lambda enters the nest-level selection as: utility of choosing nest k = lambda * inclusive value of nest k + linear combination of other nest-level features. If set to True, a single lambda is learned for all nests; otherwise, the model learns an individual lambda for each nest.

    regularization (Optional[str], default None): takes values from {'L1', 'L2', None} and specifies the type of regularization added to the log-likelihood. 'L1' subtracts regularization_weight * the 1-norm of parameters from the log-likelihood; 'L2' subtracts regularization_weight * the 2-norm of parameters; None does not modify the log-likelihood.

    regularization_weight (Optional[float], default None): the weight of the parameter norm subtracted from the log-likelihood. This term controls the strength of regularization, and is required if and only if regularization is not None.

    {nest, item}_weight_initialization (Optional[Union[str, Dict[str, str]]], default None): methods to initialize the weights of coefficients for the {nest, item} level model. Please refer to the weight_initialization keyword in ConditionalLogitModel's documentation for more details.

    model_outside_option (Optional[bool], default False): whether to explicitly model the outside option (i.e., the consumer did not buy anything). To enable modeling the outside option, indicate it by item_index[n] == -1 in the item-index tensor; the tensor may then contain values in {-1, 0, 1, ..., num_items-1}. Otherwise, if the outside option is not modelled, the item-index tensor should only contain values in {0, 1, ..., num_items-1}. The utility of the outside option is always set to 0 while computing the probability.
Source code in torch_choice/model/nested_logit_model.py
def __init__(self,
             nest_to_item: Dict[object, List[int]],
             # method 1: specify variation and num param. dictionary.
             nest_coef_variation_dict: Optional[Dict[str, str]]=None,
             nest_num_param_dict: Optional[Dict[str, int]]=None,
             item_coef_variation_dict: Optional[Dict[str, str]]=None,
             item_num_param_dict: Optional[Dict[str, int]]=None,
             # method 2: specify formula and dataset.
             item_formula: Optional[str]=None,
             nest_formula: Optional[str]=None,
             dataset: Optional[JointDataset]=None,
             num_users: Optional[int]=None,
             shared_lambda: bool=False,
             regularization: Optional[str]=None,
             regularization_weight: Optional[float]=None,
             nest_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
             item_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
             model_outside_option: Optional[bool]=False
             ) -> None:
    """Initialization method of the nested logit model.

    Args:
        nest_to_item (Dict[object, List[int]]): a dictionary mapping a nest ID to the list
            of item IDs in the queried nest.

        nest_coef_variation_dict (Dict[str, str]): a dictionary mapping a variable type
            (i.e., variable group) to the level of variation for the coefficient of this type
            of variables.
        nest_num_param_dict (Dict[str, int]): a dictionary mapping a variable type name to
            the number of parameters in this variable group.

        item_coef_variation_dict (Dict[str, str]): the same as nest_coef_variation_dict but
            for item features.
        item_num_param_dict (Dict[str, int]): the same as nest_num_param_dict but for item
            features.

        {nest, item}_formula (str): a string representing the utility formula for the {nest, item} level logit model.
            The formula consists of '(variable_name|variation)' terms separated by '+', for example:
            "(var1|item) + (var2|user) + (var3|constant)"
            where the first part of each term is the name of the variable
            and the second part is the variation of the coefficient.
            The variation can be one of the following:
            'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
            All spaces in the formula are ignored, so please do not use spaces in variable/observable names.
        dataset (JointDataset): a JointDataset object for training the model; the parser infers dimensions of variables
            and sizes of coefficients for the nest level model from dataset.datasets['nest'], and for the item level model from dataset.datasets['item'].

        num_users (Optional[int], optional): number of users to be modelled; this is only
            required if any variable type requires user-specific variation.
            Defaults to None.

        shared_lambda (bool): a boolean indicating whether to enforce the elasticity lambda, which
            is the coefficient for inclusive values, to be constant for all nests.
            The lambda enters the nest-level selection as the following:
            Utility of choosing nest k = lambda * inclusive value of nest k
                                           + linear combination of some other nest level features
            If set to True, a single lambda will be learned for all nests, otherwise, the
            model learns an individual lambda for each nest.
            Defaults to False.

        regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
            regularization added to the log-likelihood.
            - 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
            - 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
            - None does not modify the log-likelihood.
            Defaults to None.

        regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
            This term controls the strength of regularization. This argument is required if and only if regularization
            is not None.
            Defaults to None.

        {nest, item}_weight_initialization (Optional[Union[str, Dict[str, str]]]): methods to initialize the weights of
            coefficients for {nest, item} level model. Please refer to the `weight_initialization` keyword in ConditionalLogitModel's documentation for more details.

        model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
            To enable modeling outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
            In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
            Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
            The utility of the outside option is always set to 0 while computing the probability.
            By default, model_outside_option is set to False and the model does not model the outside option.
    """
    # handle nest level model.
    using_formula_to_initiate = (item_formula is not None) and (nest_formula is not None)
    if using_formula_to_initiate:
        # make sure that the researcher does not specify duplicated information, which might cause conflicts.
        if (nest_coef_variation_dict is not None) or (item_coef_variation_dict is not None):
            raise ValueError('You specified the {item, nest}_formula to initiate the model; you should not specify the {item, nest}_coef_variation_dict at the same time.')
        if (nest_num_param_dict is not None) or (item_num_param_dict is not None):
            raise ValueError('You specified the {item, nest}_formula to initiate the model; you should not specify the {item, nest}_num_param_dict at the same time.')
        if dataset is None:
            raise ValueError('Dataset is required if {item, nest}_formula is specified to initiate the model.')

        nest_coef_variation_dict, nest_num_param_dict = parse_formula(nest_formula, dataset.datasets['nest'])
        item_coef_variation_dict, item_num_param_dict = parse_formula(item_formula, dataset.datasets['item'])

    else:
        # check for conflicting information.
        if (nest_formula is not None) or (item_formula is not None):
            raise ValueError('You should not specify {item, nest}_formula and {item, nest}_coef_variation_dict at the same time.')
        # make sure that the researcher specifies all the required information.
        if (nest_coef_variation_dict is None) or (item_coef_variation_dict is None):
            raise ValueError('You should specify the {item, nest}_coef_variation_dict to initiate the model.')
        if (nest_num_param_dict is None) or (item_num_param_dict is None):
            raise ValueError('You should specify the {item, nest}_num_param_dict to initiate the model.')

    super(NestedLogitModel, self).__init__()
    self.nest_to_item = nest_to_item
    self.nest_coef_variation_dict = nest_coef_variation_dict
    self.nest_num_param_dict = nest_num_param_dict
    self.item_coef_variation_dict = item_coef_variation_dict
    self.item_num_param_dict = item_num_param_dict
    self.num_users = num_users

    self.nests = list(nest_to_item.keys())
    self.num_nests = len(self.nests)
    self.num_items = sum(len(items) for items in nest_to_item.values())

    # nest coefficients.
    self.nest_coef_dict = self._build_coef_dict(self.nest_coef_variation_dict,
                                                self.nest_num_param_dict,
                                                self.num_nests,
                                                weight_initialization=deepcopy(nest_weight_initialization))

    # item coefficients.
    self.item_coef_dict = self._build_coef_dict(self.item_coef_variation_dict,
                                                self.item_num_param_dict,
                                                self.num_items,
                                                weight_initialization=deepcopy(item_weight_initialization))

    self.shared_lambda = shared_lambda
    if self.shared_lambda:
        self.lambda_weight = nn.Parameter(torch.ones(1), requires_grad=True)
    else:
        self.lambda_weight = nn.Parameter(torch.ones(self.num_nests) / 2, requires_grad=True)
    # used to warn users if they forgot to call clamp_lambdas().
    self._clamp_called_flag = True

    self.regularization = regularization
    assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
    self.regularization_weight = regularization_weight
    if (self.regularization is not None) and (self.regularization_weight is None):
        raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
    if (self.regularization is None) and (self.regularization_weight is not None):
        raise ValueError(f'You specified no regularization but provided regularization_weight={self.regularization_weight}; leave regularization_weight as None if you do not want to regularize the model.')

    self.model_outside_option = model_outside_option
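A hedged construction sketch showing both initialization paths; the nest structure, observable names, and dimensions below are hypothetical, and `joint_dataset` is assumed to be a JointDataset whose 'nest' and 'item' sub-datasets carry the referenced observables:

from torch_choice.model.nested_logit_model import NestedLogitModel

# Method 1: explicit variation/size dictionaries.
model = NestedLogitModel(
    nest_to_item={0: [0, 1, 2], 1: [3, 4, 5, 6]},
    nest_coef_variation_dict={},                         # no nest-level observables
    nest_num_param_dict={},
    item_coef_variation_dict={'price_obs': 'constant'},  # one coefficient shared by all items
    item_num_param_dict={'price_obs': 7},                # price_obs is 7-dimensional
    shared_lambda=True,
)

# Method 2: formulas parsed against a JointDataset; at the nest level,
# the 'item' variation refers to nests.
model = NestedLogitModel(
    nest_to_item={0: [0, 1, 2], 1: [3, 4, 5, 6]},
    nest_formula='(inc_obs|item)',
    item_formula='(price_obs|constant)',
    dataset=joint_dataset,
    shared_lambda=True,
)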

forward(self, batch)

A standard forward method for the model: the user feeds a ChoiceDataset batch and the model returns the predicted log-probability tensor. The main forward computation happens in the _forward() method; we provide this wrapper forward() method for a cleaner API, as forward() only requires a single batch argument. For more details about the forward pass, please refer to the _forward() method.

TODO: the ConditionalLogitModel returns predicted utility, the NestedLogitModel behaves the same?

Parameters:

    batch (ChoiceDataset, required): a ChoiceDataset object containing the data batch.

Returns:

    torch.Tensor: a tensor of shape (num_trips, num_items) containing the log probability of choosing item i in trip t.

Source code in torch_choice/model/nested_logit_model.py
def forward(self, batch: ChoiceDataset) -> torch.Tensor:
    """An standard forward method for the model, the user feeds a ChoiceDataset batch and the model returns the
        predicted log-likelihood tensor. The main forward passing happens in the _forward() method, but we provide
        this wrapper forward() method for a cleaner API, as forward() only requires a single batch argument.
        For more details about the forward passing, please refer to the _forward() method.

    # TODO: the ConditionalLogitModel returns predicted utility, the NestedLogitModel behaves the same?

    Args:
        batch (ChoiceDataset): a ChoiceDataset object containing the data batch.

    Returns:
        torch.Tensor: a tensor of shape (num_trips, num_items) including the log probability
        of choosing item i in trip t.
    """
    return self._forward(batch['nest'].x_dict,
                         batch['item'].x_dict,
                         batch['item'].user_index,
                         batch['item'].item_availability)
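A minimal calling sketch, reusing the hypothetical `model` and `joint_dataset` from the construction example above (note the batch is indexed with 'nest' and 'item', so a joint dataset is expected even though the type hint says ChoiceDataset):

# log_prob[t, i] = log probability that the user in trip t chooses item i.
log_prob = model(joint_dataset)      # nn.Module.__call__ dispatches to forward()
predicted = log_prob.argmax(dim=1)   # most likely item in each trip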

get_coefficient(self, variable, level=None)

Retrieve the coefficient tensor for the given variable.

Parameters:

    variable (str, required): the variable name.

    level (str, default None): from which level of the model to extract the coefficient; can be 'item' or 'nest'. The level argument is ignored if variable is lambda.

Returns:

    torch.Tensor: the corresponding coefficient tensor of the requested variable.

Source code in torch_choice/model/nested_logit_model.py
def get_coefficient(self, variable: str, level: Optional[str] = None) -> torch.Tensor:
    """Retrieve the coefficient tensor for the given variable.

    Args:
        variable (str): the variable name.
        level (str): from which level of the model to extract the coefficient; can be 'item' or 'nest'. The `level` argument is ignored if `variable` is `lambda`.

    Returns:
        torch.Tensor: the corresponding coefficient tensor of the requested variable.
    """
    if variable == 'lambda':
        return self.lambda_weight.detach().clone()

    if level not in ['item', 'nest']:
        raise ValueError(f"Level should be either 'item' or 'nest', got {level}.")

    return self.state_dict()[f'{level}_coef_dict.{variable}.coef'].detach().clone()
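A retrieval sketch; 'price_obs' is a hypothetical variable name and must match a key of the coefficient dictionaries used at construction:

lambdas = model.get_coefficient('lambda')                       # lambda(s); `level` is ignored
price_coef = model.get_coefficient('price_obs', level='item')   # item-level coefficient tensor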

log_likelihood(self, *args)

Computes the log likelihood of the model; please refer to the negative_log_likelihood() method.

Returns:

    torch.Tensor: the log likelihood of the model.

Source code in torch_choice/model/nested_logit_model.py
def log_likelihood(self, *args):
    """Computes the log likelihood of the model, please refer to the negative_log_likelihood() method.

    Returns:
        _type_: the log likelihood of the model.
    """
    return - self.negative_log_likelihood(*args)

loss(self, *args, **kwargs)

The loss function to be optimized. This is a wrapper around negative_log_likelihood, plus the regularization loss if required.

Source code in torch_choice/model/nested_logit_model.py
def loss(self, *args, **kwargs):
    """The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
    nll = self.negative_log_likelihood(*args, **kwargs)
    if self.regularization is not None:
        L = {'L1': 1, 'L2': 2}[self.regularization]
        for name, param in self.named_parameters():
            if name == 'lambda_weight':
                # we don't regularize the lambda term, we only regularize coefficients.
                continue
            nll += self.regularization_weight * torch.norm(param, p=L)
    return nll
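A hedged training sketch; the optimizer choice, learning rate, and epoch count are arbitrary, and `joint_dataset` is the hypothetical dataset from the examples above:

import torch

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
for epoch in range(500):
    # loss() returns the summed NLL plus the optional L1/L2 penalty on all
    # coefficients except lambda_weight.
    loss = model.loss(joint_dataset, joint_dataset['item'].item_index)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()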

negative_log_likelihood(self, batch, y, is_train=True)

Computes the negative log likelihood of the model. Please note the log-likelihood is summed over all samples in the batch rather than averaged.

Parameters:

    batch (ChoiceDataset, required): the ChoiceDataset object containing the data.

    y (torch.LongTensor, required): the label.

    is_train (bool, default True): which mode of the model to use for the forward pass. If we need the Hessian of the NLL through auto-grad, is_train should be set to True; if we merely need a performance metric, is_train can be set to False for better performance.

Returns:

    torch.scalar_tensor: the negative log likelihood of the model.

Source code in torch_choice/model/nested_logit_model.py
def negative_log_likelihood(self,
                            batch: ChoiceDataset,
                            y: torch.LongTensor,
                            is_train: bool=True) -> torch.scalar_tensor:
    """Computes the negative log likelihood of the model. Please note the log-likelihood is summed over all samples
        in batch instead of the average.

    Args:
        batch (ChoiceDataset): the ChoiceDataset object containing the data.
        y (torch.LongTensor): the label.
        is_train (bool, optional): which mode of the model to use for the forward pass. If we need the Hessian
            of the NLL through auto-grad, `is_train` should be set to True. If we merely need a performance metric,
            then `is_train` can be set to False for better performance.
            Defaults to True.

    Returns:
        torch.scalar_tensor: the negative log likelihood of the model.
    """
    # compute the negative log-likelihood loss directly.
    if is_train:
        self.train()
    else:
        self.eval()
    # (num_trips, num_items)
    logP = self.forward(batch)
    # check shapes
    if self.model_outside_option:
        assert logP.shape == (len(batch['item']), self.num_items+1)
    else:
        assert logP.shape == (len(batch['item']), self.num_items)
    # since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
    # indexing should correctly retrieve the log-likelihood even for outside options.
    nll = - logP[torch.arange(len(y)), y].sum()
    return nll
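For evaluation, the same call can run in eval mode without gradient tracking (a sketch, with the same assumed names as above):

with torch.no_grad():
    nll = model.negative_log_likelihood(
        joint_dataset, joint_dataset['item'].item_index, is_train=False)
avg_nll = nll / len(joint_dataset['item'])  # per-trip average, if desired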