"""
Set of various measurements that can be used to track outcomes of interest
throughout a simulation. Diagnostics may optionally be included to measure
ancillary information for each measurement, such as variance or
kurtosis.
"""
from abc import ABC, abstractmethod
import networkx as nx
from networkx import wiener_index
import numpy as np
import pandas as pd
from scipy.stats import skew, kurtosis, shapiro
import matplotlib.pyplot as plt
from trecs.logging import VerboseMode
from trecs.base import (
BaseObservable,
register_observables,
)
class Diagnostics:
    """
    Class to generate diagnostics on measurements.

    Attributes
    -----------

        measurement_diagnostics: pandas dataframe
            Dataframe containing diagnostics statistics at each timestep.

        last_observation: `None` or :obj:`numpy.ndarray`
            1-D numpy array containing the values for the specified metric
            at the most recent timestep.

        columns: list
            List of strings containing the column titles.
    """

    def __init__(
        self,
        columns=None,
    ):
        if columns is None:
            columns = [
                "mean",
                "std",
                "median",
                "min",
                "max",
                "skew",
                "kurtosis",
                "sw_stat",
                "sw_p",
                "n",
            ]
        self.columns = columns
        self.measurement_diagnostics = pd.DataFrame(columns=columns)
        self.last_observation = None

    @staticmethod
    def _shapiro_wilk(observation):
        """
        Run the Shapiro-Wilk test and return ``(statistic, p_value)``.

        Both entries are NaN when fewer than 3 values are available (the
        test is undefined there). The p-value is NaN for 5000 or more
        values, matching the threshold this class has always applied.
        """
        if observation.size < 3:
            return np.nan, np.nan
        statistic, p_value = shapiro(observation)
        if observation.size >= 5000:
            p_value = np.nan
        return statistic, p_value

    def diagnose(self, observation):
        """
        Calculates diagnostic measurements on the latest observation
        from the recommender system. Also stores the current observation for
        later reference.

        Parameters
        -----------

            observation: :obj:`numpy.ndarray`
                1-D numpy array containing the values for the specified metric
                at this timestep.

        Raises
        -------

            TypeError
                If ``observation`` is not a numpy array.

            ValueError
                If ``observation`` is not 1-dimensional, or if
                :attr:`columns` contains a name that is not a supported
                diagnostic.
        """
        # rudimentary type-checks
        if not isinstance(observation, np.ndarray):
            raise TypeError("Diagnostics can only be performed on numpy arrays")
        if observation.ndim != 1:
            raise ValueError("Diagnostics can only be performed on 1-d numpy arrays")
        self.last_observation = observation

        col_to_fn = {
            "mean": np.mean,
            "std": np.std,
            "median": np.median,
            "min": np.min,
            "max": np.max,
            "skew": skew,
            "kurtosis": kurtosis,
        }
        sw_test = None  # computed lazily, shared by "sw_stat" and "sw_p"
        values = []
        for col in self.columns:
            if col in col_to_fn:
                values.append(col_to_fn[col](observation))
            elif col in ("sw_stat", "sw_p"):
                if sw_test is None:
                    sw_test = self._shapiro_wilk(observation)
                values.append(sw_test[0] if col == "sw_stat" else sw_test[1])
            elif col == "n":
                values.append(observation.size)
            else:
                # Without this, the row would silently be shorter than the
                # column list and pandas would raise a confusing length error.
                raise ValueError("Unknown diagnostics column: {!r}".format(col))

        # DataFrame.append was removed in pandas 2.0; build the row as a
        # one-line frame and concatenate instead. The first row replaces the
        # empty frame outright so no empty frame takes part in a concat.
        row = pd.DataFrame([values], columns=list(self.columns))
        if len(self.measurement_diagnostics.index) == 0:
            self.measurement_diagnostics = row
        else:
            self.measurement_diagnostics = pd.concat(
                [self.measurement_diagnostics, row], ignore_index=True
            )

    def hist(self, split_indices=None):
        """
        Draws a histogram of the most recent observation values.

        Parameters
        -----------

            split_indices: list or None
                Contains "splits" that determine which values
                to use for distinct histograms. For example,
                if there are 100 observation values and the
                split index is 50, then two separate histograms are
                created from the first 50 values and the second 50
                values. ``None`` (the default) or an empty list draws a
                single histogram of all values.

        Raises
        -------

            RuntimeError
                If no observation has been recorded yet, or if more than
                4 split indices are given.
        """
        if self.last_observation is None:
            raise RuntimeError("No observation has been recorded yet; call diagnose() first")
        # Normalise before measuring the length: the default is None.
        splits_given = [] if split_indices is None else list(split_indices)
        if len(splits_given) > 4:
            raise RuntimeError("Too many split indices")
        colors = ["blue", "orange", "red", "yellow", "green"]
        if splits_given:
            bounds = [0] + splits_given + [self.last_observation.size]
            for color, start, stop in zip(colors, bounds[:-1], bounds[1:]):
                plt.hist(self.last_observation[start:stop], alpha=0.7, color=color)
        else:
            plt.hist(self.last_observation, bins="auto")
        plt.ylabel("observation count (total n={})".format(self.last_observation.size))

    def get_diagnostics(self):
        """
        Returns
        --------

            `pd.DataFrame`:
                Dataframe containing diagnostics statistics at each timestep.
        """
        return self.measurement_diagnostics
