API Reference: Torch Choice
ChoiceDataset

Source code in torch_choice/data/choice_dataset.py
class ChoiceDataset(torch.utils.data.Dataset):
def __init__(self,
item_index: torch.LongTensor,
num_items: int = None,
num_users: int = None,
num_sessions: int = None,
label: Optional[torch.LongTensor] = None,
user_index: Optional[torch.LongTensor] = None,
session_index: Optional[torch.LongTensor] = None,
item_availability: Optional[torch.BoolTensor] = None,
**kwargs) -> None:
"""
        Initialization method for the dataset object; researchers should supply all information about the dataset
        using this initialization method.
        The number of choice instances is called `batch_size` in the documentation. The `batch_size` corresponds to the
        file length in a wide-format dataset and is often denoted by `N`. We call it `batch_size` to follow the convention
        in the machine learning literature.
A `choice instance` is a row of the dataset, so there are `batch_size` choice instances in each `ChoiceDataset`.
The dataset consists of:
(1) a collection of `batch_size` tuples (item_id, user_id, session_id, label), where each tuple is a choice instance.
(2) a collection of `observables` associated with item, user, session, etc.
Args:
            item_index (torch.LongTensor): a tensor of shape (batch_size) indicating the relevant item in each row
                of the dataset; the relevant item can be:
                (1) the item bought in this choice instance,
                (2) or the item reviewed by the user. In the latter case, we need the `label` tensor to specify the rating score.
                NOTE: support for the second case is under development; currently, only binary labels are supported.
            num_items (Optional[int]): the number of items in the dataset. If `None` is provided (default), the number of items will be inferred from the number of unique values in `item_index`.
            num_users (Optional[int]): the number of users in the dataset. If `None` is provided (default), the number of users will be inferred from the number of unique values in `user_index`.
            num_sessions (Optional[int]): the number of sessions in the dataset. If `None` is provided (default), the number of sessions will be inferred from the number of unique values in `session_index`.
            label (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating the label for prediction in
                each choice instance. When you want to predict the item bought, you can leave the `label` argument
                as `None` in the initialization method, and the model will use `item_index` as the object to be predicted.
                But if you are, for example, predicting the rating a user gave an item, the label must be provided.
                Defaults to None.
            user_index (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating
                the ID of the user involved in each choice instance. If `None` is provided, it is assumed
                that all choice instances are from the same user.
                `user_index` is required if and only if there are multiple users in the dataset, for example:
                (1) user observables are involved in the utility form,
                (2) and/or the coefficients are user-specific.
                This tensor is used to select the corresponding user observables and coefficients assigned to the
                user (like theta_user) when making the prediction for that purchase.
                Defaults to None.
            session_index (Optional[torch.LongTensor], optional): a tensor of shape (batch_size) indicating
                the ID of the session in which each choice instance occurred. This tensor is used to select the correct
                session observables or price observables for making the prediction for that choice instance. Therefore, if
                there are no session or price observables, you can leave this argument as `None`. In this case, the `ChoiceDataset`
                object will assume each choice instance to be in its own session.
Defaults to None.
            item_availability (Optional[torch.BoolTensor], optional): a boolean tensor of shape (num_sessions, num_items)
                indicating the availability of each item in each session. Utilities of unavailable items will be set to
                negative infinity, and hence the predicted probabilities of unavailable items will be zero.
                We assume all items are available if set to None.
Defaults to None.
Other Kwargs (Observables):
            One can specify the following types of observables, where * in a shape denotes any positive
            integer. Typically * represents the number of observables.
            Please refer to the documentation for a detailed guide to using observables.
1. user observables must start with 'user_' and have shape (num_users, *)
2. item observables must start with 'item_' and have shape (num_items, *)
3. session observables must start with 'session_' and have shape (num_sessions, *)
            4. user-item observables (those varying by user and item) must start with `useritem_` or `itemuser_`
               and have shape (num_users, num_items, *).
               NOTE: we don't recommend using user-item observables, because num_users * num_items can be large.
            5. price observables (those varying by session and item) must start with `price_` and have
               shape (num_sessions, num_items, *)
            6. itemsession observables start with `itemsession_`; this is a more intuitive alias for the price
               observables.
"""
# ENHANCEMENT(Tianyu): add item_names for summary.
super(ChoiceDataset, self).__init__()
self.label = label
self.item_index = item_index
self._num_items = num_items
self._num_users = num_users
self._num_sessions = num_sessions
self.user_index = user_index
self.session_index = session_index
if self.session_index is None:
# if any([x.startswith('session_') or x.startswith('price_') for x in kwargs.keys()]):
# if any session sensitive observable is provided, but session index is not,
# infer each row in the dataset to be a session.
# TODO: (design choice) should we assign unique session index to each choice instance or the same session index.
print('No `session_index` is provided, assume each choice instance is in its own session.')
self.session_index = torch.arange(len(self.item_index)).long()
self.item_availability = item_availability
for key, item in kwargs.items():
if self._is_attribute(key):
                # all observables should be floats.
item = item.float()
setattr(self, key, item)
# TODO: add a validation procedure to check the consistency of the dataset.
def __getitem__(self, indices: Union[int, torch.LongTensor]) -> "ChoiceDataset":
"""Retrieves samples corresponding to the provided index or list of indices.
Args:
indices (Union[int, torch.LongTensor]): a single integer index or a tensor of indices.
Returns:
ChoiceDataset: a subset of the dataset.
"""
if isinstance(indices, int):
# convert single integer index to an array of indices.
indices = torch.LongTensor([indices])
new_dict = dict()
new_dict['item_index'] = self.item_index[indices].clone()
# copy optional attributes.
new_dict['label'] = self.label[indices].clone() if self.label is not None else None
new_dict['user_index'] = self.user_index[indices].clone() if self.user_index is not None else None
new_dict['session_index'] = self.session_index[indices].clone() if self.session_index is not None else None
# item_availability has shape (num_sessions, num_items), no need to re-index it.
new_dict['item_availability'] = self.item_availability
# copy other attributes.
for key, val in self.__dict__.items():
if key not in new_dict.keys():
if torch.is_tensor(val):
new_dict[key] = val.clone()
else:
new_dict[key] = copy.deepcopy(val)
subset = self._from_dict(new_dict)
# make sure the new dataset inherits the num_sessions, num_items, and num_users from parent.
subset._num_users = self.num_users
subset._num_items = self.num_items
subset._num_sessions = self.num_sessions
return subset
def __len__(self) -> int:
"""Returns number of samples in this dataset.
Returns:
int: length of the dataset.
"""
return len(self.item_index)
def __contains__(self, key: str) -> bool:
return key in self.keys
def __eq__(self, other: "ChoiceDataset") -> bool:
"""Returns whether all tensor attributes of both ChoiceDatasets are equal."""
if not isinstance(other, ChoiceDataset):
raise TypeError('You can only compare with ChoiceDataset objects.')
else:
flag = True
for key, val in self.__dict__.items():
if torch.is_tensor(val):
# ignore NaNs while comparing.
if not torch.equal(torch.nan_to_num(val), torch.nan_to_num(other.__dict__[key])):
print('Attribute {} is not equal.'.format(key))
flag = False
return flag
@property
def device(self) -> str:
"""Returns the device of the dataset.
Returns:
str: the device of the dataset.
"""
for attr in self.__dict__.values():
if torch.is_tensor(attr):
return attr.device
@property
def num_users(self) -> int:
"""Returns number of users involved in this dataset, returns 1 if there is no user identity.
Returns:
int: the number of users involved in this dataset.
"""
if self._num_users is not None:
return self._num_users
elif self.user_index is not None:
num_unique = len(torch.unique(self.user_index))
expected_num_users = int(self.user_index.max()) + 1
if num_unique != expected_num_users:
warnings.warn(f"The number of users is inferred from the number of unique users in the user_index tensor. The user_index tensor in the ChoiceDataset ranges from {int(self.user_index.min())} to {int(self.user_index.max())}. The ChoiceDataset assumes user_index to be 0-indexed and encoded using consecutive integers. There are {expected_num_users} users expected given max(user_index). However, there are {num_unique} unique values in the user_index . This could be caused by missing users in the dataset (i.e., some users are not in user_index at all). If this is not expected, please check the user_index tensor. For a safer behavior, please provide the number of users explicitly by using the num_users keyword while initializing the ChoiceDataset class.")
else:
warnings.warn(f"The number of users is inferred from the number of unique users in the user_index tensor. This might lead to unexpected behaviors if some users never appeared in the user_index tensor. For a safer behavior, please provide the number of users explicitly by using the num_users keyword while initializing the ChoiceDataset class.")
# infer from the number of unique users using the user_index.
return len(torch.unique(self.user_index))
else:
return 1
@property
def num_items(self) -> int:
"""Returns the number of items involved in this dataset.
Returns:
int: the number of items involved in this dataset.
"""
if self._num_items is not None:
# return the _num_items provided in the constructor.
return self._num_items
else:
# infer the number of items from item_index.
# the -1 is an optional special symbol for outside option, do not count it towards the number of items.
num_unique = len(torch.unique(self.item_index[self.item_index != -1]))
expected_num_items = int(self.item_index[self.item_index != -1].max()) + 1
if num_unique != expected_num_items:
warnings.warn(f"The number of items is inferred from the number of unique items, excluding -1's denoting outside options, in the item_index tensor. The item_index tensor in the ChoiceDataset ranges from {int(self.item_index[self.item_index != -1].min())} to {int(self.item_index[self.item_index != -1].max())}, excluding -1's. The ChoiceDataset assumes item_index to be 0-indexed and encoded using consecutive integers. There are {expected_num_items} items expected given max(item_index). However, there are {num_unique} unique values in item_index. This could be caused by missing items in the dataset (i.e., some items are not in item_index at all). If this is not expected, please check the item_index tensor. For a safer behavior, please provide the number of items explicitly by using the num_items keyword while initializing the ChoiceDataset class.")
else:
warnings.warn(f"The number of items is inferred from the number of unique items, excluding -1's denoting outside options, in the item_index tensor. This might lead to unexpected behaviors if some items never appeared in the item_index tensor. For a safer behavior, please provide the number of items explicitly by using the num_items keyword while initializing the ChoiceDataset class.")
return len(torch.unique(self.item_index[self.item_index != -1]))
@property
def num_sessions(self) -> int:
"""Returns the number of sessions involved in this dataset.
Returns:
int: the number of sessions involved in this dataset.
"""
if self._num_sessions is not None:
# return the _num_sessions provided in the constructor.
return self._num_sessions
else:
num_unique = len(torch.unique(self.session_index))
expected_num_sessions = int(self.session_index.max()) + 1
if num_unique != expected_num_sessions:
warnings.warn(f"The number of sessions is inferred from the number of unique sessions in the session_index tensor. The session_index tensor in the ChoiceDataset ranges from {int(self.session_index.min())} to {int(self.session_index.max())}. The ChoiceDataset assumes session_index to be 0-indexed and encoded using consecutive integers. There are {expected_num_sessions} sessions expected given max(session_index). However, there are {num_unique} unique values in the session_index . This could be caused by missing sessions in the dataset (i.e., some sessions are not in session_index at all). If this is not expected, please check the session_index tensor. For a safer behavior, please provide the number of sessions explicitly by using the num_sessions keyword while initializing the ChoiceDataset class.")
else:
warnings.warn(f"The number of sessions is inferred from the number of unique sessions in the session_index tensor. This might lead to unexpected behaviors if some sessions never appeared in the session_index tensor. For a safer behavior, please provide the number of sessions explicitly by using the num_sessions keyword while initializing the ChoiceDataset class.")
# infer the number of sessions from session_index.
return len(torch.unique(self.session_index))
@property
def x_dict(self) -> Dict[object, torch.Tensor]:
"""Formats attributes of in this dataset into shape (num_sessions, num_items, num_params) and returns in a dictionary format.
Models in this package are expecting this dictionary based data format.
Returns:
Dict[object, torch.Tensor]: a dictionary with attribute names in the dataset as keys, and reshaped attribute
tensors as values.
"""
out = dict()
for key, val in self.__dict__.items():
if self._is_attribute(key): # only include attributes.
out[key] = self._expand_tensor(key, val) # reshape to (num_sessions, num_items, num_params).
return out
@classmethod
    def _from_dict(cls, dictionary: Dict[str, torch.Tensor]) -> "ChoiceDataset":
        """Creates an instance of ChoiceDataset from a dictionary of arguments.
        Args:
            dictionary (Dict[str, torch.Tensor]): a dictionary with keys as argument names and values as arguments.
Returns:
ChoiceDataset: the created copy of dataset.
"""
dataset = cls(**dictionary)
for key, item in dictionary.items():
setattr(dataset, key, item)
return dataset
def apply_tensor(self, func: callable) -> "ChoiceDataset":
"""This s a helper method to apply the provided function to all tensors and tensor values of all dictionaries.
Args:
func (callable): a callable function to be applied on tensors and tensor-values of dictionaries.
Returns:
ChoiceDataset: the modified dataset.
"""
for key, item in self.__dict__.items():
if torch.is_tensor(item):
setattr(self, key, func(item))
            # broadcast func to dictionaries of tensors as well.
            elif isinstance(getattr(self, key), dict):
                for obj_key, obj_item in getattr(self, key).items():
                    if torch.is_tensor(obj_item):
                        # use item assignment: setattr does not work on dict objects.
                        getattr(self, key)[obj_key] = func(obj_item)
return self
def to(self, device: Union[str, torch.device]) -> "ChoiceDataset":
"""Moves all tensors in this dataset to the specified PyTorch device.
Args:
device (Union[str, torch.device]): the destination device.
Returns:
ChoiceDataset: the modified dataset on the new device.
"""
return self.apply_tensor(lambda x: x.to(device))
def clone(self) -> "ChoiceDataset":
"""Creates a copy of self.
Returns:
ChoiceDataset: a copy of self.
"""
dictionary = {}
for k, v in self.__dict__.items():
if torch.is_tensor(v):
dictionary[k] = v.clone()
else:
dictionary[k] = copy.deepcopy(v)
new = self.__class__._from_dict(dictionary)
new._num_users = self.num_users
new._num_items = self.num_items
new._num_sessions = self.num_sessions
return new
def _check_device_consistency(self) -> None:
"""Checks if all tensors in this dataset are on the same device.
Raises:
Exception: an exception is raised if not all tensors are on the same device.
"""
# assert all tensors are on the same device.
devices = list()
for val in self.__dict__.values():
if torch.is_tensor(val):
devices.append(val.device)
if len(set(devices)) > 1:
raise Exception(f'Found tensors on different devices: {set(devices)}.',
'Use dataset.to() method to align devices.')
def _size_repr(self, value: object) -> List[int]:
"""A helper method to get the string-representation of object sizes, this is helpful while constructing the
string representation of the dataset.
Args:
value (object): an object to examine its size.
Returns:
List[int]: list of integers representing the size of the object, length of the list is equal to dimension of `value`.
"""
if torch.is_tensor(value):
return list(value.size())
elif isinstance(value, int) or isinstance(value, float):
return [1]
elif isinstance(value, list) or isinstance(value, tuple):
return [len(value)]
else:
return []
def __repr__(self) -> str:
"""A method to get a string representation of the dataset.
Returns:
str: the string representation of the dataset.
"""
# don't print shapes of internal attributes like _num_users and _num_items.
info = [f'{key}={self._size_repr(item)}' for key, item in self.__dict__.items() if not key.startswith('_')]
return f"{self.__class__.__name__}(num_items={self.num_items}, num_users={self.num_users}, num_sessions={self.num_sessions}, {', '.join(info)}, device={self.device})"
# ==================================================================================================================
# methods for checking attribute categories.
# ==================================================================================================================
@staticmethod
def _is_item_attribute(key: str) -> bool:
return key.startswith('item_') and (key != 'item_availability') and (key != 'item_index')
@staticmethod
def _is_user_attribute(key: str) -> bool:
return key.startswith('user_') and (key != 'user_index')
@staticmethod
def _is_session_attribute(key: str) -> bool:
return key.startswith('session_') and (key != 'session_index')
@staticmethod
def _is_useritem_attribute(key: str) -> bool:
return key.startswith('useritem_') or key.startswith('itemuser_')
@staticmethod
def _is_price_attribute(key: str) -> bool:
return key.startswith('price_') or key.startswith('itemsession_') or key.startswith('sessionitem_')
@staticmethod
def _is_usersession_attribute(key: str) -> bool:
return key.startswith('usersession_') or key.startswith('sessionuser_')
@staticmethod
def _is_usersessionitem_attribute(key: str) -> bool:
return key.startswith('usersessionitem_') or key.startswith('useritemsession_') \
or key.startswith('itemusersession_') or key.startswith('itemsessionuser_') \
or key.startswith('sessionuseritem_') or key.startswith('sessionitemuser_')
def _is_attribute(self, key: str) -> bool:
return self._is_item_attribute(key) \
or self._is_user_attribute(key) \
or self._is_session_attribute(key) \
or self._is_useritem_attribute(key) \
or self._is_price_attribute(key) \
or self._is_usersession_attribute(key) \
or self._is_usersessionitem_attribute(key)
def _expand_tensor(self, key: str, val: torch.Tensor) -> torch.Tensor:
"""Expands attribute tensor to (len(self), num_items, num_params) shape for prediction tasks, this method
won't reshape the tensor at all if the `key` (i.e., name of the tensor) suggests its not an attribute of any kind.
Args:
key (str): name of the attribute used to determine the raw shape of the tensor. For example, 'item_obs' means
the raw tensor is in shape (num_items, num_params).
val (torch.Tensor): the attribute tensor to be reshaped.
        Returns:
            torch.Tensor: the reshaped tensor with shape (len(self), num_items, num_params).
"""
        if not self._is_attribute(key):
            # this is a sanity check.
            raise ValueError(f'The input key {key} is not an attribute of the dataset.')
num_params = val.shape[-1] # the number of parameters/coefficients/observables.
# convert attribute tensors to (len(self), num_items, num_params) shape.
if self._is_user_attribute(key):
# user_attribute (num_users, *)
out = val[self.user_index, :].view(
len(self), 1, num_params).expand(-1, self.num_items, -1)
elif self._is_item_attribute(key):
# item_attribute (num_items, *)
out = val.view(1, self.num_items, num_params).expand(
len(self), -1, -1)
elif self._is_useritem_attribute(key):
# useritem_attribute (num_users, num_items, *)
out = val[self.user_index, :, :]
elif self._is_session_attribute(key):
# session_attribute (num_sessions, *)
out = val[self.session_index, :].view(
len(self), 1, num_params).expand(-1, self.num_items, -1)
elif self._is_price_attribute(key):
# price_attribute (num_sessions, num_items, *)
out = val[self.session_index, :, :]
elif self._is_usersession_attribute(key):
# user-session (num_users, num_sessions, *)
out = val[self.user_index, self.session_index, :] # (len(self), *)
out = out.view(len(self), 1, num_params).expand(-1, self.num_items, -1) # (len(self), num_items, *)
elif self._is_usersessionitem_attribute(key):
# usersessionitem_attribute has shape (num_users, num_sessions, num_items, *)
out = val[self.user_index, self.session_index, :, :] # (len(self), num_items, *)
        else:
            raise ValueError(f'The input key {key} is not an attribute of the dataset.')
assert out.shape == (len(self), self.num_items, num_params), f'Error: the output shape {out.shape} is not correct, expected: {(len(self), self.num_items, num_params)}.'
return out
@staticmethod
def unique(tensor: torch.Tensor) -> Tuple[np.ndarray]:
arr = tensor.cpu().numpy()
unique, counts = np.unique(arr, return_counts=True)
count_sort_ind = np.argsort(-counts)
unique = unique[count_sort_ind]
counts = counts[count_sort_ind]
return unique, counts
def summary(self) -> None:
"""A method to summarize the dataset.
Returns:
str: the string representation of the dataset.
"""
        summary = ['ChoiceDataset with {} sessions, {} items, {} users, {} purchase records (observations).'.format(
self.num_sessions, self.num_items, self.num_users if self.user_index is not None else 'single', len(self))]
# summarize users.
if self.user_index is not None:
unique, counts = self.unique(self.user_index)
summary.append(f"The most frequent user is {unique[0]} with {counts[0]} observations; the least frequent user is {unique[-1]} with {counts[-1]} observations; on average, there are {counts.astype(float).mean():.2f} observations per user.")
N = len(unique)
K = min(5, N)
string = f'{K} most frequent users are: ' + ', '.join([f'{unique[i]}({counts[i]} times)' for i in range(K)]) + '.'
summary.append(string)
string = f'{K} least frequent users are: ' + ', '.join([f'{unique[N-i]}({counts[N-i]} times)' for i in range(1, K+1)]) + '.'
summary.append(string)
# summarize items.
unique, counts = self.unique(self.item_index)
N = len(unique)
K = min(5, N)
summary.append(f"The most frequent item is {unique[0]}, it was chosen {counts[0]} times; the least frequent item is {unique[-1]} it was {counts[-1]} times; on average, each item was purchased {counts.astype(float).mean():.2f} times.")
string = f'{K} most frequent items are: ' + ', '.join([f'{unique[i]}({counts[i]} times)' for i in range(K)]) + '.'
summary.append(string)
string = f'{K} least frequent items are: ' + ', '.join([f'{unique[N-i]}({counts[N-i]} times)' for i in range(1, K+1)]) + '.'
summary.append(string)
summary.append('Attribute Summaries:')
for key, item in self.__dict__.items():
if self._is_attribute(key) and torch.is_tensor(item):
summary.append("Observable Tensor '{}' with shape {}".format(key, item.shape))
# price attributes are 3-dimensional tensors, ignore for cleanness here.
if (not self._is_price_attribute(key)) and (not self._is_usersessionitem_attribute(key)) and (not self._is_useritem_attribute(key)) and (not self._is_usersession_attribute(key)):
summary.append(str(pd.DataFrame(item.to('cpu').float().numpy()).describe()))
print('\n'.join(summary) + f"\ndevice={self.device}")
return None
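For orientation, here is a minimal usage sketch of the class documented above. All tensor names, sizes, and observables are hypothetical, and the import path is assumed from the source header:

import torch
from torch_choice.data import ChoiceDataset

num_users, num_items, num_sessions, batch_size = 10, 4, 50, 200
dataset = ChoiceDataset(
    # one chosen item, user, and session per choice instance.
    item_index=torch.randint(num_items, (batch_size,)),
    user_index=torch.randint(num_users, (batch_size,)),
    session_index=torch.randint(num_sessions, (batch_size,)),
    num_items=num_items, num_users=num_users, num_sessions=num_sessions,
    # observables follow the prefix conventions documented in __init__.
    user_income=torch.randn(num_users, 1),               # 'user_' -> (num_users, *)
    item_quality=torch.randn(num_items, 3),              # 'item_' -> (num_items, *)
    price_cost=torch.randn(num_sessions, num_items, 1))  # 'price_' -> (num_sessions, num_items, *)
print(dataset)     # shapes of all attributes.
dataset.summary()  # frequency tables and observable statistics.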
device: str (property, readonly)

Returns the device of the dataset.

Returns:

Type | Description
---|---
str | the device of the dataset.
num_items: int (property, readonly)

Returns the number of items involved in this dataset.

Returns:

Type | Description
---|---
int | the number of items involved in this dataset.
num_sessions: int (property, readonly)

Returns the number of sessions involved in this dataset.

Returns:

Type | Description
---|---
int | the number of sessions involved in this dataset.
num_users: int (property, readonly)

Returns the number of users involved in this dataset; returns 1 if there is no user identity.

Returns:

Type | Description
---|---
int | the number of users involved in this dataset.
x_dict: Dict[object, torch.Tensor] (property, readonly)

Formats the attributes of this dataset into shape (len(self), num_items, num_params) and returns them in a dictionary. Models in this package expect this dictionary-based data format.

Returns:

Type | Description
---|---
Dict[object, torch.Tensor] | a dictionary with attribute names in the dataset as keys and reshaped attribute tensors as values.
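Continuing the hypothetical dataset sketched above, `x_dict` expands every observable to a per-record tensor:

for name, tensor in dataset.x_dict.items():
    # every observable is expanded to (len(dataset), num_items, num_params).
    print(name, tuple(tensor.shape))
# user_income (200, 4, 1)
# item_quality (200, 4, 3)
# price_cost (200, 4, 1)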
__eq__(self, other) (special)

Returns whether all tensor attributes of both ChoiceDatasets are equal.
__getitem__(self, indices) (special)

Retrieves samples corresponding to the provided index or list of indices.

Parameters:

Name | Type | Description | Default
---|---|---|---
indices | Union[int, torch.LongTensor] | a single integer index or a tensor of indices. | required

Returns:

Type | Description
---|---
ChoiceDataset | a subset of the dataset.
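Because `__getitem__` accepts an index tensor and returns a new `ChoiceDataset`, a common use is a random train/test split; a sketch using the hypothetical dataset from earlier:

N = len(dataset)
perm = torch.randperm(N)
train_set = dataset[perm[:int(0.8 * N)]]  # a new ChoiceDataset with 80% of the records.
test_set = dataset[perm[int(0.8 * N):]]
print(len(train_set), len(test_set))  # 160 40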
__init__(self, item_index, num_items=None, num_users=None, num_sessions=None, label=None, user_index=None, session_index=None, item_availability=None, **kwargs) (special)

Initialization method for the dataset object; researchers should supply all information about the dataset using this method.

The number of choice instances is called `batch_size` in the documentation. The `batch_size` corresponds to the file length in a wide-format dataset and is often denoted by `N`; we call it `batch_size` to follow the convention in the machine learning literature.

A `choice instance` is a row of the dataset, so there are `batch_size` choice instances in each `ChoiceDataset`.

The dataset consists of:
(1) a collection of `batch_size` tuples (item_id, user_id, session_id, label), where each tuple is a choice instance;
(2) a collection of `observables` associated with items, users, sessions, etc.
Parameters:

Name | Type | Description | Default
---|---|---|---
item_index | torch.LongTensor | a tensor of shape (batch_size) indicating the relevant item in each row of the dataset; the relevant item can be (1) the item bought in this choice instance, or (2) the item reviewed by the user (in the latter case, the `label` tensor specifies the rating score). | required
num_items | Optional[int] | the number of items in the dataset; if `None` (default), inferred from the number of unique values in `item_index`. | None
num_users | Optional[int] | the number of users in the dataset; if `None` (default), inferred from the number of unique values in `user_index`. | None
num_sessions | Optional[int] | the number of sessions in the dataset; if `None` (default), inferred from the number of unique values in `session_index`. | None
label | Optional[torch.LongTensor] | a tensor of shape (batch_size) indicating the label for prediction in each choice instance; leave as `None` to predict the item bought (`item_index`). | None
user_index | Optional[torch.LongTensor] | a tensor of shape (batch_size) indicating the ID of the user involved in each choice instance; if `None`, all choice instances are assumed to come from the same user. | None
session_index | Optional[torch.LongTensor] | a tensor of shape (batch_size) indicating the ID of the session in which each choice instance occurred; if `None`, each choice instance is assumed to be in its own session. | None
item_availability | Optional[torch.BoolTensor] | a boolean tensor of shape (num_sessions, num_items) indicating the availability of each item in each session; utilities of unavailable items are set to negative infinity, so their predicted probabilities are zero; if `None`, all items are assumed to be available. | None
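As a brief illustration of the `item_availability` mask described in the table above (all sizes are hypothetical):

import torch
# 50 sessions, 4 items; suppose item 3 is out of stock in session 0.
item_availability = torch.ones(50, 4, dtype=torch.bool)
item_availability[0, 3] = False
# pass item_availability=item_availability to the ChoiceDataset constructor.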
Other Kwargs (Observables):

One can specify the following types of observables, where * in a shape denotes any positive integer. Typically * represents the number of observables. Please refer to the documentation for a detailed guide to using observables.

1. user observables must start with 'user_' and have shape (num_users, *)
2. item observables must start with 'item_' and have shape (num_items, *)
3. session observables must start with 'session_' and have shape (num_sessions, *)
4. user-item observables (those varying by user and item) must start with `useritem_` or `itemuser_` and have shape (num_users, num_items, *). NOTE: we don't recommend using user-item observables, because num_users * num_items can be large.
5. price observables (those varying by session and item) must start with `price_` and have shape (num_sessions, num_items, *)
6. itemsession observables start with `itemsession_`; this is a more intuitive alias for the price observables.
__len__(self) (special)

Returns the number of samples in this dataset.
__repr__(self) (special)

A method to get a string representation of the dataset.

Returns:

Type | Description
---|---
str | the string representation of the dataset.
apply_tensor(self, func)

A helper method to apply the provided function to all tensors in the dataset and to all tensor values of dictionary attributes.

Parameters:

Name | Type | Description | Default
---|---|---|---
func | callable | a callable function to be applied on tensors and tensor values of dictionaries. | required

Returns:

Type | Description
---|---
ChoiceDataset | the modified dataset.
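A one-line sketch of `apply_tensor` on the hypothetical dataset from earlier; the `to` method below is implemented exactly as such a one-liner:

# move every tensor in the dataset to the CPU.
dataset = dataset.apply_tensor(lambda t: t.to('cpu'))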
clone(self)

Creates a copy of self.

Returns:

Type | Description
---|---
ChoiceDataset | a copy of self.
summary(self)

A method to summarize the dataset; the summary is printed to standard output.

Returns:

Type | Description
---|---
None | this method prints the summary and returns None.
to(self, device)

Moves all tensors in this dataset to the specified PyTorch device.

Parameters:

Name | Type | Description | Default
---|---|---|---
device | Union[str, torch.device] | the destination device. | required

Returns:

Type | Description
---|---
ChoiceDataset | the modified dataset on the new device.
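For example, moving the hypothetical dataset to a GPU when one is available:

if torch.cuda.is_available():
    dataset = dataset.to('cuda')
print(dataset.device)  # e.g., cuda:0; cpu if no GPU is present.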
JointDataset

A helper class for joining several PyTorch datasets; using JointDataset with a PyTorch data loader allows for sampling the same batch index from several datasets.

The JointDataset class is a wrapper around several ChoiceDataset objects; it is particularly useful when we need to make predictions from multiple datasets. For example, you have data on consumer purchase records in a fast food store, and suppose every customer purchases exactly one main food and one drink. In this case, you have two separate datasets: FoodDataset and DrinkDataset. You may want to use a PyTorch sampler to sample them in a dependent manner: you want to take the i-th sample from both datasets, so that you know which (food, drink) combo the i-th customer purchased. You can do this by using the JointDataset class.
Source code in torch_choice/data/joint_dataset.py
class JointDataset(torch.utils.data.Dataset):
"""A helper class for joining several pytorch datasets, using JointDataset
and pytorch data loader allows for sampling the same batch index from several
datasets.
The JointDataset class is a wrapper for the torch.utils.data.ChoiceDataset class, it is particularly useful when we
need to make prediction from multiple datasets. For example, you have data on consumer purchase records in a fast food
store, and suppose every customer will purchase exactly a single main food and a single drink. In this case, you have
two separate datasets: FoodDataset and DrinkDataset. You may want to use PyTorch sampler to sample them in a dependent
manner: you want to take the i-th sample from both datasets, so that you know what (food, drink) combo the i-th customer
purchased. You can do this by using the JointDataset class.
"""
def __init__(self, **datasets) -> None:
"""The initialize methods.
Args:
Arbitrarily many datasets with arbitrary names as keys. In the example above, you can construct
```
dataset = JointDataset(food=FoodDataset, drink=DrinkDataset)
```
All datasets should have the same length.
"""
super(JointDataset, self).__init__()
self.datasets = datasets
# check the length of sub-datasets are the same.
assert len(set([len(d) for d in self.datasets.values()])) == 1
def __len__(self) -> int:
"""Get the number of samples in the joint dataset.
Returns:
int: the number of samples in the joint dataset, which is the same as the number of samples in each dataset contained.
"""
for d in self.datasets.values():
return len(d)
def __getitem__(self, indices: Union[int, torch.LongTensor]) -> Dict[str, ChoiceDataset]:
"""Queries samples from the dataset by index.
Args:
indices (Union[int, torch.LongTensor]): an integer or a 1D tensor of multiple indices.
Returns:
Dict[str, ChoiceDataset]: the subset of the dataset. Keys of the dictionary will be names of each dataset
contained (the same as the keys of the ``datasets`` argument in the constructor). Values will be subsets
of contained datasets, sliced using the provided indices.
"""
return dict((name, d[indices]) for (name, d) in self.datasets.items())
def __repr__(self) -> str:
"""A method to get a string representation of the dataset.
Returns:
str: the string representation of the dataset.
"""
out = [f'JointDataset with {len(self.datasets)} sub-datasets: (']
for name, dataset in self.datasets.items():
out.append(f'\t{name}: {str(dataset)}')
out.append(')')
return '\n'.join(out)
@property
def device(self) -> str:
"""Returns the device of datasets contained in the joint dataset.
Returns:
str: the device of the dataset.
"""
for d in self.datasets.values():
return d.device
def to(self, device: Union[str, torch.device]) -> "JointDataset":
"""Moves all datasets in this dataset to the specified PyTorch device.
Args:
device (Union[str, torch.device]): the destination device.
Returns:
            JointDataset: the modified dataset on the new device.
"""
for d in self.datasets.values():
d = d.to(device)
return self
def clone(self) -> "JointDataset":
"""Returns a copy of the dataset.
Returns:
JointDataset: a copy of the dataset.
"""
return JointDataset(**{name: d.clone() for (name, d) in self.datasets.items()})
@property
def item_index(self) -> torch.LongTensor:
"""Returns the current index of each dataset.
Returns:
torch.LongTensor: the indices of items chosen.
"""
return self.datasets["item"].item_index
device: str (property, readonly)

Returns the device of the datasets contained in the joint dataset.

Returns:

Type | Description
---|---
str | the device of the dataset.
item_index: LongTensor (property, readonly)

Returns the item_index tensor (the chosen items) of the contained dataset named `item`.

Returns:

Type | Description
---|---
torch.LongTensor | the indices of items chosen.
__getitem__(self, indices) (special)

Queries samples from the dataset by index.

Parameters:

Name | Type | Description | Default
---|---|---|---
indices | Union[int, torch.LongTensor] | an integer or a 1D tensor of multiple indices. | required

Returns:

Type | Description
---|---
Dict[str, ChoiceDataset] | the subset of the dataset. Keys of the dictionary are the names of each contained dataset (the same as the keys of the `datasets` argument in the constructor); values are subsets of the contained datasets, sliced using the provided indices.
__init__(self, **datasets) (special)

The initialization method. It accepts arbitrarily many datasets with arbitrary names as keys; for the example above, one can construct JointDataset(food=FoodDataset, drink=DrinkDataset). All datasets should have the same length.
__len__(self) (special)

Get the number of samples in the joint dataset.

Returns:

Type | Description
---|---
int | the number of samples in the joint dataset, which is the same as the number of samples in each contained dataset.
__repr__(self) (special)

A method to get a string representation of the dataset.

Returns:

Type | Description
---|---
str | the string representation of the dataset.
clone(self)

Returns a copy of the dataset.

Returns:

Type | Description
---|---
JointDataset | a copy of the dataset.
to(self, device)

Moves all datasets in this joint dataset to the specified PyTorch device.

Parameters:

Name | Type | Description | Default
---|---|---|---
device | Union[str, torch.device] | the destination device. | required

Returns:

Type | Description
---|---
JointDataset | the modified dataset on the new device.
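A minimal sketch of the (food, drink) example above; the datasets and import path are hypothetical:

import torch
from torch_choice.data import ChoiceDataset, JointDataset

n = 100  # both datasets must have the same length.
food_dataset = ChoiceDataset(item_index=torch.randint(5, (n,)))
drink_dataset = ChoiceDataset(item_index=torch.randint(3, (n,)))
joint = JointDataset(food=food_dataset, drink=drink_dataset)
# the i-th sample is drawn from both datasets together.
batch = joint[torch.arange(10)]
print(batch['food'], batch['drink'])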
ConditionalLogitModel

The more generalized version of the conditional logit model; the model allows for researcher-specified variable types (groups) and different levels of variation for coefficients.

The model allows the following levels of variable variation:

!!! note
    Unless the `-full` flag is specified (which means we want to explicitly model coefficients for all items), for all variation levels related to item (item-specific and user-item specific), the model forces coefficients for the first item to be zero. This design follows standard econometric practice.

- constant: constant over all users and items,
- user: user-specific parameters but constant across all items,
- item: item-specific parameters but constant across all users; parameters for the first item are forced to be zero.
- item-full: item-specific parameters but constant across all users; explicitly models all items.
- user-item: parameters that are specific to both user and item; parameters for the first item are forced to be zero for all users.
- user-item-full: parameters that are specific to both user and item; explicitly models all items.
Source code in torch_choice/model/conditional_logit_model.py
class ConditionalLogitModel(nn.Module):
    """The more generalized version of the conditional logit model; the model allows for researcher-specified
    variable types (groups) and different levels of variation for coefficients.
    The model allows the following levels of variable variation:
    NOTE: unless the `-full` flag is specified (which means we want to explicitly model coefficients
    for all items), for all variation levels related to item (item-specific and user-item specific),
    the model forces coefficients for the first item to be zero. This design follows standard
    econometric practice.
- constant: constant over all users and items,
- user: user-specific parameters but constant across all items,
- item: item-specific parameters but constant across all users, parameters for the first item are
forced to be zero.
- item-full: item-specific parameters but constant across all users, explicitly model for all items.
- user-item: parameters that are specific to both user and item, parameter for the first item
for all users are forced to be zero.
- user-item-full: parameters that are specific to both user and item, explicitly model for all items.
"""
def __init__(self,
formula: Optional[str]=None,
dataset: Optional[ChoiceDataset]=None,
coef_variation_dict: Optional[Dict[str, str]]=None,
num_param_dict: Optional[Dict[str, int]]=None,
num_items: Optional[int]=None,
num_users: Optional[int]=None,
regularization: Optional[str]=None,
regularization_weight: Optional[float]=None,
weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
model_outside_option: Optional[bool]=False
) -> None:
"""
Args:
formula (str): a string representing the utility formula.
The formula consists of '(variable_name|variation)'s separated by '+', for example:
"(var1|item) + (var2|user) + (var3|constant)"
where the first part of each term is the name of the variable
and the second part is the variation of the coefficient.
The variation can be one of the following:
'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.
            dataset (ChoiceDataset): a ChoiceDataset object for training the model; the parser will infer dimensions of variables
                and sizes of coefficients from the ChoiceDataset.
coef_variation_dict (Dict[str, str]): variable type to variation level dictionary. Keys of this dictionary
should be variable names in the dataset (i.e., these starting with `itemsession_`, `price_`, `user_`, etc), or `intercept`
if the researcher requires an intercept term.
                For each variable name X_var (e.g., `user_income`) or `intercept`, the corresponding dictionary value should
                be one of the following; this value specifies the "level of variation" of the coefficient.
- `constant`: the coefficient constant over all users and items: $X \beta$.
- `user`: user-specific parameters but constant across all items: $X \beta_{u}$.
- `item`: item-specific parameters but constant across all users, $X \beta_{i}$.
Note that the coefficients for the first item are forced to be zero following the standard practice
in econometrics.
- `item-full`: the same configuration as `item`, but does not force the coefficients of the first item to
be zeros.
The following configurations are supported by the package, but we don't recommend using them due to the
large number of parameters.
- `user-item`: parameters that are specific to both user and item, parameter for the first item
for all users are forced to be zero.
- `user-item-full`: parameters that are specific to both user and item, explicitly model for all items.
num_param_dict (Optional[Dict[str, int]]): variable type to number of parameters dictionary with keys exactly the same
as the `coef_variation_dict`. Values of `num_param_dict` records numbers of features in each kind of variable.
If None is supplied, num_param_dict will be a dictionary with the same keys as the `coef_variation_dict` dictionary
and values of all ones. Default to be None.
num_items (int): number of items in the dataset.
num_users (int): number of users in the dataset.
regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
regularization added to the log-likelihood.
- 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
- 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
- None does not modify the log-likelihood.
Defaults to None.
regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
This term controls the strength of regularization. This argument is required if and only if regularization
is not None.
Defaults to None.
weight_initialization (Optional[Union[str, Dict[str, str]]]): controls for how coefficients are initialized;
users can pass a string from {'normal', 'uniform', 'zero'} to initialize all coefficients in the same way.
Alternatively, users can pass a dictionary with keys exactly the same as the `coef_variation_dict` dictionary,
and values from {'normal', 'uniform', 'zero'} to initialize coefficients of different types of variables differently.
By default, all coefficients are initialized following a standard normal distribution.
model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
To enable modeling of the outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
The utility of the outside option is always set to 0 while computing the probability.
By default, model_outside_option is set to False and the model does not model the outside option.
"""
# ==============================================================================================================
# Check that the model received a valid combination of inputs so that it can be initialized.
# ==============================================================================================================
if coef_variation_dict is None and formula is None:
raise ValueError("Either coef_variation_dict or formula should be provided to specify the model.")
if (coef_variation_dict is not None) and (formula is not None):
raise ValueError("Only one of coef_variation_dict or formula should be provided to specify the model.")
if (formula is not None) and (dataset is None):
raise ValueError("If formula is provided, dataset should also be provided to specify the model.")
# ==============================================================================================================
# Build necessary dictionaries for model initialization.
# ==============================================================================================================
if formula is None:
# Use dictionaries to initialize the model.
if num_param_dict is None:
warnings.warn("`num_param_dict` is not provided, all variables will be treated as having one parameter.")
num_param_dict = {key:1 for key in coef_variation_dict.keys()}
assert coef_variation_dict.keys() == num_param_dict.keys()
# variable `var` with variation `spec` to variable `var[spec]`.
rename = dict() # old variable name --> new variable name.
for variable, specificity in coef_variation_dict.items():
rename[variable] = f"{variable}[{specificity}]"
for old_name, new_name in rename.items():
coef_variation_dict[new_name] = coef_variation_dict.pop(old_name)
num_param_dict[new_name] = num_param_dict.pop(old_name)
else:
# Use the formula to infer model.
coef_variation_dict, num_param_dict = parse_formula(formula, dataset)
# ==============================================================================================================
# Model Initialization.
# ==============================================================================================================
super(ConditionalLogitModel, self).__init__()
self.coef_variation_dict = deepcopy(coef_variation_dict)
self.num_param_dict = deepcopy(num_param_dict)
self.num_items = num_items
self.num_users = num_users
self.regularization = deepcopy(regularization)
assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
self.regularization_weight = regularization_weight
if (self.regularization is not None) and (self.regularization_weight is None):
raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
if (self.regularization is None) and (self.regularization_weight is not None):
raise ValueError(f'You specified no regularization but provided regularization_weight={self.regularization_weight}; leave regularization_weight as None if you do not want to regularize the model.')
# check that the numbers of parameters specified are all positive.
for var_type, num_params in self.num_param_dict.items():
assert num_params > 0, f'num_params needs to be positive, got: {num_params}.'
# infer the number of parameters for intercept if the researcher forgets.
for variable in self.coef_variation_dict.keys():
if self.is_intercept_term(variable) and variable not in self.num_param_dict.keys():
warnings.warn(f"`{variable}` key found in coef_variation_dict but not in num_param_dict, num_param_dict['{variable}'] has been set to 1.")
self.num_param_dict[variable] = 1
# record how each coefficient should be initialized.
self.weight_initialization = deepcopy(weight_initialization)
# construct trainable parameters.
coef_dict = dict()
for var_type, variation in self.coef_variation_dict.items():
if isinstance(self.weight_initialization, dict):
if var_type.split('[')[0] in self.weight_initialization.keys():
# use the variable-specific initialization if provided.
init = self.weight_initialization[var_type.split('[')[0]]
else:
# use default initialization.
init = None
else:
# initialize all coefficients in the same way.
init = self.weight_initialization
coef_dict[var_type] = Coefficient(variation=variation,
num_items=self.num_items,
num_users=self.num_users,
num_params=self.num_param_dict[var_type],
init=init)
# A ModuleDict is required to properly register all trainable parameters.
# self.parameters() will fail if a plain Python dictionary is used instead.
self.coef_dict = nn.ModuleDict(coef_dict)
self.model_outside_option = model_outside_option
def __repr__(self) -> str:
"""Return a string representation of the model.
Returns:
str: the string representation of the model.
"""
out_str_lst = ['Conditional logistic discrete choice model, expects input features:\n']
for var_type, num_params in self.num_param_dict.items():
out_str_lst.append(f'X[{var_type}] with {num_params} parameters, with {self.coef_variation_dict[var_type]} level variation.')
return super().__repr__() + '\n' + '\n'.join(out_str_lst) + '\n' + f'device={self.device}'
@property
def num_params(self) -> int:
"""Get the total number of parameters. For example, if there is only an user-specific coefficient to be multiplied
with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no
intercept is involved.
Returns:
int: the total number of learnable parameters.
"""
return sum(w.numel() for w in self.parameters())
def summary(self):
"""Print out the current model parameter."""
for var_type, coefficient in self.coef_dict.items():
if coefficient is not None:
print('Variable Type: ', var_type)
print(coefficient.coef)
def forward(self,
batch: ChoiceDataset,
manual_coef_value_dict: Optional[Dict[str, torch.Tensor]] = None
) -> torch.Tensor:
"""
Forward pass of the model.
Args:
batch: a `ChoiceDataset` object.
manual_coef_value_dict (Optional[Dict[str, torch.Tensor]], optional): a dictionary with
keys in {'u', 'i'} etc. and tensors as values. If provided, the model will force the
coefficients to be the provided values and compute utility conditioned on the provided
coefficient values. This feature is useful when the researcher wishes to plug in particular
values of coefficients and examine the utility values. If not provided, the model will
use the learned coefficient values in self.coef_dict.
Defaults to None.
Returns:
torch.Tensor: a tensor of shape (num_trips, num_items) whose (t, i) entry represents
the utility from item i in trip t for the user involved in that trip.
"""
x_dict = batch.x_dict
for variable in self.coef_variation_dict.keys():
if self.is_intercept_term(variable):
# intercept term has no input tensor from the ChoiceDataset data structure.
# the tensor for intercept has only 1 feature, every entry is 1.
x_dict['intercept'] = torch.ones((len(batch), self.num_items, 1), device=batch.device)
break
# compute the utility from each item in each choice session.
total_utility = torch.zeros((len(batch), self.num_items), device=batch.device)
# for each type of variables, apply the corresponding coefficient to input x.
for var_type, coef in self.coef_dict.items():
# variable type is named as "observable_name[variation]", retrieve the corresponding observable name.
corresponding_observable = var_type.split("[")[0]
total_utility += coef(
x_dict[corresponding_observable],
batch.user_index,
manual_coef_value=None if manual_coef_value_dict is None else manual_coef_value_dict[var_type])
assert total_utility.shape == (len(batch), self.num_items)
if batch.item_availability is not None:
# mask out unavailable items.
total_utility[~batch.item_availability[batch.session_index, :]] = torch.finfo(total_utility.dtype).min / 2
# accommodate the outside option.
if self.model_outside_option:
# the outside option has zero utility.
util_zero = torch.zeros(total_utility.size(0), 1, device=batch.device) # (len(batch), 1) zero tensor.
# outside option is indicated by item_index == -1, we put it at the end.
total_utility = torch.cat((total_utility, util_zero), dim=1) # (len(batch), num_items+1)
return total_utility
def negative_log_likelihood(self, batch: ChoiceDataset, y: torch.Tensor, is_train: bool=True) -> torch.Tensor:
"""Computes the log-likelihood for the batch and label.
TODO: consider remove y, change to label.
TODO: consider move this method outside the model, the role of the model is to compute the utility.
Args:
batch (ChoiceDataset): a ChoiceDataset object containing the data.
y (torch.Tensor): the label.
is_train (bool, optional): whether to trace the gradient. Defaults to True.
Returns:
torch.Tensor: the negative log-likelihood.
"""
if is_train:
self.train()
else:
self.eval()
# (num_trips, num_items)
total_utility = self.forward(batch)
# check shapes.
if self.model_outside_option:
assert total_utility.shape == (len(batch), self.num_items+1)
assert torch.all(total_utility[:, -1] == 0), "The last column of total_utility should be all zeros, which corresponds to the outside option."
else:
assert total_utility.shape == (len(batch), self.num_items)
logP = torch.log_softmax(total_utility, dim=1)
# since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
# indexing should correctly retrieve the log-likelihood even for outside options.
nll = - logP[torch.arange(len(y)), y].sum()
return nll
def loss(self, *args, **kwargs):
"""The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
nll = self.negative_log_likelihood(*args, **kwargs)
if self.regularization is not None:
L = {'L1': 1, 'L2': 2}[self.regularization]
for param in self.parameters():
nll += self.regularization_weight * torch.norm(param, p=L)
return nll
@property
def device(self) -> torch.device:
"""Returns the device of the coefficient.
Returns:
torch.device: the device of the model.
"""
return next(iter(self.coef_dict.values())).device
@staticmethod
def is_intercept_term(variable: str):
# check if the given variable is an intercept (fixed effect) term.
# intercept (fixed effect) terms are defined as 'intercept[*]' and looks like 'intercept[user]', 'intercept[item]', etc.
return (variable.startswith('intercept[') and variable.endswith(']'))
def get_coefficient(self, variable: str) -> torch.Tensor:
"""Retrieve the coefficient tensor for the given variable.
Args:
variable (str): the variable name.
Returns:
torch.Tensor: the corresponding coefficient tensor of the requested variable.
"""
return self.state_dict()[f"coef_dict.{variable}.coef"].detach().clone()
device: device
property, readonly
Returns the device of the coefficient.
Returns:
Type | Description
---|---
torch.device | the device of the model.
num_params: int
property, readonly
Get the total number of parameters. For example, if there is only a user-specific coefficient to be multiplied with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no intercept is involved.
Returns:
Type | Description
---|---
int | the total number of learnable parameters.
__init__(self, formula=None, dataset=None, coef_variation_dict=None, num_param_dict=None, num_items=None, num_users=None, regularization=None, regularization_weight=None, weight_initialization=None, model_outside_option=False)
special
Parameters:
Name | Type | Description | Default
---|---|---|---
formula | str | a string representing the utility formula. The formula consists of '(variable_name\|variation)'s separated by '+', for example: "(var1\|item) + (var2\|user) + (var3\|constant)", where the first part of each term is the name of the variable and the second part is the variation of the coefficient. The variation can be one of the following: 'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'. All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names. | None
dataset | ChoiceDataset | a ChoiceDataset object for training the model; the parser will infer dimensions of variables and sizes of coefficients from the ChoiceDataset. | None
coef_variation_dict | Dict[str, str] | variable type to variation level dictionary. Keys of this dictionary should be variable names in the dataset (i.e., those starting with `itemsession_`, `price_`, `user_`, etc.), or `intercept` if the researcher requires an intercept term. Values specify the "level of variation" of the coefficient: one of `constant`, `user`, `item`, `item-full`, `user-item`, `user-item-full`. `item` and `user-item` force the coefficients of the first item to zero following standard econometric practice; the `-full` variants do not. `user-item` and `user-item-full` are supported but not recommended due to the large number of parameters. | None
num_param_dict | Optional[Dict[str, int]] | variable type to number of parameters dictionary with keys exactly the same as the `coef_variation_dict`; values record the number of features in each kind of variable. If None is supplied, every variable is treated as having one parameter. Defaults to None. | None
num_items | int | number of items in the dataset. | None
num_users | int | number of users in the dataset. | None
regularization | Optional[str] | this argument takes values from {'L1', 'L2', None}, which specifies the type of regularization added to the log-likelihood. 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood; 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood; None does not modify the log-likelihood. Defaults to None. | None
regularization_weight | Optional[float] | the weight of the parameter norm subtracted from the log-likelihood. This term controls the strength of regularization. This argument is required if and only if regularization is not None. Defaults to None. | None
weight_initialization | Optional[Union[str, Dict[str, str]]] | controls how coefficients are initialized; users can pass a string from {'normal', 'uniform', 'zero'} to initialize all coefficients in the same way, or a dictionary with keys exactly the same as `coef_variation_dict` and values from {'normal', 'uniform', 'zero'} to initialize coefficients of different types of variables differently. By default, all coefficients are initialized following a standard normal distribution. | None
model_outside_option | Optional[bool] | whether to explicitly model the outside option (i.e., the consumer did not buy anything). To enable modeling of the outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor, which can then contain values in {-1, 0, 1, ..., num_items-1}. Otherwise the item-index-tensor should only contain values in {0, 1, ..., num_items-1}. The utility of the outside option is always set to 0 while computing the probability. Defaults to False. | False
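For illustration, the two construction routes below are equivalent; this is a minimal sketch, assuming a ChoiceDataset named `dataset` with hypothetical observables `price_cost` and `user_income` — your observable names will differ.

from torch_choice.model import ConditionalLogitModel

# Route 1: formula string plus dataset; the parser infers dimensions of variables and sizes of coefficients.
model = ConditionalLogitModel(
    formula='(price_cost|constant) + (user_income|item) + (intercept|item)',
    dataset=dataset,
    num_items=4)

# Route 2: explicit dictionaries; num_param_dict records the feature dimension of each observable.
model = ConditionalLogitModel(
    coef_variation_dict={'price_cost': 'constant', 'user_income': 'item', 'intercept': 'item'},
    num_param_dict={'price_cost': 1, 'user_income': 1, 'intercept': 1},
    num_items=4)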
Source code in torch_choice/model/conditional_logit_model.py
def __init__(self,
formula: Optional[str]=None,
dataset: Optional[ChoiceDataset]=None,
coef_variation_dict: Optional[Dict[str, str]]=None,
num_param_dict: Optional[Dict[str, int]]=None,
num_items: Optional[int]=None,
num_users: Optional[int]=None,
regularization: Optional[str]=None,
regularization_weight: Optional[float]=None,
weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
model_outside_option: Optional[bool]=False
) -> None:
"""
Args:
formula (str): a string representing the utility formula.
The formula consists of '(variable_name|variation)'s separated by '+', for example:
"(var1|item) + (var2|user) + (var3|constant)"
where the first part of each term is the name of the variable
and the second part is the variation of the coefficient.
The variation can be one of the following:
'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.
dataset (ChoiceDataset): a ChoiceDataset object for training the model, the parser will infer dimensions of variables
and sizes of coefficients from the ChoiceDataset.
coef_variation_dict (Dict[str, str]): variable type to variation level dictionary. Keys of this dictionary
should be variable names in the dataset (i.e., those starting with `itemsession_`, `price_`, `user_`, etc.), or `intercept`
if the researcher requires an intercept term.
For each variable name X_var (e.g., `user_income`) or `intercept`, the corresponding dictionary value should
be one of the following options; this value specifies the "level of variation" of the coefficient.
- `constant`: the coefficient is constant over all users and items: $X \beta$.
- `user`: user-specific parameters but constant across all items: $X \beta_{u}$.
- `item`: item-specific parameters but constant across all users, $X \beta_{i}$.
Note that the coefficients for the first item are forced to be zero following the standard practice
in econometrics.
- `item-full`: the same configuration as `item`, but does not force the coefficients of the first item to
be zeros.
The following configurations are supported by the package, but we don't recommend using them due to the
large number of parameters.
- `user-item`: parameters that are specific to both user and item, parameter for the first item
for all users are forced to be zero.
- `user-item-full`: parameters that are specific to both user and item; coefficients are explicitly modelled for all items.
num_param_dict (Optional[Dict[str, int]]): variable type to number of parameters dictionary with keys exactly the same
as the `coef_variation_dict`. Values of `num_param_dict` record the number of features in each kind of variable.
If None is supplied, num_param_dict will be a dictionary with the same keys as the `coef_variation_dict` dictionary
and all values equal to one. Defaults to None.
num_items (int): number of items in the dataset.
num_users (int): number of users in the dataset.
regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
regularization added to the log-likelihood.
- 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
- 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
- None does not modify the log-likelihood.
Defaults to None.
regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
This term controls the strength of regularization. This argument is required if and only if regularization
is not None.
Defaults to None.
weight_initialization (Optional[Union[str, Dict[str, str]]]): controls how coefficients are initialized;
users can pass a string from {'normal', 'uniform', 'zero'} to initialize all coefficients in the same way.
Alternatively, users can pass a dictionary with keys exactly the same as the `coef_variation_dict` dictionary,
and values from {'normal', 'uniform', 'zero'} to initialize coefficients of different types of variables differently.
By default, all coefficients are initialized following a standard normal distribution.
model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
To enable modeling of the outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
The utility of the outside option is always set to 0 while computing the probability.
By default, model_outside_option is set to False and the model does not model the outside option.
"""
# ==============================================================================================================
# Check that the model received a valid combination of inputs so that it can be initialized.
# ==============================================================================================================
if coef_variation_dict is None and formula is None:
raise ValueError("Either coef_variation_dict or formula should be provided to specify the model.")
if (coef_variation_dict is not None) and (formula is not None):
raise ValueError("Only one of coef_variation_dict or formula should be provided to specify the model.")
if (formula is not None) and (dataset is None):
raise ValueError("If formula is provided, dataset should also be provided to specify the model.")
# ==============================================================================================================
# Build necessary dictionaries for model initialization.
# ==============================================================================================================
if formula is None:
# Use dictionaries to initialize the model.
if num_param_dict is None:
warnings.warn("`num_param_dict` is not provided, all variables will be treated as having one parameter.")
num_param_dict = {key:1 for key in coef_variation_dict.keys()}
assert coef_variation_dict.keys() == num_param_dict.keys()
# variable `var` with variation `spec` to variable `var[spec]`.
rename = dict() # old variable name --> new variable name.
for variable, specificity in coef_variation_dict.items():
rename[variable] = f"{variable}[{specificity}]"
for old_name, new_name in rename.items():
coef_variation_dict[new_name] = coef_variation_dict.pop(old_name)
num_param_dict[new_name] = num_param_dict.pop(old_name)
else:
# Use the formula to infer model.
coef_variation_dict, num_param_dict = parse_formula(formula, dataset)
# ==============================================================================================================
# Model Initialization.
# ==============================================================================================================
super(ConditionalLogitModel, self).__init__()
self.coef_variation_dict = deepcopy(coef_variation_dict)
self.num_param_dict = deepcopy(num_param_dict)
self.num_items = num_items
self.num_users = num_users
self.regularization = deepcopy(regularization)
assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
self.regularization_weight = regularization_weight
if (self.regularization is not None) and (self.regularization_weight is None):
raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
if (self.regularization is None) and (self.regularization_weight is not None):
raise ValueError(f'You specified no regularization but provided regularization_weight={self.regularization_weight}; leave regularization_weight as None if you do not want to regularize the model.')
# check that the numbers of parameters specified are all positive.
for var_type, num_params in self.num_param_dict.items():
assert num_params > 0, f'num_params needs to be positive, got: {num_params}.'
# infer the number of parameters for intercept if the researcher forgets.
for variable in self.coef_variation_dict.keys():
if self.is_intercept_term(variable) and variable not in self.num_param_dict.keys():
warnings.warn(f"`{variable}` key found in coef_variation_dict but not in num_param_dict, num_param_dict['{variable}'] has been set to 1.")
self.num_param_dict[variable] = 1
# record how each coefficient should be initialized.
self.weight_initialization = deepcopy(weight_initialization)
# construct trainable parameters.
coef_dict = dict()
for var_type, variation in self.coef_variation_dict.items():
if isinstance(self.weight_initialization, dict):
if var_type.split('[')[0] in self.weight_initialization.keys():
# use the variable-specific initialization if provided.
init = self.weight_initialization[var_type.split('[')[0]]
else:
# use default initialization.
init = None
else:
# initialize all coefficients in the same way.
init = self.weight_initialization
coef_dict[var_type] = Coefficient(variation=variation,
num_items=self.num_items,
num_users=self.num_users,
num_params=self.num_param_dict[var_type],
init=init)
# A ModuleDict is required to properly register all trainable parameters.
# self.parameters() will fail if a plain Python dictionary is used instead.
self.coef_dict = nn.ModuleDict(coef_dict)
self.model_outside_option = model_outside_option
__repr__(self)
special
Return a string representation of the model.
Returns:
Type | Description
---|---
str | the string representation of the model.
Source code in torch_choice/model/conditional_logit_model.py
def __repr__(self) -> str:
"""Return a string representation of the model.
Returns:
str: the string representation of the model.
"""
out_str_lst = ['Conditional logistic discrete choice model, expects input features:\n']
for var_type, num_params in self.num_param_dict.items():
out_str_lst.append(f'X[{var_type}] with {num_params} parameters, with {self.coef_variation_dict[var_type]} level variation.')
return super().__repr__() + '\n' + '\n'.join(out_str_lst) + '\n' + f'device={self.device}'
forward(self, batch, manual_coef_value_dict=None)
Forward pass of the model.
Parameters:
Name | Type | Description | Default
---|---|---|---
batch | ChoiceDataset | a `ChoiceDataset` object. | required
manual_coef_value_dict | Optional[Dict[str, torch.Tensor]] | a dictionary with keys in {'u', 'i'} etc. and tensors as values. If provided, the model will force the coefficients to be the provided values and compute utility conditioned on the provided coefficient values. This feature is useful when the researcher wishes to plug in particular values of coefficients and examine the utility values. If not provided, the model will use the learned coefficient values in self.coef_dict. Defaults to None. | None
Returns:
Type | Description
---|---
torch.Tensor | a tensor of shape (num_trips, num_items) whose (t, i) entry represents the utility from item i in trip t for the user involved in that trip.
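For instance, the returned utilities can be converted into choice probabilities with a softmax over items; a minimal sketch, assuming `model` and a ChoiceDataset `batch` have already been constructed:

import torch

utilities = model(batch)  # shape (num_trips, num_items); invokes forward() internally.
probabilities = torch.softmax(utilities, dim=1)  # probability of each item in each trip.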
Source code in torch_choice/model/conditional_logit_model.py
def forward(self,
batch: ChoiceDataset,
manual_coef_value_dict: Optional[Dict[str, torch.Tensor]] = None
) -> torch.Tensor:
"""
Forward pass of the model.
Args:
batch: a `ChoiceDataset` object.
manual_coef_value_dict (Optional[Dict[str, torch.Tensor]], optional): a dictionary with
keys in {'u', 'i'} etc. and tensors as values. If provided, the model will force the
coefficients to be the provided values and compute utility conditioned on the provided
coefficient values. This feature is useful when the researcher wishes to plug in particular
values of coefficients and examine the utility values. If not provided, the model will
use the learned coefficient values in self.coef_dict.
Defaults to None.
Returns:
torch.Tensor: a tensor of shape (num_trips, num_items) whose (t, i) entry represents
the utility from item i in trip t for the user involved in that trip.
"""
x_dict = batch.x_dict
for variable in self.coef_variation_dict.keys():
if self.is_intercept_term(variable):
# intercept term has no input tensor from the ChoiceDataset data structure.
# the tensor for intercept has only 1 feature, every entry is 1.
x_dict['intercept'] = torch.ones((len(batch), self.num_items, 1), device=batch.device)
break
# compute the utility from each item in each choice session.
total_utility = torch.zeros((len(batch), self.num_items), device=batch.device)
# for each type of variables, apply the corresponding coefficient to input x.
for var_type, coef in self.coef_dict.items():
# variable type is named as "observable_name[variation]", retrieve the corresponding observable name.
corresponding_observable = var_type.split("[")[0]
total_utility += coef(
x_dict[corresponding_observable],
batch.user_index,
manual_coef_value=None if manual_coef_value_dict is None else manual_coef_value_dict[var_type])
assert total_utility.shape == (len(batch), self.num_items)
if batch.item_availability is not None:
# mask out unavailable items.
total_utility[~batch.item_availability[batch.session_index, :]] = torch.finfo(total_utility.dtype).min / 2
# accommodate the outside option.
if self.model_outside_option:
# the outside option has zero utility.
util_zero = torch.zeros(total_utility.size(0), 1, device=batch.device) # (len(batch), 1) zero tensor.
# outside option is indicated by item_index == -1, we put it at the end.
total_utility = torch.cat((total_utility, util_zero), dim=1) # (len(batch), num_items+1)
return total_utility
get_coefficient(self, variable)
Retrieve the coefficient tensor for the given variable.
Parameters:
Name | Type | Description | Default
---|---|---|---
variable | str | the variable name. | required
Returns:
Type | Description
---|---
torch.Tensor | the corresponding coefficient tensor of the requested variable.
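Note that after initialization the model stores coefficients under keys of the form "variable[variation]", so the variation suffix is part of the variable name passed here. A minimal sketch, assuming a hypothetical variable `price_cost` with constant variation:

coef = model.get_coefficient('price_cost[constant]')  # detached clone of the learned coefficient tensor.
print(coef.shape)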
Source code in torch_choice/model/conditional_logit_model.py
def get_coefficient(self, variable: str) -> torch.Tensor:
"""Retrieve the coefficient tensor for the given variable.
Args:
variable (str): the variable name.
Returns:
torch.Tensor: the corresponding coefficient tensor of the requested variable.
"""
return self.state_dict()[f"coef_dict.{variable}.coef"].detach().clone()
loss(self, *args, **kwargs)
The loss function to be optimized. This is a wrapper of negative_log_likelihood + regularization loss if required.
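A minimal optimization sketch, assuming `model` and a ChoiceDataset `batch` are already built; following the package's convention, `batch.item_index` serves as the label:

import torch

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
for _ in range(100):
    optimizer.zero_grad()
    loss = model.loss(batch, batch.item_index)  # negative log-likelihood plus regularization, if configured.
    loss.backward()
    optimizer.step()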
Source code in torch_choice/model/conditional_logit_model.py
def loss(self, *args, **kwargs):
"""The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
nll = self.negative_log_likelihood(*args, **kwargs)
if self.regularization is not None:
L = {'L1': 1, 'L2': 2}[self.regularization]
for param in self.parameters():
nll += self.regularization_weight * torch.norm(param, p=L)
return nll
negative_log_likelihood(self, batch, y, is_train=True)
Computes the negative log-likelihood for the batch and label. TODO: consider removing y and renaming it to label. TODO: consider moving this method outside the model; the role of the model is to compute the utility.
Parameters:
Name | Type | Description | Default
---|---|---|---
batch | ChoiceDataset | a ChoiceDataset object containing the data. | required
y | torch.Tensor | the label. | required
is_train | bool | whether to trace the gradient. Defaults to True. | True
Returns:
Type | Description
---|---
torch.Tensor | the negative log-likelihood.
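When model_outside_option=True, labels use y[n] == -1 for the outside option; since -1 indexes the last column in PyTorch and the outside option's zero-utility column is appended at the end, the indexing logP[torch.arange(len(y)), y] retrieves the correct log-probability without special casing. A minimal sketch with hypothetical labels:

import torch

y = torch.LongTensor([2, -1, 0])  # trip 0 chose item 2, trip 1 chose the outside option, trip 2 chose item 0.
nll = model.negative_log_likelihood(batch, y, is_train=False)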
Source code in torch_choice/model/conditional_logit_model.py
def negative_log_likelihood(self, batch: ChoiceDataset, y: torch.Tensor, is_train: bool=True) -> torch.Tensor:
"""Computes the log-likelihood for the batch and label.
TODO: consider remove y, change to label.
TODO: consider move this method outside the model, the role of the model is to compute the utility.
Args:
batch (ChoiceDataset): a ChoiceDataset object containing the data.
y (torch.Tensor): the label.
is_train (bool, optional): whether to trace the gradient. Defaults to True.
Returns:
torch.Tensor: the negative log-likelihood.
"""
if is_train:
self.train()
else:
self.eval()
# (num_trips, num_items)
total_utility = self.forward(batch)
# check shapes.
if self.model_outside_option:
assert total_utility.shape == (len(batch), self.num_items+1)
assert torch.all(total_utility[:, -1] == 0), "The last column of total_utility should be all zeros, which corresponds to the outside option."
else:
assert total_utility.shape == (len(batch), self.num_items)
logP = torch.log_softmax(total_utility, dim=1)
# since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
# indexing should correctly retrieve the log-likelihood even for outside options.
nll = - logP[torch.arange(len(y)), y].sum()
return nll
summary(self)
Print out the current model parameters.
Source code in torch_choice/model/conditional_logit_model.py
def summary(self):
"""Print out the current model parameters."""
for var_type, coefficient in self.coef_dict.items():
if coefficient is not None:
print('Variable Type: ', var_type)
print(coefficient.coef)
Source code in torch_choice/model/nested_logit_model.py
class NestedLogitModel(nn.Module):
def __init__(self,
nest_to_item: Dict[object, List[int]],
# method 1: specify variation and num param. dictionary.
nest_coef_variation_dict: Optional[Dict[str, str]]=None,
nest_num_param_dict: Optional[Dict[str, int]]=None,
item_coef_variation_dict: Optional[Dict[str, str]]=None,
item_num_param_dict: Optional[Dict[str, int]]=None,
# method 2: specify formula and dataset.
item_formula: Optional[str]=None,
nest_formula: Optional[str]=None,
dataset: Optional[JointDataset]=None,
num_users: Optional[int]=None,
shared_lambda: bool=False,
regularization: Optional[str]=None,
regularization_weight: Optional[float]=None,
nest_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
item_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
model_outside_option: Optional[bool]=False
) -> None:
"""Initialization method of the nested logit model.
Args:
nest_to_item (Dict[object, List[int]]): a dictionary mapping a nest ID to the list
of item IDs in the queried nest.
nest_coef_variation_dict (Dict[str, str]): a dictionary mapping a variable type
(i.e., variable group) to the level of variation for the coefficient of this type
of variables.
nest_num_param_dict (Dict[str, int]): a dictionary mapping a variable type name to
the number of parameters in this variable group.
item_coef_variation_dict (Dict[str, str]): the same as nest_coef_variation_dict but
for item features.
item_num_param_dict (Dict[str, int]): the same as nest_num_param_dict but for item
features.
{nest, item}_formula (str): a string representing the utility formula for the {nest, item} level logit model.
The formula consists of '(variable_name|variation)'s separated by '+', for example:
"(var1|item) + (var2|user) + (var3|constant)"
where the first part of each term is the name of the variable
and the second part is the variation of the coefficient.
The variation can be one of the following:
'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.
dataset (JointDataset): a JointDataset object for training the model, the parser will infer dimensions of variables
and sizes of coefficients for the nest level model from dataset.datasets['nest']. The parser will infer dimensions of variables and sizes of coefficients for the item level model from dataset.datasets['item'].
num_users (Optional[int], optional): number of users to be modelled; this is only
required if any variable type requires user-specific variation.
Defaults to None.
shared_lambda (bool): a boolean indicating whether to enforce the elasticity lambda, which
is the coefficient for inclusive values, to be constant for all nests.
The lambda enters the nest-level selection as the following
Utility of choosing nest k = lambda * inclusive value of nest k
+ linear combination of some other nest level features
If set to True, a single lambda will be learned for all nests, otherwise, the
model learns an individual lambda for each nest.
Defaults to False.
regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
regularization added to the log-likelihood.
- 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
- 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
- None does not modify the log-likelihood.
Defaults to None.
regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
This term controls the strength of regularization. This argument is required if and only if regularization
is not None.
Defaults to None.
{nest, item}_weight_initialization (Optional[Union[str, Dict[str, str]]]): methods to initialize the weights of
coefficients for {nest, item} level model. Please refer to the `weight_initialization` keyword in ConditionalLogitModel's documentation for more details.
model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
To enable modeling of the outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
The utility of the outside option is always set to 0 while computing the probability.
By default, model_outside_option is set to False and the model does not model the outside option.
"""
# handle nest level model.
using_formula_to_initiate = (item_formula is not None) and (nest_formula is not None)
if using_formula_to_initiate:
# make sure that the researcher does not specify duplicated information, which might cause conflicts.
if (nest_coef_variation_dict is not None) or (item_coef_variation_dict is not None):
raise ValueError('You specified the {item, nest}_formula to initialize the model, you should not specify the {item, nest}_coef_variation_dict at the same time.')
if (nest_num_param_dict is not None) or (item_num_param_dict is not None):
raise ValueError('You specified the {item, nest}_formula to initialize the model, you should not specify the {item, nest}_num_param_dict at the same time.')
if dataset is None:
raise ValueError('Dataset is required if {item, nest}_formula is specified to initialize the model.')
nest_coef_variation_dict, nest_num_param_dict = parse_formula(nest_formula, dataset.datasets['nest'])
item_coef_variation_dict, item_num_param_dict = parse_formula(item_formula, dataset.datasets['item'])
else:
# check for conflicting information.
if (nest_formula is not None) or (item_formula is not None):
raise ValueError('You should not specify {item, nest}_formula and {item, nest}_coef_variation_dict at the same time.')
# make sure that the researcher specifies all the required information.
if (nest_coef_variation_dict is None) or (item_coef_variation_dict is None):
raise ValueError('You should specify the {item, nest}_coef_variation_dict to initialize the model.')
if (nest_num_param_dict is None) or (item_num_param_dict is None):
raise ValueError('You should specify the {item, nest}_num_param_dict to initialize the model.')
super(NestedLogitModel, self).__init__()
self.nest_to_item = nest_to_item
self.nest_coef_variation_dict = nest_coef_variation_dict
self.nest_num_param_dict = nest_num_param_dict
self.item_coef_variation_dict = item_coef_variation_dict
self.item_num_param_dict = item_num_param_dict
self.num_users = num_users
self.nests = list(nest_to_item.keys())
self.num_nests = len(self.nests)
self.num_items = sum(len(items) for items in nest_to_item.values())
# nest coefficients.
self.nest_coef_dict = self._build_coef_dict(self.nest_coef_variation_dict,
self.nest_num_param_dict,
self.num_nests,
weight_initialization=deepcopy(nest_weight_initialization))
# item coefficients.
self.item_coef_dict = self._build_coef_dict(self.item_coef_variation_dict,
self.item_num_param_dict,
self.num_items,
weight_initialization=deepcopy(item_weight_initialization))
self.shared_lambda = shared_lambda
if self.shared_lambda:
self.lambda_weight = nn.Parameter(torch.ones(1), requires_grad=True)
else:
self.lambda_weight = nn.Parameter(torch.ones(self.num_nests) / 2, requires_grad=True)
# breakpoint()
# self.iv_weights = nn.Parameter(torch.ones(1), requires_grad=True)
# used to warn users if forgot to call clamp.
self._clamp_called_flag = True
self.regularization = regularization
assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
self.regularization_weight = regularization_weight
if (self.regularization is not None) and (self.regularization_weight is None):
raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
if (self.regularization is None) and (self.regularization_weight is not None):
raise ValueError(f'You specified no regularization but provided regularization_weight={self.regularization_weight}; leave regularization_weight as None if you do not want to regularize the model.')
self.model_outside_option = model_outside_option
@property
def num_params(self) -> int:
"""Get the total number of parameters. For example, if there is only an user-specific coefficient to be multiplied
with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no
intercept is involved.
Returns:
int: the total number of learnable parameters.
"""
return sum(w.numel() for w in self.parameters())
def _build_coef_dict(self,
coef_variation_dict: Dict[str, str],
num_param_dict: Dict[str, int],
num_items: int,
weight_initialization: Optional[Union[str, Dict[str, str]]]=None
) -> nn.ModuleDict:
"""Builds a coefficient dictionary containing all trainable components of the model, mapping coefficient names
to the corresponding Coefficient Module.
num_items could be the actual number of items or the number of nests, depending on the use case.
NOTE: torch-choice users don't directly interact with this method.
Args:
coef_variation_dict (Dict[str, str]): a dictionary mapping coefficient names (e.g., theta_user) to the level
of variation (e.g., 'user').
num_param_dict (Dict[str, int]): a dictionary mapping coefficient names to the number of parameters in this
coefficient. Be aware that, for example, if there is one K-dimensional coefficient for every user, then
the `num_param` should be K instead of K x number of users.
num_items (int): the total number of items in the prediction problem. `num_items` should be the number of nests if _build_coef_dict() is used for nest-level prediction.
Returns:
nn.ModuleDict: a PyTorch ModuleDict object mapping from coefficient names to training Coefficient.
"""
coef_dict = dict()
for var_type, variation in coef_variation_dict.items():
num_params = num_param_dict[var_type]
if isinstance(weight_initialization, dict):
if var_type.split('[')[0] in weight_initialization.keys():
# use the variable-specific initialization if provided.
init = weight_initialization[var_type.split('[')[0]]
else:
# use default initialization.
init = None
else:
# initialize all coefficients in the same way.
init = weight_initialization
coef_dict[var_type] = Coefficient(variation=variation,
num_items=num_items,
num_users=self.num_users,
num_params=num_params,
init=init)
return nn.ModuleDict(coef_dict)
def forward(self, batch: ChoiceDataset) -> torch.Tensor:
"""An standard forward method for the model, the user feeds a ChoiceDataset batch and the model returns the
predicted log-likelihood tensor. The main forward passing happens in the _forward() method, but we provide
this wrapper forward() method for a cleaner API, as forward() only requires a single batch argument.
For more details about the forward passing, please refer to the _forward() method.
# TODO: the ConditionalLogitModel returns predicted utility, the NestedLogitModel behaves the same?
Args:
batch (ChoiceDataset): a ChoiceDataset object containing the data batch.
Returns:
torch.Tensor: a tensor of shape (num_trips, num_items) including the log probability
of choosing item i in trip t.
"""
return self._forward(batch['nest'].x_dict,
batch['item'].x_dict,
batch['item'].user_index,
batch['item'].item_availability)
def _forward(self,
nest_x_dict: Dict[str, torch.Tensor],
item_x_dict: Dict[str, torch.Tensor],
user_index: Optional[torch.LongTensor] = None,
item_availability: Optional[torch.BoolTensor] = None
) -> torch.Tensor:
""""Computes log P[t, i] = the log probability for the user involved in trip t to choose item i.
Let n denote the ID of the user involved in trip t, then P[t, i] = P_{ni} on page 86 of the
book "discrete choice methods with simulation" by Train.
The `_forward` method is an internal API, users should refer to the `forward` method.
Args:
nest_x_dict (Dict[str, torch.Tensor]): a dictionary mapping from nest-level feature names to the corresponding feature tensor.
item_x_dict (Dict[str, torch.Tensor]): a dictionary mapping from item-level feature names to the corresponding feature tensor.
More details on the shape of the tensors can be found in the docstring of the `x_dict` method of `ChoiceDataset`.
user_index (torch.LongTensor): a tensor of shape (num_trips,) indicating which user is
making decision in each trip. Setting user_index = None assumes the same user is
making decisions in all trips.
item_availability (torch.BoolTensor): a boolean tensor with shape (num_trips, num_items)
indicating the availability of items in each trip. If item_availability[t, i] = False,
the utility of choosing item i in trip t, V[t, i], will be set to -inf.
Given the decomposition V[t, i] = W[t, k(i)] + Y[t, i] + eps, V[t, i] is set to -inf
by setting Y[t, i] = -inf for unavailable items.
Returns:
torch.Tensor: a tensor of shape (num_trips, num_items) including the log probability
of choosing item i in trip t.
"""
if self.shared_lambda:
self.lambdas = self.lambda_weight.expand(self.num_nests)
else:
self.lambdas = self.lambda_weight
# if not self._clamp_called_flag:
# warnings.warn('Did you forget to call clamp_lambdas() after optimizer.step()?')
# The overall utility of item can be decomposed into V[item] = W[nest] + Y[item] + eps.
T = list(item_x_dict.values())[0].shape[0]
device = list(item_x_dict.values())[0].device
# compute nest-specific utility with shape (T, num_nests).
W = torch.zeros(T, self.num_nests).to(device)
for variable in self.nest_coef_variation_dict.keys():
if self.is_intercept_term(variable):
nest_x_dict['intercept'] = torch.ones((T, self.num_nests, 1)).to(device)
break
for variable in self.item_coef_variation_dict.keys():
if self.is_intercept_term(variable):
item_x_dict['intercept'] = torch.ones((T, self.num_items, 1)).to(device)
break
for var_type, coef in self.nest_coef_dict.items():
corresponding_observable = var_type.split("[")[0]
W += coef(nest_x_dict[corresponding_observable], user_index)
# compute item-specific utility (T, num_items).
Y = torch.zeros(T, self.num_items).to(device)
for var_type, coef in self.item_coef_dict.items():
corresponding_observable = var_type.split("[")[0]
Y += coef(item_x_dict[corresponding_observable], user_index)
if item_availability is not None:
Y[~item_availability] = torch.finfo(Y.dtype).min / 2
# =============================================================================
# compute the inclusive value of each nest.
inclusive_value = dict()
for k, Bk in self.nest_to_item.items():
# for nest k, divide the Y of all items in Bk by lambda_k.
Y[:, Bk] /= self.lambdas[k]
# compute inclusive value for nest k.
# mask out unavailable items.
inclusive_value[k] = torch.logsumexp(Y[:, Bk], dim=1, keepdim=False) # (T,)
# broadcast inclusive value from (T, num_nests) to (T, num_items).
# for trip t, I[t, i] is the inclusive value of the nest item i belongs to.
I = torch.zeros(T, self.num_items).to(device)
for k, Bk in self.nest_to_item.items():
I[:, Bk] = inclusive_value[k].view(-1, 1) # (T, |Bk|)
# logP_item[t, i] = log P(ni|Bk), where Bk is the nest item i is in, n is the user in trip t.
logP_item = Y - I # (T, num_items)
if self.model_outside_option:
# if the model explicitly models the outside option, we need to add a column of zeros to logP_item.
# log P(ni|Bk) = 0 for the outside option since Y = 0 and the outside option has its own nest.
logP_item = torch.cat((logP_item, torch.zeros(T, 1).to(device)), dim=1)
assert logP_item.shape == (T, self.num_items+1)
assert torch.all(logP_item[:, -1] == 0)
# =============================================================================
# logP_nest[t, i] = log P(Bk), for item i in trip t, the probability of choosing the nest/bucket
# item i belongs to. logP_nest has shape (T, num_items)
# logit[t, i] = W[n, k] + lambda[k] I[n, k], where n is the user involved in trip t, k is
# the nest item i belongs to.
logit = torch.zeros(T, self.num_items).to(device)
for k, Bk in self.nest_to_item.items():
logit[:, Bk] = (W[:, k] + self.lambdas[k] * inclusive_value[k]).view(-1, 1) # (T, |Bk|)
# only count each nest once in the logsumexp within the nest level model.
cols = [x[0] for x in self.nest_to_item.values()]
if self.model_outside_option:
# the last column corresponds to the outside option, which has W+lambda*I = 0 since W = I = Y = 0 for the outside option.
logit = torch.cat((logit, torch.zeros(T, 1).to(device)), dim=1)
assert logit.shape == (T, self.num_items+1)
# we have already added W+lambda*I for each "actual" nest, now we add the "fake" nest for the outside option.
cols.append(-1)
logP_nest = logit - torch.logsumexp(logit[:, cols], dim=1, keepdim=True)
# =============================================================================
# compute the joint log P_{ni} as in the textbook.
logP = logP_item + logP_nest
self._clamp_called_flag = False
return logP
def log_likelihood(self, *args):
"""Computes the log likelihood of the model, please refer to the negative_log_likelihood() method.
Returns:
_type_: the log likelihood of the model.
"""
return - self.negative_log_likelihood(*args)
def negative_log_likelihood(self,
batch: ChoiceDataset,
y: torch.LongTensor,
is_train: bool=True) -> torch.scalar_tensor:
"""Computes the negative log likelihood of the model. Please note the log-likelihood is summed over all samples
in batch instead of the average.
Args:
batch (ChoiceDataset): the ChoiceDataset object containing the data.
y (torch.LongTensor): the label.
is_train (bool, optional): which mode of the model to be used for the forward passing, if we need Hessian
of the NLL through auto-grad, `is_train` should be set to True. If we merely need a performance metric,
then `is_train` can be set to False for better performance.
Defaults to True.
Returns:
torch.scalar_tensor: the negative log likelihood of the model.
"""
# compute the negative log-likelihood loss directly.
if is_train:
self.train()
else:
self.eval()
# (num_trips, num_items)
logP = self.forward(batch)
# check shapes
if self.model_outside_option:
assert logP.shape == (len(batch['item']), self.num_items+1)
else:
assert logP.shape == (len(batch['item']), self.num_items)
# since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
# indexing should correctly retrieve the log-likelihood even for outside options.
nll = - logP[torch.arange(len(y)), y].sum()
return nll
def loss(self, *args, **kwargs):
"""The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
nll = self.negative_log_likelihood(*args, **kwargs)
if self.regularization is not None:
L = {'L1': 1, 'L2': 2}[self.regularization]
for name, param in self.named_parameters():
if name == 'lambda_weight':
# we don't regularize the lambda term, we only regularize coefficients.
continue
nll += self.regularization_weight * torch.norm(param, p=L)
return nll
@property
def device(self) -> torch.device:
"""Returns the device of the coefficient.
Returns:
torch.device: the device of the model.
"""
return next(iter(self.item_coef_dict.values())).device
@staticmethod
def is_intercept_term(variable: str):
# check if the given variable is an intercept (fixed effect) term.
# intercept (fixed effect) terms are defined as 'intercept[*]' and looks like 'intercept[user]', 'intercept[item]', etc.
return (variable.startswith('intercept[') and variable.endswith(']'))
def get_coefficient(self, variable: str, level: Optional[str] = None) -> torch.Tensor:
"""Retrieve the coefficient tensor for the given variable.
Args:
variable (str): the variable name.
level (str): from which level of model to extract the coefficient, can be 'item' or 'nest'. The `level` argument will be discarded if `variable` is `lambda`.
Returns:
torch.Tensor: the corresponding coefficient tensor of the requested variable.
"""
if variable == 'lambda':
return self.lambda_weight.detach().clone()
if level not in ['item', 'nest']:
raise ValueError(f"Level should be either 'item' or 'nest', got {level}.")
return self.state_dict()[f'{level}_coef_dict.{variable}.coef'].detach().clone()
# def clamp_lambdas(self):
# """
# Restrict values of lambdas to 0 < lambda <= 1 to guarantee the utility maximization property
# of the model.
# This method should be called everytime after optimizer.step().
# We add a self_clamp_called_flag to remind researchers if this method is not called.
# """
# for k in range(len(self.lambdas)):
# self.lambdas[k] = torch.clamp(self.lambdas[k], 1e-5, 1)
# self._clam_called_flag = True
# @staticmethod
# def add_constant(x: torch.Tensor, where: str='prepend') -> torch.Tensor:
# """A helper function used to add constant to feature tensor,
# x has shape (batch_size, num_classes, num_parameters),
# returns a tensor of shape (*, num_parameters+1).
# """
# batch_size, num_classes, num_parameters = x.shape
# ones = torch.ones((batch_size, num_classes, 1))
# if where == 'prepend':
# new = torch.cat((ones, x), dim=-1)
# elif where == 'append':
# new = torch.cat((x, ones), dim=-1)
# else:
# raise Exception
# return new
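For reference, a compact restatement of what _forward computes (trip indices suppressed; $B_k$ denotes the set of items in nest $k$ and $k(i)$ the nest of item $i$), following the decomposition $V_i = W_{k(i)} + Y_i + \epsilon$:

$$
I_k = \log \sum_{j \in B_k} \exp\left(\frac{Y_j}{\lambda_k}\right), \qquad
\log P(i \mid B_{k(i)}) = \frac{Y_i}{\lambda_{k(i)}} - I_{k(i)}, \qquad
\log P(B_{k(i)}) = W_{k(i)} + \lambda_{k(i)} I_{k(i)} - \log \sum_{\ell} \exp\left(W_\ell + \lambda_\ell I_\ell\right),
$$

$$
\log P(i) = \log P(i \mid B_{k(i)}) + \log P(B_{k(i)}).
$$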
device: device
property, readonly
Returns the device of the coefficient.
Returns:
Type | Description
---|---
torch.device | the device of the model.
num_params: int
property, readonly
Get the total number of parameters. For example, if there is only a user-specific coefficient to be multiplied with the K-dimensional observable, then the total number of parameters would be K x number of users, assuming no intercept is involved.
Returns:
Type | Description
---|---
int | the total number of learnable parameters.
__init__(self, nest_to_item, nest_coef_variation_dict=None, nest_num_param_dict=None, item_coef_variation_dict=None, item_num_param_dict=None, item_formula=None, nest_formula=None, dataset=None, num_users=None, shared_lambda=False, regularization=None, regularization_weight=None, nest_weight_initialization=None, item_weight_initialization=None, model_outside_option=False)
special
Initialization method of the nested logit model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nest_to_item | Dict[object, List[int]] | a dictionary mapping each nest ID to the list of item IDs belonging to that nest. | required |
nest_coef_variation_dict | Dict[str, str] | a dictionary mapping a variable type (i.e., variable group) to the level of variation for the coefficients of that variable type. | None |
nest_num_param_dict | Dict[str, int] | a dictionary mapping a variable type name to the number of parameters in that variable group. | None |
item_coef_variation_dict | Dict[str, str] | the same as nest_coef_variation_dict but for item features. | None |
item_num_param_dict | Dict[str, int] | the same as nest_num_param_dict but for item features. | None |
{nest, item}_formula | Optional[str] | a string representing the utility formula for the {nest, item} level logit model. The formula consists of '(variable_name\|variation)' terms separated by '+', for example "(var1\|item) + (var2\|user) + (var3\|constant)", where the first part of each term is the name of the variable and the second part is the variation of its coefficient. The variation can be one of 'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'. All spaces in the formula are ignored, so do not use spaces in variable/observable names. | None |
dataset | JointDataset | a JointDataset object for training the model; the parser infers dimensions of variables and sizes of coefficients for the nest-level model from dataset.datasets['nest'] and for the item-level model from dataset.datasets['item']. | None |
num_users | Optional[int] | the number of users to be modelled; required only if some variable type has user-specific variation. Defaults to None. | None |
shared_lambda | bool | whether to enforce the elasticity lambda, the coefficient of the inclusive value, to be the same for all nests. The lambda enters the nest-level utility as: utility of choosing nest k = lambda * inclusive value of nest k + linear combination of other nest-level features. If True, a single lambda is learned for all nests; otherwise the model learns one lambda per nest. Defaults to False. | False |
regularization | Optional[str] | takes values in {'L1', 'L2', None} and specifies the type of regularization added to the log-likelihood: 'L1' subtracts regularization_weight * the 1-norm of the parameters from the log-likelihood, 'L2' subtracts regularization_weight * the 2-norm, and None leaves the log-likelihood unchanged. Defaults to None. | None |
regularization_weight | Optional[float] | the weight of the parameter norm subtracted from the log-likelihood; controls the strength of regularization. Required if and only if regularization is not None. Defaults to None. | None |
{nest, item}_weight_initialization | Optional[Union[str, Dict[str, str]]] | methods to initialize the weights of coefficients for the {nest, item} level model. Please refer to the weight_initialization keyword in ConditionalLogitModel's documentation for more details. | None |
model_outside_option | Optional[bool] | whether to explicitly model the outside option (i.e., the consumer did not buy anything). When enabled, the outside option is indicated by item_index[n] == -1 in the item-index tensor. | False |
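To make the two initialization routes concrete, here is a minimal sketch (not taken from the library's own examples) that builds the same model both ways. The observable names price_obs and nest_obs, their dimensions, and the joint_dataset object are hypothetical placeholders.

```python
from torch_choice.model.nested_logit_model import NestedLogitModel

nest_to_item = {0: [0, 1], 1: [2, 3, 4]}  # two nests covering five items

# Method 1: spell out coefficient variations and parameter counts by hand.
# 'price_obs' is a hypothetical 7-dimensional item-level observable.
model = NestedLogitModel(
    nest_to_item=nest_to_item,
    nest_coef_variation_dict={},  # nest level uses only the inclusive value (lambda)
    nest_num_param_dict={},
    item_coef_variation_dict={'price_obs': 'constant'},
    item_num_param_dict={'price_obs': 7},
    shared_lambda=True,
)

# Method 2: give formulas plus a JointDataset; the parser infers dimensions
# from dataset.datasets['nest'] and dataset.datasets['item'].
# joint_dataset: a pre-built JointDataset with 'nest' and 'item' keys (construction not shown).
model = NestedLogitModel(
    nest_to_item=nest_to_item,
    nest_formula='(nest_obs|constant)',   # hypothetical nest-level observable
    item_formula='(price_obs|constant)',
    dataset=joint_dataset,
    shared_lambda=True,
)
```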
Source code in torch_choice/model/nested_logit_model.py
def __init__(self,
nest_to_item: Dict[object, List[int]],
# method 1: specify variation and num param. dictionary.
nest_coef_variation_dict: Optional[Dict[str, str]]=None,
nest_num_param_dict: Optional[Dict[str, int]]=None,
item_coef_variation_dict: Optional[Dict[str, str]]=None,
item_num_param_dict: Optional[Dict[str, int]]=None,
# method 2: specify formula and dataset.
item_formula: Optional[str]=None,
nest_formula: Optional[str]=None,
dataset: Optional[JointDataset]=None,
num_users: Optional[int]=None,
shared_lambda: bool=False,
regularization: Optional[str]=None,
regularization_weight: Optional[float]=None,
nest_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
item_weight_initialization: Optional[Union[str, Dict[str, str]]]=None,
model_outside_option: Optional[bool]=False
) -> None:
"""Initialization method of the nested logit model.
Args:
nest_to_item (Dict[object, List[int]]): a dictionary maps a nest ID to a list
of items IDs of the queried nest.
nest_coef_variation_dict (Dict[str, str]): a dictionary maps a variable type
(i.e., variable group) to the level of variation for the coefficient of this type
of variables.
nest_num_param_dict (Dict[str, int]): a dictionary maps a variable type name to
the number of parameters in this variable group.
item_coef_variation_dict (Dict[str, str]): the same as nest_coef_variation_dict but
for item features.
item_num_param_dict (Dict[str, int]): the same as nest_num_param_dict but for item
features.
{nest, item}_formula (str): a string representing the utility formula for the {nest, item} level logit model.
The formula consists of '(variable_name|variation)'s separated by '+', for example:
"(var1|item) + (var2|user) + (var3|constant)"
where the first part of each term is the name of the variable
and the second part is the variation of the coefficient.
The variation can be one of the following:
'constant', 'item', 'item-full', 'user', 'user-item', 'user-item-full'.
All spaces in the formula will be ignored, hence please do not use spaces in variable/observable names.
dataset (JointDataset): a JointDataset object for training the model, the parser will infer dimensions of variables
and sizes of coefficients for the nest level model from dataset.datasets['nest']. The parser will infer dimensions of variables and sizes of coefficients for the item level model from dataset.datasets['item'].
num_users (Optional[int], optional): number of users to be modelled, this is only
required if any of variable type requires user-specific variations.
Defaults to None.
shared_lambda (bool): a boolean indicating whether to enforce the elasticity lambda, which
is the coefficient for inclusive values, to be constant for all nests.
The lambda enters the nest-level selection as the following
Utility of choosing nest k = lambda * inclusive value of nest k
+ linear combination of some other nest level features
If set to True, a single lambda will be learned for all nests, otherwise, the
model learns an individual lambda for each nest.
Defaults to False.
regularization (Optional[str]): this argument takes values from {'L1', 'L2', None}, which specifies the type of
regularization added to the log-likelihood.
- 'L1' will subtract regularization_weight * 1-norm of parameters from the log-likelihood.
- 'L2' will subtract regularization_weight * 2-norm of parameters from the log-likelihood.
- None does not modify the log-likelihood.
Defaults to None.
regularization_weight (Optional[float]): the weight of parameter norm subtracted from the log-likelihood.
This term controls the strength of regularization. This argument is required if and only if regularization
is not None.
Defaults to None.
{nest, item}_weight_initialization (Optional[Union[str, Dict[str, str]]]): methods to initialize the weights of
coefficients for {nest, item} level model. Please refer to the `weight_initialization` keyword in ConditionalLogitModel's documentation for more details.
model_outside_option (Optional[bool]): whether to explicitly model the outside option (i.e., the consumer did not buy anything).
To enable modeling outside option, the outside option is indicated by `item_index[n] == -1` in the item-index-tensor.
In this case, the item-index-tensor can contain values in `{-1, 0, 1, ..., num_items-1}`.
Otherwise, if the outside option is not modelled, the item-index-tensor should only contain values in `{0, 1, ..., num_items-1}`.
The utility of the outside option is always set to 0 while computing the probability.
By default, model_outside_option is set to False and the model does not model the outside option.
"""
# handle nest level model.
using_formula_to_initiate = (item_formula is not None) and (nest_formula is not None)
if using_formula_to_initiate:
# make sure that the researcher does not specify duplicated information, which might cause conflicts.
if (nest_coef_variation_dict is not None) or (item_coef_variation_dict is not None):
raise ValueError('You specify the {item, nest}_formula to initiate the model, you should not specify the {item, nest}_coef_variation_dict at the same time.')
if (nest_num_param_dict is not None) or (item_num_param_dict is not None):
raise ValueError('You specify the {item, nest}_formula to initiate the model, you should not specify the {item, nest}_num_param_dict at the same time.')
if dataset is None:
raise ValueError('Dataset is required if {item, nest}_formula is specified to initiate the model.')
nest_coef_variation_dict, nest_num_param_dict = parse_formula(nest_formula, dataset.datasets['nest'])
item_coef_variation_dict, item_num_param_dict = parse_formula(item_formula, dataset.datasets['item'])
else:
# check for conflicting information.
if (nest_formula is not None) or (item_formula is not None):
raise ValueError('You should not specify {item, nest}_formula and {item, nest}_coef_variation_dict at the same time.')
# make sure that the researcher specifies all the required information.
if (nest_coef_variation_dict is None) or (item_coef_variation_dict is None):
raise ValueError('You should specify the {item, nest}_coef_variation_dict to initiate the model.')
if (nest_num_param_dict is None) or (item_num_param_dict is None):
raise ValueError('You should specify the {item, nest}_num_param_dict to initiate the model.')
super(NestedLogitModel, self).__init__()
self.nest_to_item = nest_to_item
self.nest_coef_variation_dict = nest_coef_variation_dict
self.nest_num_param_dict = nest_num_param_dict
self.item_coef_variation_dict = item_coef_variation_dict
self.item_num_param_dict = item_num_param_dict
self.num_users = num_users
self.nests = list(nest_to_item.keys())
self.num_nests = len(self.nests)
self.num_items = sum(len(items) for items in nest_to_item.values())
# nest coefficients.
self.nest_coef_dict = self._build_coef_dict(self.nest_coef_variation_dict,
self.nest_num_param_dict,
self.num_nests,
weight_initialization=deepcopy(nest_weight_initialization))
# item coefficients.
self.item_coef_dict = self._build_coef_dict(self.item_coef_variation_dict,
self.item_num_param_dict,
self.num_items,
weight_initialization=deepcopy(item_weight_initialization))
self.shared_lambda = shared_lambda
if self.shared_lambda:
self.lambda_weight = nn.Parameter(torch.ones(1), requires_grad=True)
else:
self.lambda_weight = nn.Parameter(torch.ones(self.num_nests) / 2, requires_grad=True)
# used to warn users if they forget to call clamp.
self._clamp_called_flag = True
self.regularization = regularization
assert self.regularization in ['L1', 'L2', None], f"Provided regularization={self.regularization} is not allowed, allowed values are ['L1', 'L2', None]."
self.regularization_weight = regularization_weight
if (self.regularization is not None) and (self.regularization_weight is None):
raise ValueError(f'You specified regularization type {self.regularization} without providing regularization_weight.')
if (self.regularization is None) and (self.regularization_weight is not None):
raise ValueError(f'You specified no regularization but you provide regularization_weight={self.regularization_weight}, you should leave regularization_weight as None if you do not want to regularize the model.')
self.model_outside_option = model_outside_option
forward(self, batch)
A standard forward method: the user feeds a ChoiceDataset batch and the model returns the predicted log-likelihood tensor. The main forward pass happens in the _forward() method; we provide this wrapper forward() method for a cleaner API, as forward() only requires a single batch argument. For more details about the forward pass, please refer to the _forward() method.
TODO: the ConditionalLogitModel returns predicted utility; does the NestedLogitModel behave the same?
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch | ChoiceDataset | a ChoiceDataset object containing the data batch. | required |
Returns:
Type | Description |
---|---|
torch.Tensor | a tensor of shape (num_trips, num_items) containing the log probability of choosing item i in trip t. |
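As a usage sketch (reusing the hypothetical model and joint_dataset from the initialization example above), the wrapper can be invoked directly on a batch; the batch must be subscriptable by 'nest' and 'item', e.g. a JointDataset batch:

```python
log_p = model(joint_dataset)  # calls forward(); shape (num_trips, num_items)
probs = log_p.exp()           # each row sums to one over the available items
```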
Source code in torch_choice/model/nested_logit_model.py
def forward(self, batch: ChoiceDataset) -> torch.Tensor:
"""An standard forward method for the model, the user feeds a ChoiceDataset batch and the model returns the
predicted log-likelihood tensor. The main forward passing happens in the _forward() method, but we provide
this wrapper forward() method for a cleaner API, as forward() only requires a single batch argument.
For more details about the forward passing, please refer to the _forward() method.
# TODO: the ConditionalLogitModel returns predicted utility, the NestedLogitModel behaves the same?
Args:
batch (ChoiceDataset): a ChoiceDataset object containing the data batch.
Returns:
torch.Tensor: a tensor of shape (num_trips, num_items) including the log probability
of choosing item i in trip t.
"""
return self._forward(batch['nest'].x_dict,
batch['item'].x_dict,
batch['item'].user_index,
batch['item'].item_availability)
get_coefficient(self, variable, level=None)
Retrieve the coefficient tensor for the given variable.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
variable | str | the variable name. | required |
level | Optional[str] | the level of the model from which to extract the coefficient; can be 'item' or 'nest'. The level argument is ignored if variable is 'lambda'. | None |
Returns:
Type | Description |
---|---|
torch.Tensor | the corresponding coefficient tensor of the requested variable. |
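For instance, after fitting, one might inspect the learned elasticities and an item-level coefficient as in this sketch (price_obs is the hypothetical observable from the earlier example):

```python
lambdas = model.get_coefficient('lambda')                      # one value if shared_lambda, else one per nest
beta_price = model.get_coefficient('price_obs', level='item')  # coefficient of the item-level variable
```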
Source code in torch_choice/model/nested_logit_model.py
def get_coefficient(self, variable: str, level: Optional[str] = None) -> torch.Tensor:
"""Retrieve the coefficient tensor for the given variable.
Args:
variable (str): the variable name.
level (Optional[str]): the level of the model from which to extract the coefficient; can be 'item' or 'nest'. The `level` argument is ignored if `variable` is `lambda`.
Returns:
torch.Tensor: the corresponding coefficient tensor of the requested variable.
"""
if variable == 'lambda':
return self.lambda_weight.detach().clone()
if level not in ['item', 'nest']:
raise ValueError(f"Level should be either 'item' or 'nest', got {level}.")
return self.state_dict()[f'{level}_coef_dict.{variable}.coef'].detach().clone()
log_likelihood(self, *args)
Computes the log likelihood of the model; please refer to the negative_log_likelihood() method.
Returns:
Type | Description |
---|---|
torch.Tensor | the log likelihood of the model (i.e., the negative of negative_log_likelihood). |
loss(self, *args, **kwargs)
The loss function to be optimized: a wrapper of negative_log_likelihood plus the regularization loss, if required.
Source code in torch_choice/model/nested_logit_model.py
def loss(self, *args, **kwargs):
"""The loss function to be optimized. This is a wrapper of `negative_log_likelihood` + regularization loss if required."""
nll = self.negative_log_likelihood(*args, **kwargs)
if self.regularization is not None:
L = {'L1': 1, 'L2': 2}[self.regularization]
for name, param in self.named_parameters():
if name == 'lambda_weight':
# we don't regularize the lambda term, we only regularize coefficients.
continue
nll += self.regularization_weight * torch.norm(param, p=L)
return nll
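A minimal manual training step built on loss might look like the following sketch, assuming batch is a joint-dataset batch as above and the labels are the observed item indices:

```python
import torch

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

optimizer.zero_grad()
l = model.loss(batch, batch['item'].item_index)  # NLL plus regularization, if configured
l.backward()
optimizer.step()
```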
negative_log_likelihood(self, batch, y, is_train=True)
Computes the negative log likelihood of the model. Note that the log-likelihood is summed over all samples in the batch rather than averaged.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
batch | ChoiceDataset | the ChoiceDataset object containing the data. | required |
y | torch.LongTensor | the label. | required |
is_train | bool | which mode of the model to use for the forward pass. If the Hessian of the NLL is needed through autograd, is_train should be set to True; if only a performance metric is needed, is_train can be set to False for better performance. Defaults to True. | True |
Returns:
Type | Description |
---|---|
torch.scalar_tensor | the negative log likelihood of the model. |
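When only a goodness-of-fit number is needed, the following sketch (same hypothetical batch as above) evaluates the summed NLL without building the training graph:

```python
import torch

with torch.no_grad():
    nll = model.negative_log_likelihood(
        batch, batch['item'].item_index, is_train=False)
print(nll.item())  # summed (not averaged) negative log likelihood over the batch
```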
Source code in torch_choice/model/nested_logit_model.py
def negative_log_likelihood(self,
batch: ChoiceDataset,
y: torch.LongTensor,
is_train: bool=True) -> torch.scalar_tensor:
"""Computes the negative log likelihood of the model. Please note the log-likelihood is summed over all samples
in batch instead of the average.
Args:
batch (ChoiceDataset): the ChoiceDataset object containing the data.
y (torch.LongTensor): the label.
is_train (bool, optional): which mode of the model to be used for the forward passing, if we need Hessian
of the NLL through auto-grad, `is_train` should be set to True. If we merely need a performance metric,
then `is_train` can be set to False for better performance.
Defaults to True.
Returns:
torch.scalar_tensor: the negative log likelihood of the model.
"""
# compute the negative log-likelihood loss directly.
if is_train:
self.train()
else:
self.eval()
# (num_trips, num_items)
logP = self.forward(batch)
# check shapes
if self.model_outside_option:
assert logP.shape == (len(batch['item']), self.num_items+1)
else:
assert logP.shape == (len(batch['item']), self.num_items)
# since y == -1 indicates the outside option and the last column of total_utility is the outside option, the following
# indexing should correctly retrieve the log-likelihood even for outside options.
nll = - logP[torch.arange(len(y)), y].sum()
return nll