Module streamauc.streaming_metrics

Classes

class StreamingMetrics (num_thresholds: int = 200, num_classes: int = 2, thresholds: Union[List[float], numpy.ndarray, None] = None)

Class for keeping track of metrics for many thresholds in a minibatch-wise, iterative, fashion.

Parameters

num_thresholds : int, optional
Number of thresholds to evaluate the curve. Default is 200.
num_classes : int
Number of classes in the multiclass setting. Must be >= 2.
thresholds : list of float, optional
List of specific thresholds to evaluate the metrics at. A probability >= threshold is defined as a positive prediction for the respective class.
Expand source code
class StreamingMetrics:
    """
    Class for keeping track of metrics for many thresholds in a
    minibatch-wise, iterative, fashion.

    Parameters
    ----------
    num_thresholds : int, optional
        Number of thresholds to evaluate the curve. Default is 200.
    num_classes : int
        Number of classes in the multiclass setting. Must be >= 2.
    thresholds : list of float, optional
        List of specific thresholds to evaluate the metrics at.
        A probability >= threshold is defined as a positive prediction for
        the respective class.
    """

    def __init__(
        self,
        num_thresholds: int = 200,
        num_classes: int = 2,
        thresholds: Optional[Union[List[float], np.ndarray]] = None,
    ):
        if num_classes < 2:
            raise ValueError("Argument `num_classes` must be an integer >= 2.")

        self.num_classes = num_classes

        self.num_thresholds, self.thresholds = _validate_thresholds(
            num_thresholds, thresholds
        )

        self._confusion_matrix = np.zeros(
            (self.num_thresholds, self.num_classes, 2, 2),
            dtype=int,
        )

    @property
    def confusion_matrix(self) -> np.ndarray:
        """
        For each threshold, and for each class, there is a 2x2 confusion
        matrix. The entries of each confusion matrix correspond to the
        labels of:
        np.array([ ["TP","FN"],
                   ["FP", "TN"]
                   ])

        That is, the indices are given by:
        TP: self.confusion_matrix[..., 0,0]
        FP: self.confusion_matrix[..., 1,0]
        FN: self.confusion_matrix[..., 0,1]
        TN: self.confusion_matrix[..., 1,1]

        Note, that this corresponds to the flipped confusion matrix of
        sklearn. We prefer this order, as it corresponds to many written
        references, e.g. the wikipedia page.

        That is, self.confusion_matrix = np.flip(sklearn_confusion_matrix)

        """
        return self._confusion_matrix

    def reset(self):
        """
        Reset the intermediate values for the confusion matrix.
        """
        self._confusion_matrix = np.zeros(
            (self.num_thresholds, self.num_classes, 2, 2),
            dtype=int,
        )

    def update(
        self,
        y_true: np.ndarray,
        y_score: np.ndarray,
        check_inputs: bool = True,
    ):
        """
        Update the intermediate values based on streaming data.

        Parameters
        ----------
        y_true : np.ndarray
            Ground truth labels of shape [-1] (or [-1, 1], [-1, 1, 1, 1]...)
            with values indicating the class index. Alternatively, may also
            be one-hot encoded labels of shape [-1, num_classes].
        y_score : np.ndarray
            Predicted probabilities for each class of shape[-1, num_classes]

        Raises
        ------
        ValueError
            If the shapes of `y_true` and `y_pred` do not match.
        """

        y_true = np.squeeze(y_true).astype(int)
        y_score = np.squeeze(y_score)

        if check_inputs:
            if y_true.ndim > 2:
                raise ValueError(
                    f"Unknown shape of y_true: {y_true.shape},"
                    f"must be squeezable to either [-1, num_classes] or [-1]."
                )
            if y_true.ndim == 2 and np.any(y_true.sum(-1) != 1):
                raise ValueError("The provided one-hot encoding is invalid.")
            if y_score.ndim > 2:
                raise ValueError(
                    f"Unknown shape of y_true: {y_true.shape},"
                    f"must be squeezable to either [-1, num_classes] or [-1]."
                )

            if not (y_true.shape[0] == y_score.shape[0]):
                raise ValueError(
                    "Number of samples in y_true and y_pred must match"
                )

            if (y_score.ndim == 2) and (y_score.shape[1] != self.num_classes):
                raise ValueError(f"Invalid shape of y_pred: {y_score.shape}")

        if y_true.ndim == 2 and y_true.shape[1] == self.num_classes:
            y_onehot = y_true
        else:
            y_onehot = onehot_encode(y_true, num_classes=self.num_classes)

        # use numpy broadcasting to get predictions
        # breakpoint()
        pred_pos = y_score[np.newaxis, ...] >= self.thresholds.reshape(
            -1, 1, 1
        )

        tp, fp, fn, tn = compute_confusion_matrix(
            pred_pos=pred_pos, y_onehot=y_onehot
        )

        # update confusion matrix entry

        self._confusion_matrix[..., 0, 0] += tp
        self._confusion_matrix[..., 1, 0] += fp
        self._confusion_matrix[..., 1, 1] += tn
        self._confusion_matrix[..., 0, 1] += fn

    def _total(self) -> np.ndarray:
        """
        Calculate total for each threshold and class.
        Of course, this should be the same value for all thresholds.

        Returns
        -------
        np.ndarray
            Total at each threshold.
        """
        total = self.confusion_matrix.sum(-1).sum(-1)
        return total

    def true_positives(self) -> np.ndarray:
        """
        Calculate true positives for each threshold and class.

        Returns
        -------
        np.ndarray
            True positives at each threshold.
        """

        return self.confusion_matrix[..., 0, 0]
        # tp = np.diagonal(self._confusion_matrix, axis1=1, axis2=2)
        # return tp

    def false_positives(self) -> np.ndarray:
        """
        Calculate false positives for each threshold and class.

        Returns
        -------
        np.ndarray
            False positives at each threshold.
        """
        # tp = self.true_positives()
        # pp = self.predicted_positives()
        #
        # fp = pp - tp
        # return fp

        return self.confusion_matrix[..., 1, 0]

    def true_negatives(self) -> np.ndarray:
        """
        Calculate the total negatives for each threshold and class.

        Returns
        -------
        np.ndarray
            Negatives at each threshold.
        """

        return self.confusion_matrix[..., 1, 1]

    def false_negatives(self) -> np.ndarray:
        """
        Calculate false negatives for each threshold and class.

        Returns
        -------
        np.ndarray
            False negatives at each threshold.
        """
        # tp = self.true_positives()
        # p = self.positives()
        #
        # fn = p - tp
        return self.confusion_matrix[..., 0, 1]

    def positives(self) -> np.ndarray:
        """
        Calculate the total positives for each threshold and class.

        Returns
        -------
        np.ndarray
            Positives at each threshold.
        """
        # return np.sum(self._confusion_matrix, axis=-1)
        return (
            self.confusion_matrix[..., 0, 0] + self.confusion_matrix[..., 0, 1]
        )

    def negatives(self) -> np.ndarray:
        """
        Calculate the total negatives for each threshold and class.

        Returns
        -------
        np.ndarray
            Negatives at each threshold.
        """

        return self._total() - self.positives()

    def predicted_positives(self):
        """
        Calculate predicted positives for each threshold and class.

        Returns
        -------
        np.ndarray
            Predicted positives at each threshold.
        """

        return (
            self.confusion_matrix[..., 0, 0] + self.confusion_matrix[..., 1, 0]
        )

    def predicted_negatives(self) -> np.ndarray:
        """
        Calculate predicted negatives for each threshold and class.

        Returns
        -------
        np.ndarray
            Predicted positives at each threshold.
        """
        #
        # pp = self.predicted_positives()
        # total = self._total()
        #
        # pn = total - pp
        # return pn

        return (
            self.confusion_matrix[..., 0, 1] + self.confusion_matrix[..., 1, 1]
        )

    def calc_metric(
        self,
        metric: Callable,
        method: AggregationMethod = AggregationMethod.MACRO,
        class_index: Optional[int] = None,
        check_inputs=True,
    ):
        tp = self.true_positives()
        fp = self.false_positives()
        fn = self.false_negatives()
        tn = self.true_negatives()

        return metric(
            tp=tp,
            fp=fp,
            fn=fn,
            tn=tn,
            class_index=class_index,
            method=method,
            check_inputs=check_inputs,
        )

    def auc(
        self,
        metric_xaxis: Callable = metrics.fpr,
        metric_yaxis: Callable = metrics.tpr,
        method: AggregationMethod = AggregationMethod.ONE_VS_ALL,
        class_index: Optional[int] = None,
        check_inputs=True,
    ):

        metric_args = dict(
            tp=self.true_positives(),
            fp=self.false_positives(),
            fn=self.false_negatives(),
            tn=self.true_negatives(),
            class_index=class_index,
            method=method,
            check_inputs=check_inputs,
        )

        x_values = metric_xaxis(**metric_args)
        y_values = metric_yaxis(**metric_args)
        return auc(x_values, y_values)

    def precision_recall_curve(
        self,
        method: AggregationMethod = AggregationMethod.ONE_VS_ALL,
        class_index: Optional[int] = None,
        check_inputs: bool = True,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Compute precision and recall at all thresholds for plotting and auc
        computation. We adopt the behaviour of sklearn, in that the
        precision corresponding to a recall of 0 is 1.

        (Technically its undefined since its tp/(tp+fp) with tp=fp=0,
        but the value of 1 serves for stable plotting.)

        Parameters
        ----------
        method : AggregationMethod
            Method used to compute precision and recall for multiple classes.
            Micro and macro refer to the averaging method.
            Macro computes the metric for each class, and then averages the
            metrics.
            If '1-vs-all' the index for the positive class has to be defined
            in 'class_index'. All other classes will be summarized as the
            negative class.

            Must be one of ["macro","micro","1-vs-all"].

        class_index : int, optional
            Class index for "1-vs-all" calculation.
             Required if `method` is "1-vs-all".

        Returns
        -------
        precision : np.ndarray
            Precision values at each threshold.
        recall : np.ndarray
            Recall values at each threshold.
        """
        tp = self.true_positives()
        fp = self.false_positives()
        fn = self.false_negatives()

        precision = metrics.precision(
            tp=tp,
            fp=fp,
            method=method,
            class_index=class_index,
            check_inputs=check_inputs,
        )

        recall = metrics.recall(
            tp=tp,
            fn=fn,
            method=method,
            class_index=class_index,
            check_inputs=check_inputs,
        )
        # ensure precision 1 at recall 0
        # precision[0] = 1.0
        return (
            precision[::-1][1:].squeeze(),
            recall[::-1][1:].squeeze(),
            self.thresholds[1:][::-1].squeeze(),
        )

    def roc_curve(
        self,
        method: AggregationMethod = AggregationMethod.ONE_VS_ALL,
        class_index: Optional[int] = None,
        check_inputs: bool = True,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        tp = self.true_positives()
        fp = self.false_positives()
        fn = self.false_negatives()
        tn = self.true_negatives()

        _tpr = metrics.tpr(
            tp=tp,
            fn=fn,
            method=method,
            class_index=class_index,
            check_inputs=check_inputs,
        )
        _fpr = metrics.fpr(
            fp=fp,
            tn=tn,
            method=method,
            class_index=class_index,
            check_inputs=check_inputs,
        )
        return _fpr.squeeze(), _tpr.squeeze(), self.thresholds.squeeze()

    def plot_roc_curve(
        self,
        class_names: Optional[List[str]] = None,
        method=AggregationMethod.ONE_VS_ALL,
        class_index: Optional[int] = None,
        **kwargs,
    ) -> plt.Figure:  #
        # pragma: nocover
        fpr, tpr, thresholds = self.roc_curve(
            method=method,
            class_index=class_index,
        )

        if method != AggregationMethod.ONE_VS_ALL:
            class_names = None
        return plot_curve_and_auc(
            x_values=fpr,
            y_values=tpr,
            thresholds=thresholds,
            class_names=class_names,
            **kwargs,
        )

    def plot_precision_recall_curve(
        self,
        class_names: Optional[List[str]] = None,
        method=AggregationMethod.ONE_VS_ALL,
        class_index: Optional[int] = None,
        **kwargs,
    ) -> plt.Figure:  #

        # pragma: nocover
        precision, recall, thresholds = self.precision_recall_curve(
            method=method,
            class_index=class_index,
        )

        if method != AggregationMethod.ONE_VS_ALL:
            class_names = None
            assert (
                class_index is None
            ), "class_index is only usable for ONE_VS_ALL"
        return plot_curve_and_auc(
            x_values=recall,
            y_values=precision,
            thresholds=thresholds,
            class_names=class_names,
            **kwargs,
        )

Instance variables

prop confusion_matrix : numpy.ndarray

For each threshold, and for each class, there is a 2x2 confusion matrix. The entries of each confusion matrix correspond to the labels of: np.array([ ["TP","FN"], ["FP", "TN"] ])

That is, the indices are given by: TP: self.confusion_matrix[…, 0,0] FP: self.confusion_matrix[…, 1,0] FN: self.confusion_matrix[…, 0,1] TN: self.confusion_matrix[…, 1,1]

Note, that this corresponds to the flipped confusion matrix of sklearn. We prefer this order, as it corresponds to many written references, e.g. the wikipedia page.

That is, self.confusion_matrix = np.flip(sklearn_confusion_matrix)

Expand source code
@property
def confusion_matrix(self) -> np.ndarray:
    """
    For each threshold, and for each class, there is a 2x2 confusion
    matrix. The entries of each confusion matrix correspond to the
    labels of:
    np.array([ ["TP","FN"],
               ["FP", "TN"]
               ])

    That is, the indices are given by:
    TP: self.confusion_matrix[..., 0,0]
    FP: self.confusion_matrix[..., 1,0]
    FN: self.confusion_matrix[..., 0,1]
    TN: self.confusion_matrix[..., 1,1]

    Note, that this corresponds to the flipped confusion matrix of
    sklearn. We prefer this order, as it corresponds to many written
    references, e.g. the wikipedia page.

    That is, self.confusion_matrix = np.flip(sklearn_confusion_matrix)

    """
    return self._confusion_matrix

Methods

def auc(self, metric_xaxis: Callable = <function fpr>, metric_yaxis: Callable = <function tpr>, method: AggregationMethod = AggregationMethod.ONE_VS_ALL, class_index: Optional[int] = None, check_inputs=True)
def calc_metric(self, metric: Callable, method: AggregationMethod = AggregationMethod.MACRO, class_index: Optional[int] = None, check_inputs=True)
def false_negatives(self) ‑> numpy.ndarray

Calculate false negatives for each threshold and class.

Returns

np.ndarray
False negatives at each threshold.
def false_positives(self) ‑> numpy.ndarray

Calculate false positives for each threshold and class.

Returns

np.ndarray
False positives at each threshold.
def negatives(self) ‑> numpy.ndarray

Calculate the total negatives for each threshold and class.

Returns

np.ndarray
Negatives at each threshold.
def plot_precision_recall_curve(self, class_names: Optional[List[str]] = None, method=AggregationMethod.ONE_VS_ALL, class_index: Optional[int] = None, **kwargs) ‑> matplotlib.figure.Figure
def plot_roc_curve(self, class_names: Optional[List[str]] = None, method=AggregationMethod.ONE_VS_ALL, class_index: Optional[int] = None, **kwargs) ‑> matplotlib.figure.Figure
def positives(self) ‑> numpy.ndarray

Calculate the total positives for each threshold and class.

Returns

np.ndarray
Positives at each threshold.
def precision_recall_curve(self, method: AggregationMethod = AggregationMethod.ONE_VS_ALL, class_index: Optional[int] = None, check_inputs: bool = True) ‑> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]

Compute precision and recall at all thresholds for plotting and auc computation. We adopt the behaviour of sklearn, in that the precision corresponding to a recall of 0 is 1.

(Technically its undefined since its tp/(tp+fp) with tp=fp=0, but the value of 1 serves for stable plotting.)

Parameters

method : AggregationMethod

Method used to compute precision and recall for multiple classes. Micro and macro refer to the averaging method. Macro computes the metric for each class, and then averages the metrics. If '1-vs-all' the index for the positive class has to be defined in 'class_index'. All other classes will be summarized as the negative class.

Must be one of ["macro","micro","1-vs-all"].

class_index : int, optional
Class index for "1-vs-all" calculation. Required if method is "1-vs-all".

Returns

precision : np.ndarray
Precision values at each threshold.
recall : np.ndarray
Recall values at each threshold.
def predicted_negatives(self) ‑> numpy.ndarray

Calculate predicted negatives for each threshold and class.

Returns

np.ndarray
Predicted positives at each threshold.
def predicted_positives(self)

Calculate predicted positives for each threshold and class.

Returns

np.ndarray
Predicted positives at each threshold.
def reset(self)

Reset the intermediate values for the confusion matrix.

def roc_curve(self, method: AggregationMethod = AggregationMethod.ONE_VS_ALL, class_index: Optional[int] = None, check_inputs: bool = True) ‑> Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]
def true_negatives(self) ‑> numpy.ndarray

Calculate the total negatives for each threshold and class.

Returns

np.ndarray
Negatives at each threshold.
def true_positives(self) ‑> numpy.ndarray

Calculate true positives for each threshold and class.

Returns

np.ndarray
True positives at each threshold.
def update(self, y_true: numpy.ndarray, y_score: numpy.ndarray, check_inputs: bool = True)

Update the intermediate values based on streaming data.

Parameters

y_true : np.ndarray
Ground truth labels of shape [-1] (or [-1, 1], [-1, 1, 1, 1]…) with values indicating the class index. Alternatively, may also be one-hot encoded labels of shape [-1, num_classes].
y_score : np.ndarray
Predicted probabilities for each class of shape[-1, num_classes]

Raises

ValueError
If the shapes of y_true and y_pred do not match.