class Measurement(BaseObservable, VerboseMode, ABC):
    """
    Abstract observable that accumulates one measurement per timestep.

    Concrete metrics subclass this and implement :meth:`measure`.

    Parameters
    -----------

        name: str
            Name of the measurement quantity.

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------

        measurement_history: list
            List of measurements. A new element is added at each timestep.

        name: str
            Name of the measurement quantity.
    """

    def __init__(self, name, verbose=False):
        self.name = name
        VerboseMode.__init__(self, __name__.upper(), verbose)
        self.measurement_history = []

    def get_measurement(self):
        """
        Returns the recorded measurements by handing
        :attr:`measurement_history` to ``get_observable``. See
        :func:`~base.base_components.BaseObservable.get_observable`
        for details on the returned structure.

        Returns
        --------
        dict:
            Measurements
        """
        history = self.measurement_history
        return self.get_observable(data=history)

    def observe(self, observation, copy=True):  # pylint: disable=arguments-differ
        """
        Appends one measurement to :attr:`measurement_history`. Subclasses
        call this so that every metric stores its values the same way.

        Parameters
        -----------

            observation: array_like or int or float or None
                Element that will be stored

            copy: bool, default True
                If ``True`` and ``observation`` is a list or
                :obj:`numpy.ndarray`, a copy (made with ``numpy.copy``) is
                stored instead of the object itself.
        """
        # Copy lists/arrays so later in-place changes by the caller cannot
        # alter what has already been recorded.
        stored = observation
        if isinstance(observation, (list, np.ndarray)) and copy:
            stored = np.copy(observation)
        self.measurement_history.append(stored)

    @abstractmethod
    def measure(self, recommender):
        """
        Function that should calculate some outcome of interest of the system
        at the current timestep
        """

    def get_timesteps(self):
        """
        Returns the number of measurements stored (which is equivalent to the
        number of timesteps that the system has been measuring).

        Returns
        --------
        int:
            Length of ``measurement_history``
        """
        return len(self.measurement_history)
class MeasurementModule:  # pylint: disable=too-few-public-methods
    """
    Mixin for observers of :class:`Measurement` observables. Implements the
    `Observer design pattern`_.

    .. _`Observer design pattern`: https://en.wikipedia.org/wiki/Observer_pattern

    A system using this mixin keeps a list of metrics and asks each of them
    to take a measurement of the system whenever :meth:`measure_content` is
    called.

    Attributes
    ------------

        metrics: list
            List of metrics that the system will monitor.
    """

    def __init__(self):
        self.metrics = []

    def add_metrics(self, *args):
        """
        Registers metrics in the :attr:`metrics` list so that the system
        monitors them, then takes an initial measurement with each newly
        added metric.

        Parameters
        -----------

            args: :class:`~metrics.measurement.Measurement`
                Accepts a variable number of metrics that inherits from
                :class:`~metrics.measurement.Measurement`
        """
        register_observables(
            observable_type=Measurement,
            observables=list(args),
            observer=self.metrics,
        )
        # every newly added metric immediately records a first value
        for new_metric in args:
            new_metric.measure(self)

    def measure_content(self):
        """
        Asks every metric in :attr:`metrics` to measure this system by
        calling its ``measure`` method with ``self``.
        """
        for tracked in self.metrics:
            tracked.measure(self)
class InteractionMeasurement(Measurement):
    """
    Keeps track of the interactions between users and items.

    Specifically, at each timestep, it stores a histogram of length
    :math:`|I|`, where element :math:`i` is the number of interactions
    received by item :math:`i`.

    Parameters
    -----------

        name: str, default ``"interaction_histogram"``
            Name of the measurement component.

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str, default ``"interaction_histogram"``
            Name of the measurement component.
    """

    def __init__(self, name="interaction_histogram", verbose=False):
        Measurement.__init__(self, name, verbose)

    @staticmethod
    def _generate_interaction_histogram(interactions, num_users, num_items):
        """
        Counts how many interactions each item received at this timestep.

        Parameters
        -----------
            interactions : :obj:`numpy.ndarray`
                Array of user interactions (item indices).

            num_users : int
                Number of users in the system

            num_items : int
                Number of items in the system

        Returns
        ---------
            :obj:`numpy.ndarray`:
                Array of length ``num_items`` holding the interaction count
                of every item.

        Raises
        -------
            ValueError
                If the counts do not add up to ``num_users``.
        """
        counts = np.zeros(num_items)
        # unbuffered add: repeated item indices are each counted
        np.add.at(counts, interactions, 1)
        # sanity check: exactly one interaction per user
        if counts.sum() != num_users:
            raise ValueError("The sum of interactions must be equal to the number of users")
        return counts

    def measure(self, recommender):
        """
        Measures and stores a histogram of the number of interactions per
        item at the given timestep. ``None`` is stored when there are no
        interactions yet.

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from :class:`~models.recommender.BaseRecommender`.
        """
        if recommender.interactions.size == 0:
            # nothing has been interacted with at the start of a simulation
            self.observe(None)
        else:
            item_counts = self._generate_interaction_histogram(
                recommender.interactions, recommender.num_users, recommender.num_items
            )
            self.observe(item_counts, copy=True)
class InteractionSimilarity(Measurement, Diagnostics):
    """
    Keeps track of the average Jaccard similarity between interactions with items
    between pairs of users at each timestep. The pairs of users must be passed
    in by the user.

    Parameters
    -----------
        pairs: iterable of tuples
            Contains tuples representing each pair of users. Each user should
            be represented as an index into the user profiles matrix.

        name: str, default ``"interaction_similarity"``
            Name of the measurement component.

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

        diagnostics: bool, default False
            If ``True``, per-pair similarities are also passed to
            :meth:`Diagnostics.diagnose` at each timestep. Extra keyword
            arguments are forwarded to :class:`Diagnostics`.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str, default ``"interaction_similarity"``
            Name of the measurement component.

        interaction_hist: None or :obj:`numpy.ndarray`
            One row per user; a new column of interactions is appended at
            every timestep that has interactions.
    """

    def __init__(
        self, pairs, name="interaction_similarity", verbose=False, diagnostics=False, **kwargs
    ):
        self.pairs = pairs
        # grows into a matrix with one row per user
        self.interaction_hist = None
        self.diagnostics = diagnostics
        Measurement.__init__(self, name, verbose)
        if diagnostics:
            Diagnostics.__init__(self, **kwargs)

    def measure(self, recommender):
        """
        Measures the average Jaccard index of items that pairs of users have interacted
        with in the system. Intuitively, a higher average Jaccard index corresponds to
        increasing "homogenization" in that user behavior is becoming more and more
        similar (i.e., users have all interacted with the same items).

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from
                :class:`~models.recommender.BaseRecommender`.
        """
        interactions = recommender.interactions
        if interactions.size == 0:
            # no interactions yet
            self.observe(None)
            return

        # extend the per-user history with this timestep's interactions
        if self.interaction_hist is not None:
            latest_column = interactions.reshape((-1, 1))
            self.interaction_hist = np.hstack([self.interaction_hist, latest_column])
        else:
            self.interaction_hist = np.copy(interactions).reshape((-1, 1))

        per_pair = []
        average = 0
        for pair in self.pairs:
            items_a = set(self.interaction_hist[pair[0], :])
            items_b = set(self.interaction_hist[pair[1], :])
            overlap = len(items_a & items_b)
            combined = len(items_a | items_b)
            # each pair contributes its Jaccard index weighted by 1 / #pairs
            average += overlap / combined / len(self.pairs)
            if self.diagnostics:
                per_pair.append(overlap / combined)

        self.observe(average)
        if self.diagnostics:
            self.diagnose(np.array(per_pair))
class RecSimilarity(Measurement):
    """
    Keeps track of the average Jaccard similarity between items seen by pairs
    of users at each timestep. The pairs of users must be passed in by the
    user.

    Parameters
    -----------
        pairs: iterable of tuples
            Contains tuples representing each pair of users. Each user should
            be represented as an index into the user profiles matrix.

        name: str, default ``"rec_similarity"``
            Name of the measurement component.

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str, default ``"rec_similarity"``
            Name of the measurement component.
    """

    def __init__(self, pairs, name="rec_similarity", verbose=False):
        self.pairs = pairs
        Measurement.__init__(self, name, verbose)

    def measure(self, recommender):
        """
        Measures the average Jaccard index of items shown to pairs of users in
        the system. Intuitively, a higher average Jaccard index corresponds to
        increasing "homogenization" in that the recommender system is starting
        to treat each user the same way (i.e., show them the same items).

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from
                :class:`~models.recommender.BaseRecommender`.
        """
        items_shown = recommender.items_shown
        if items_shown.size == 0:
            # nothing has been recommended at the start of a simulation
            self.observe(None)
            return

        average = 0
        for pair in self.pairs:
            shown_a = set(items_shown[pair[0], :])
            shown_b = set(items_shown[pair[1], :])
            overlap = len(shown_a & shown_b)
            combined = len(shown_a | shown_b)
            # each pair contributes its Jaccard index weighted by 1 / #pairs
            average += overlap / combined / len(self.pairs)
        self.observe(average)
class InteractionSpread(InteractionMeasurement):
    """
    Measures the diversity of the interactions between users and items.

    Specifically, at each timestep, it measures whether interactions are spread
    among many items or only a few items.

    This class inherits from :class:`.InteractionMeasurement`.

    Parameters
    -----------

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------
        Inherited by InteractionMeasurement: :class:`.InteractionMeasurement`

        name: str, default ``"interaction_spread"``
            Name of the measurement component.

        histogram: None or :obj:`numpy.ndarray`
            The most recent interaction histogram, sorted in descending order.

        _old_histogram: None, list, array_like
            A copy of the histogram at the previous timestep.
    """

    def __init__(self, verbose=False):
        self.histogram = None
        self._old_histogram = None
        InteractionMeasurement.__init__(self, name="interaction_spread", verbose=verbose)

    def measure(self, recommender):
        """
        Measures the diversity of user interactions -- that is, whether
        interactions are spread among many items or only a few items.

        The stored value is the trapezoidal area under the previous
        (descending) histogram minus the area under the current one.

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from
                :class:`~models.recommender.BaseRecommender`.
        """
        interactions = recommender.interactions
        if interactions.size == 0:
            # no interactions at the start of a simulation
            self.observe(None)
            return

        histogram = self._generate_interaction_histogram(
            interactions, recommender.num_users, recommender.num_items
        )
        # reorder in place from most to least interacted-with item
        histogram[:] = np.sort(histogram)[::-1]

        if self._old_histogram is None:
            # first measurement is compared against an all-zero histogram
            self._old_histogram = np.zeros(recommender.num_items)

        previous_area = np.trapz(self._old_histogram, dx=1)
        current_area = np.trapz(histogram, dx=1)
        self.observe(previous_area - current_area, copy=False)

        self._old_histogram = np.copy(histogram)
        self.histogram = histogram
class RecallMeasurement(Measurement):
    """
    Measures the proportion of relevant items (i.e., those users interacted with) falling
    within the top k ranked items shown.

    Parameters
    -----------
        k: int
            The rank at which recall should be evaluated.

        name: str, default ``"recall_at_k"``
            Name of the measurement component.

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str, default ``"recall_at_k"``
            Name of the measurement component.
    """

    # Note: RecallMeasurement evaluates recall for the top-k (i.e., highest predicted value)
    # items regardless of whether these items derive from the recommender or from randomly
    # interleaved items. Currently, this metric will only be correct for
    # cases in which users interact with one item per timestep

    def __init__(self, k=5, name="recall_at_k", verbose=False):
        self.k = k
        Measurement.__init__(self, name, verbose)

    def measure(self, recommender):
        """
        Measures the proportion of relevant items (i.e., those users interacted with) falling
        within the top k ranked items shown.

        For every user, the items shown are ranked by that user's own
        predicted scores; the user counts as a hit when the item they
        interacted with is among their own k highest-scored shown items.
        ``None`` is stored when there are no interactions yet.

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from
                :class:`~models.recommender.BaseRecommender`.

        Raises
        -------
            ValueError
                If ``k`` is not smaller than ``recommender.num_items_per_iter``.
        """
        if self.k >= recommender.num_items_per_iter:
            raise ValueError("k must be smaller than the number of items per iteration")

        interactions = recommender.interactions
        if interactions.size == 0:
            self.observe(None)  # no interactions yet
            return

        # assumes predicted_scores.value is a dense (users x items) array and
        # items_shown is a (users x items-per-iteration) array of item
        # indices -- TODO confirm against BaseRecommender
        predicted_scores = recommender.predicted_scores.value
        items_shown = recommender.items_shown

        # Row-wise lookup. A plain np.take without an axis would index the
        # flattened score matrix and score every user with row 0's values.
        shown_item_scores = np.take_along_axis(predicted_scores, items_shown, axis=1)

        # argsort is ascending, so the k best-scored items are the last k
        # columns. Slicing from an explicit start keeps k == 0 an empty set.
        shown_item_ranks = np.argsort(shown_item_scores, axis=1)
        first_top_column = shown_item_ranks.shape[1] - self.k
        top_k_items = np.take_along_axis(
            items_shown, shown_item_ranks[:, first_top_column:], axis=1
        )

        # Compare each user's interaction only with that user's own top-k
        # items, not with the union of every user's top-k.
        hits = (top_k_items == np.asarray(interactions).reshape(-1, 1)).any(axis=1)
        recall = int(np.count_nonzero(hits)) / recommender.num_users
        self.observe(recall)
class MSEMeasurement(Measurement, Diagnostics):
    """
    Measures the mean squared error (MSE) between real and predicted user scores.
    It can be used to evaluate how accurate the model predictions are.

    This class inherits from :class:`.Measurement`.

    Parameters
    -----------

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

        diagnostics: bool, default False
            If ``True``, a per-user array is also passed to
            :meth:`Diagnostics.diagnose` at each timestep. Extra keyword
            arguments are forwarded to :class:`Diagnostics`.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str (optional, default: "mse")
            Name of the measurement component.
    """

    def __init__(self, verbose=False, diagnostics=False, **kwargs):
        self.diagnostics = diagnostics
        Measurement.__init__(self, "mse", verbose=verbose)
        if diagnostics:
            Diagnostics.__init__(self, **kwargs)

    def measure(self, recommender):
        """
        Measures and records the mean squared error between the user preferences
        predicted by the system and the users' actual preferences.

        When diagnostics are enabled, the array diagnosed is the squared
        difference between the row means (``axis=1``) of the predicted and
        of the actual score matrices.

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from :class:`~models.recommender.BaseRecommender`.
        """
        errors = recommender.predicted_scores.value - recommender.users.actual_user_scores.value
        mse = (errors ** 2).mean()
        self.observe(mse, copy=False)

        if not self.diagnostics:
            return
        predicted_row_means = recommender.predicted_scores.value.mean(axis=1)
        actual_row_means = recommender.users.actual_user_scores.value.mean(axis=1)
        self.diagnose((predicted_row_means - actual_row_means) ** 2)
class RMSEMeasurement(Measurement):
    """
    Measures the root mean squared error (RMSE) between real and predicted user scores.
    It can be used to evaluate how accurate the model predictions are.

    This class inherits from :class:`.Measurement`.

    Parameters
    -----------

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str, default ``"rmse"``
            Name of the measurement component.
    """

    def __init__(self, verbose=False):
        Measurement.__init__(self, "rmse", verbose=verbose)

    def measure(self, recommender):
        """
        Measures and records the root mean squared error between the user
        preferences predicted by the system and the users' actual preferences.

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from :class:`~models.recommender.BaseRecommender`.
        """
        errors = recommender.predicted_scores.value - recommender.users.actual_user_scores.value
        mean_squared_error = (errors ** 2).mean()
        self.observe(mean_squared_error ** 0.5, copy=False)
class DiffusionTreeMeasurement(Measurement):
    """
    Class that implements an information diffusion tree. The current
    implementation assumes that agents using this class (i.e., a model)
    implement an :attr:`~models.bass.BassModel.infection_state` matrix that
    denotes the initial state of information.

    In this implementation, the nodes represent users and are labeled with the
    user indices. A branch between nodes `u` and `v` indicates that user `u`
    passed information onto user `v` -- that is, `u` "infected" `v`.

    Trees are implemented using the `Networkx library`_. Please refer to
    Networkx's `documentation`_ for more details.

    .. _Networkx library: http://networkx.github.io
    .. _documentation: https://networkx.github.io/documentation/stable/

    Parameters
    -----------

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str, default ``"num_infected"``
            Name of the metric that is recorded at each time step. Note that,
            in this case, the metric stored in
            :attr:`~.Measurement.measurement_history` is actually the
            **number of infected users**. The diffusion tree itself is kept in
            the :attr:`.diffusion_tree` data structure.

        diffusion_tree: :obj:`networkx.Graph`
            Diffusion tree.

        _old_infection_state: array_like
            Infection state at the previous timestep.
    """

    def __init__(self, verbose=False):
        self._old_infection_state = None
        self.diffusion_tree = nx.Graph()
        Measurement.__init__(self, "num_infected", verbose=verbose)

    def _find_parents(self, user_profiles, new_infected_users):
        """
        Find the users who infected the newly infected users.

        Returns ``None`` when nobody was infected at the previous timestep
        (the new users are roots); otherwise an array holding one previously
        infected user per newly infected user.
        """
        if (self._old_infection_state == 0).all():
            # nobody could have passed the information on: node is root
            return None

        # TODO: function is_following() based on code below:
        # only users infected at the previous timestep can be parents
        previously_infected = np.where(self._old_infection_state > 0)[0]
        # restrict the profile matrix to (new user, previously infected user)
        links = user_profiles[:, previously_infected][new_infected_users]
        if not isinstance(links, np.ndarray):
            # sparse matrices are converted to a dense numpy array
            links = links.toarray()
        # multiplying by random weights makes argmax pick a random parent
        # among the non-zero candidates of each row
        weighted_links = links * np.random.rand(*links.shape)
        chosen_columns = np.argmax(weighted_links, axis=1)
        return previously_infected[chosen_columns]

    def _add_to_graph(self, user_profiles, new_infected_users):
        """
        Add the newly infected users to the graph with edges to the users
        who infected them.
        """
        self.diffusion_tree.add_nodes_from(new_infected_users)
        parents = self._find_parents(user_profiles, new_infected_users)
        if parents is None:
            return
        # one (parent, child) row per newly infected user
        parent_child_edges = np.column_stack((parents, new_infected_users))
        self.diffusion_tree.add_edges_from(parent_child_edges)

    def _manage_new_infections(self, user_profiles, current_infection_state):
        """
        Add new infected users to graph and return number of newly infected
        users.

        Parameters
        ------------
            user_profiles: :obj:`numpy.ndarray`
                :math:`|U|\\times|A|` numpy adjacency matrix.

            current_infection_state: :class:`~models.bass.InfectionState`
                Matrix that contains state about recovered,
                infected, and susceptible individuals.

        Returns
        --------
            int:
                Number of users returned by ``infected_users()`` that were
                added to the graph (0 when there are none).
        """
        if self._old_infection_state is None:
            # first call: treat everyone as previously uninfected
            self._old_infection_state = np.zeros(current_infection_state.value.shape)
        # only the user indices are needed
        infected_indices = current_infection_state.infected_users()[0]
        count = len(infected_indices)
        if count == 0:
            # no new infections
            return 0
        self._add_to_graph(user_profiles, infected_indices)
        return count

    def measure(self, recommender):
        """
        Updates tree with new infections and stores information about new
        infections. In :attr:`~.Measurement.measurement_history`, it stores the
        total number of infected users in the system -- that is, the number of
        nodes in the tree.

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from :class:`~models.recommender.BaseRecommender`.
        """
        infection_state = recommender.infection_state
        self._manage_new_infections(recommender.users_hat.value, infection_state)
        self.observe(self.diffusion_tree.number_of_nodes(), copy=False)
        # remember this state so the next call can identify possible parents
        self._old_infection_state = np.copy(recommender.infection_state.value)

    def draw_tree(self):
        """
        Plots the tree using the Networkx library API.
        """
        nx.draw(self.diffusion_tree, with_labels=True)
class StructuralVirality(DiffusionTreeMeasurement):
    """
    This class extends :class:`DiffusionTreeMeasurement` with the concept of
    structural virality developed by Goel, Anderson, Hofman, and Watts in
    `The Structural Virality of Online Diffusion`_. It is used in
    :class:`~models.bass.BassModel`.

    .. _The Structural Virality of Online Diffusion: https://5harad.com/papers/twiral.pdf

    Parameters
    -----------

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.
    """

    def __init__(self, verbose=False):
        DiffusionTreeMeasurement.__init__(self, verbose)

    def get_structural_virality(self):
        """
        Returns a measure of structural virality: the Wiener index of the
        diffusion tree divided by ``n * (n - 1)``, where ``n`` is the number
        of nodes in the tree.

        Returns
        --------
            Structural virality: float
        """
        # NOTE(review): networkx's wiener_index appears to count each
        # unordered pair once, so this may be half the ordered-pair average
        # distance defined in the paper -- confirm the intended normalisation.
        tree = self.diffusion_tree
        node_count = tree.number_of_nodes()
        total_distance = wiener_index(tree)
        return total_distance / (node_count * (node_count - 1))
class AverageFeatureScoreRange(Measurement):
    """
    Measures the average range (across users) of item attributes for items
    users were recommended at a time step.

    This metric is based on the item diversity measure used in :
    Willemsen, M. C., Graus, M. P.,
    & Knijnenburg, B. P. (2016). Understanding the role of latent feature
    diversification on choice difficulty and satisfaction. User Modeling
    and User-Adapted Interaction, 26(4), 347-389.

    This class inherits from :class:`.Measurement`.

    Parameters
    -----------

        name: str, default ``"afsr"``
            Name of the measurement component.

        verbose: bool, default False
            If ``True``, enables verbose mode. Disabled by default.

    Attributes
    -----------
        Inherited by Measurement: :class:`.Measurement`

        name: str, default ``"afsr"``
            Name of the measurement component.
    """

    def __init__(self, name="afsr", verbose=False):
        Measurement.__init__(self, name, verbose)

    def measure(self, recommender):
        """
        Measures the average range (across users) of item attributes for items
        users were recommended at a time step. Used as a measure of within
        list recommendation diversity.

        This metric is based on the item diversity measure used in :
        Willemsen, M. C., Graus, M. P.,
        & Knijnenburg, B. P. (2016). Understanding the role of latent feature
        diversification on choice difficulty and satisfaction. User Modeling
        and User-Adapted Interaction, 26(4), 347-389.

        ``None`` is stored when no items have been shown yet.

        Parameters
        ------------
            recommender: :class:`~models.recommender.BaseRecommender`
                Model that inherits from
                :class:`~models.recommender.BaseRecommender`.
        """
        items_shown = recommender.items_shown
        if items_shown.size == 0:
            # nothing has been recommended at the start of a simulation,
            # so a `None` value is logged
            self.observe(None)
            return

        # columns of items_hat selected by the shown item indices; presumably
        # this yields (attributes, users, items shown per user) -- TODO confirm
        shown_attrs = recommender.items_hat.value[:, items_shown]
        # max and min are each reduced over axes 0 and 2 together
        highest = shown_attrs.max(axis=(0, 2))
        lowest = shown_attrs.min(axis=(0, 2))
        self.observe(np.mean(highest - lowest))