Source code for pydl85.supervised.classifiers.boosting

import os
import sys
import time
from enum import Enum

from sklearn.base import ClassifierMixin

from ..classifiers.classifier import Cache_Type, Wipe_Type, DL85Classifier
from ...errors.errors import SearchFailedError, TreeNotFoundError
from sklearn.exceptions import NotFittedError
from sklearn.utils.multiclass import unique_labels
from sklearn.base import BaseEstimator
from copy import deepcopy
import numpy as np
import cvxpy


[docs]class Boosting_Model(Enum): """ The mathematical model solved by the boosting algorithm :cvar MODEL_LP_RATSCH: Model used in Ratsch paper. The regulator value is between ]0; 1] :cvar MODEL_LP_DEMIRIZ: Model used in Demiriz paper. The regulator value is between ]1/n_instances; +\infty] :cvar MODEL_LP_AGLIN: Model proposed by Aglin. The regulator value is between [0; 1] :cvar MODEL_QP_MDBOOST: regulator used in MDBOOST paper. The regulator value is between ]1/n_instances; +\infty] """ MODEL_LP_RATSCH = 1 # regulator of ratsch is between ]0; 1] MODEL_LP_DEMIRIZ = 2 # regulator of demiriz is between ]1/n_instances; +\infty] MODEL_LP_AGLIN = 3 # regulator of aglin is between [0; 1] MODEL_QP_MDBOOST = 4
def is_pos_def(x): # print(np.linalg.eigvals(x)) return np.all(np.linalg.eigvals(x) > 0) def is_semipos_def(x): return np.all(np.linalg.eigvals(x) >= 0) def isPD(B): """Returns true when input is positive-definite, via Cholesky""" try: _ = np.linalg.cholesky(B) return True except np.linalg.LinAlgError: return False def nearestPD(A): """Find the nearest positive-definite matrix to input A Python/Numpy port of John D'Errico's `nearestSPD` MATLAB code [1], which credits [2]. [1] https://www.mathworks.com/matlabcentral/fileexchange/42885-nearestspd [2] N.J. Higham, "Computing a nearest symmetric positive semidefinite matrix" (1988): https://doi.org/10.1016/0024-3795(88)90223-6 """ B = (A + A.T) / 2 _, s, V = np.linalg.svd(B) H = np.dot(V.T, np.dot(np.diag(s), V)) A2 = (B + H) / 2 A3 = (A2 + A2.T) / 2 if isPD(A3): return A3 spacing = np.spacing(np.linalg.norm(A)) # The above is different from [1]. It appears that MATLAB's `chol` Cholesky # decomposition will accept matrixes with exactly 0-eigenvalue, whereas # Numpy's will not. So where [1] uses `eps(mineig)` (where `eps` is Matlab # for `np.spacing`), we use the above definition. CAVEAT: our `spacing` # will be much larger than [1]'s `eps(mineig)`, since `mineig` is usually on # the order of 1e-16, and `eps(1e-16)` is on the order of 1e-34, whereas # `spacing` will, for Gaussian random matrixes of small dimension, be on # othe order of 1e-16. In practice, both ways converge, as the unit test # below suggests. I = np.eye(A.shape[0]) k = 1 while not isPD(A3): mineig = np.min(np.real(np.linalg.eigvals(A3))) A3 += I * (-mineig * k**2 + spacing) k += 1 return A3 def get_near_psd(A): C = (A + A.T)/2 eigval, eigvec = np.linalg.eig(C) eigval[eigval < 0] = 0 return eigvec.dot(np.diag(eigval)).dot(eigvec.T)
[docs]class DL85Booster(BaseEstimator, ClassifierMixin): """ An optimal binary decision tree classifier. Parameters ---------- base_estimator : Object, default=None The base estimator implementing fit/predict/predict_proba to learn at each step of the boosting process. max_depth : int, default=1 Maximum depth of the tree to be found min_sup : int, default=1 Minimum number of examples per leaf max_iterations : int, default=0 Maximum number of iterations. Default value stands for no bound. model : Boosting_Model, default=Boosting_Model.MODEL_LP_DEMIRIZ The mathematical model used to solve the boosting problem gamma : float, default=None Regularization parameter used in MDBOOST model. If None, it is set automatically error_function : function, default=None Function used to evaluate the quality of each node. The function must take at least one argument, the list of instances covered by the node. It should return a float value representing the error of the node. In case of supervised learning, it should additionally return a label. If no error function is provided, the default one is used. fast_error_function : function, default=None Function used to evaluate the quality of each node. The function must take at least one argument, the list of number of instances per class in the node. It should return a float value representing the error of the node and the predicted label. If no error function is provided, the default one is used. opti_gap : float, default=0.01 The optimality gap used in the optimization model max_error : int, default=0 Maximum allowed error. Default value stands for no bound. If no tree can be found that is strictly better, the model remains empty. regulator : float, default=-1 The regulator used in the optimization model. stop_after_better : bool, default=False A parameter used to indicate if the search will stop after finding a tree better than max_error time_limit : int, default=0 Allocated time in second(s) for the search. Default value stands for no limit. The best tree found within the time limit is stored, if this tree is better than max_error. verbose : bool, default=False A parameter used to switch on/off the print of what happens during the search desc : function, default=None A parameter used to indicate heuristic function used to sort the items in descending order asc : function, default=None A parameter used to indicate heuristic function used to sort the items in ascending order repeat_sort : bool, default=False A parameter used to indicate whether the heuristic sort will be applied at each level of the lattice or only at the root quiet : bool, default=True A parameter used to indicate if the boosting log will be printed or not print_output : bool, default=False A parameter used to indicate if the search output will be printed or not cache_type : Cache_Type, default=Cache_Type.Cache_TrieItemset A parameter used to indicate the type of cache used when the `DL85Predictor.usecache` is set to True. maxcachesize : int, default=0 A parameter used to indicate the maximum size of the cache. If the cache size is reached, the cache will be wiped using the `DL85Predictor.wipe_type` and `DL85Predictor.wipe_factor` parameters. Default value 0 stands for no limit. wipe_type : Wipe_Type, default=Wipe_Type.Reuses A parameter used to indicate the type of cache used when the `DL85Predictor.maxcachesize` is reached. wipe_factor : float, default=0.5 A parameter used to indicate the rate of elements to delete from the cache when the `DL85Predictor.maxcachesize` is reached. use_cache : bool, default=True A parameter used to indicate if a cache will be used or not use_ub : bool, default=True Define whether the hierarchical upper bound is used or not dynamic_branch : bool, default=True Define whether a dynamic branching is used to decide in which order explore decisions on an attribute Attributes ---------- optimal_ : bool Whether the found forest is optimal or not estimators_ : list List of DL85Classifier in the forest estimator_weights_ : list List of weights of the estimators in the forest accuracy_ : float Accuracy of the found forest on training set duration_ : float Time of the optimal forest learning n_estimators_ : int Number of estimators in the forest n_iterations_ : int Number of iterations of the forest learning margins_ : list List of margins of each instance in the training set margins_norm_ : list List of normalized margins of each instance in the training set classes_ : ndarray, shape (n_classes,) The classes seen in :meth:`fit`. """ def __init__( self, base_estimator=None, max_depth=1, min_sup=1, max_iterations=0, model=Boosting_Model.MODEL_LP_DEMIRIZ, gamma=None, error_function=None, fast_error_function=None, opti_gap=0.01, max_error=0, regulator=-1, stop_after_better=False, time_limit=0, verbose=False, desc=False, asc=False, repeat_sort=False, quiet=True, print_output=False, cache_type=Cache_Type.Cache_TrieItemset, maxcachesize=0, wipe_type=Wipe_Type.Subnodes, wipe_factor=0.5, use_cache=True, use_ub=True, dynamic_branch=True): self.clf_params = dict(locals()) self.clf_params["depth_two_special_algo"] = False self.clf_params["similar_lb"] = True self.clf_params["similar_for_branching"] = False del self.clf_params["self"] del self.clf_params["regulator"] del self.clf_params["base_estimator"] del self.clf_params["max_iterations"] del self.clf_params["model"] del self.clf_params["gamma"] del self.clf_params["opti_gap"] self.base_estimator = base_estimator self.max_depth = max_depth self.min_sup = min_sup self.max_iterations = max_iterations self.error_function = error_function self.fast_error_function = fast_error_function self.max_error = max_error self.stop_after_better = stop_after_better self.time_limit = time_limit self.verbose = verbose self.desc = desc self.asc = asc self.repeat_sort = repeat_sort self.print_output = print_output self.regulator = regulator self.quiet = quiet self.model = model self.gamma = gamma self.opti_gap = opti_gap self.n_instances = None self.A_inv = None self.solver = cvxpy.SCS self.optimal_ = True self.estimators_, self.estimator_weights_ = [], [] self.accuracy_ = self.duration_ = self.n_estimators_ = self.n_iterations_ = 0 self.margins_ = [] self.margins_norm_ = [] self.classes_ = [] def fit(self, X, y=None): if y is None: raise ValueError("The \"y\" value is compulsory for boosting.") start_time = time.perf_counter() # initialize variables self.n_instances, _ = X.shape sample_weights = np.array([1/self.n_instances] * self.n_instances) predictions, r, self.n_iterations_, constant = None, None, 1, 0.0001 if self.model == Boosting_Model.MODEL_QP_MDBOOST: if self.gamma is None: # Build positive semidefinite A matrix self.A_inv = np.full((self.n_instances, self.n_instances), -1/(self.n_instances - 1), dtype=np.float64) np.fill_diagonal(self.A_inv, 1) # regularize A to make sure it is really PSD self.A_inv = np.add(self.A_inv, np.dot(np.eye(self.n_instances), constant)) else: if self.gamma == 'auto': self.gamma = 1 / self.n_instances elif self.gamma == 'scale': self.gamma = 1 / (self.n_features * X.var()) elif self.gamma == 'nscale': # scaler = MinMaxScaler(feature_range=(-10, 10)) # self.gamma = 1 / scaler.fit_transform(X).var() self.gamma = 1 / X.var() self.A_inv = np.identity(self.n_instances, dtype=np.float64) for i in range(self.n_instances): for j in range(self.n_instances): if i != j: self.A_inv[i, j] = np.exp(-self.gamma * np.linalg.norm(np.subtract(X[i, :], X[j, :])) ** 2) if not self.quiet: print(self.A_inv) print("is psd", is_pos_def(self.A_inv)) print("is semi psd", is_semipos_def(self.A_inv)) self.A_inv = np.linalg.pinv(self.A_inv) if isPD(self.A_inv) is False: self.A_inv = nearestPD(self.A_inv) if not self.quiet: print("A_inv") print("is psd", isPD(self.A_inv)) print("is psd", is_pos_def(self.A_inv)) print("is semi psd", is_semipos_def(self.A_inv)) print("is semi psd", is_pos_def(nearestPD(self.A_inv))) if not self.quiet: print() while (self.max_iterations > 0 and self.n_iterations_ <= self.max_iterations) or self.max_iterations <= 0: if not self.quiet: print("n_iter", self.n_iterations_) # initialize the classifier clf = DL85Classifier(**self.clf_params) if self.base_estimator is None else self.base_estimator # fit the model zero_indices = np.where(sample_weights == 0)[0].tolist() keep_mask = np.in1d(np.arange(len(y)), zero_indices, invert=True) X_filtered = X[keep_mask] y_filtered = y[keep_mask] sample_weights_filtered = sample_weights[keep_mask] if self.quiet: old_stdout = sys.stdout sys.stdout = open(os.devnull, "w") clf.fit(X_filtered, y_filtered, sample_weight=sample_weights_filtered.tolist()) sys.stdout = old_stdout else: clf.fit(X_filtered, y_filtered, sample_weight=sample_weights_filtered.tolist()) # print the tree expression of the estimator if it has if not self.quiet: print("A new tree has been learnt based on previous found sample weights") if hasattr(clf, "tree_") and isinstance(clf.tree_, dict): pass # compute the prediction of the new estimator : 1 if correct else -1 try: pred = np.array([-1 if p != y[i] else 1 for i, p in enumerate(clf.predict(X))]) except (NotFittedError, SearchFailedError, TreeNotFoundError) as error: if not self.quiet: print("Problem during the search so we stop") break if not self.quiet: print("correct predictions - incorrect predictions =", pred.sum()) print("np.dot(predictions, sample_weigths) =", pred @ sample_weights) # check if optimal condition is met if self.n_iterations_ > 1: if pred @ sample_weights < r + self.opti_gap: if not self.quiet: print("np.dot(predictions, sample_weigths) < r + espsilon ==> we cannot add the new tree. End of iterations") print("Objective value at end is", opti) self.optimal_ = True break if not self.quiet: print("np.dot(predictions, sample_weigths) >= r + epsilon. We can add the new tree.") # add new prediction to all prediction matrix. Each column represents predictions of a tree for all examples predictions = pred.reshape((-1, 1)) if predictions is None else np.concatenate((predictions, pred.reshape(-1, 1)), axis=1) if not self.quiet: print("whole predictions shape", predictions.shape) print("run dual...") # add the new estimator and compute the dual to find new sample weights for another estimator to add self.estimators_.append(deepcopy(clf)) if self.model == Boosting_Model.MODEL_LP_RATSCH: r, sample_weights, opti, self.estimator_weights_ = self.compute_dual_ratsch(predictions) elif self.model == Boosting_Model.MODEL_LP_DEMIRIZ: r, sample_weights, opti, self.estimator_weights_ = self.compute_dual_demiriz(predictions) elif self.model == Boosting_Model.MODEL_LP_AGLIN: r, sample_weights, opti, self.estimator_weights_ = self.compute_dual_aglin(predictions) elif self.model == Boosting_Model.MODEL_QP_MDBOOST: r, sample_weights, opti, self.estimator_weights_ = self.compute_dual_mdboost(predictions) self.margins_ = (predictions @ np.array(self.estimator_weights_).reshape(-1, 1)).transpose().tolist()[0] self.margins_norm_ = (predictions @ np.array([float(i)/sum(self.estimator_weights_) for i in self.estimator_weights_]).reshape(-1, 1)).transpose().tolist()[0] if sum(self.estimator_weights_) > 0 else None if not self.quiet: print("after dual") print("We got", len(self.estimator_weights_), "trees with weights w:", self.estimator_weights_) print("Objective value at this stage is", opti) print("Value of r is", r) print("The sorted margin at this stage is", sorted(self.margins_)) mean = sum(self.margins_) / len(self.margins_) variance = sum([((x - mean) ** 2) for x in self.margins_]) / len(self.margins_) std = variance ** 0.5 print("min margin:", min(self.margins_), "\tmax margin:", max(self.margins_), "\tavg margin:", mean, "\tstd margin:", std, "\tsum:", sum(self.margins_)) print("number of neg margins:", len([marg for marg in self.margins_ if marg < 0]), "\tnumber of pos margins:", len([marg for marg in self.margins_ if marg >= 0])) print("The new sample weight for the next iteration is", sample_weights.tolist(), "\n") self.n_iterations_ += 1 self.duration_ = time.perf_counter() - start_time self.n_iterations_ -= 1 # remove the useless estimators zero_ind = [i for i, val in enumerate(self.estimator_weights_) if val == 0] if not self.quiet: print("\nall tree w", self.estimator_weights_, "\n", "zero ind", zero_ind) self.estimator_weights_ = np.delete(self.estimator_weights_, np.s_[zero_ind], axis=0) self.estimators_ = [clf for clf_id, clf in enumerate(self.estimators_) if clf_id not in zero_ind] predictions = np.delete(predictions, np.s_[zero_ind], axis=1) if not self.quiet: print("final pred shape", predictions.shape) # compute training accuracy of the found ensemble and store it in the variable `accuracy_` forest_pred_val = np.dot(predictions, np.array(self.estimator_weights_)) train_pred_correct_or_not = np.where(forest_pred_val < 0, 0, 1) # 1 if prediction is correct, 0 otherwise self.accuracy_ = sum(train_pred_correct_or_not)/len(y) # save the number of found estimators self.n_estimators_ = len(self.estimators_) # Show each non-zero estimator weight and its tree expression if it has if not self.quiet: for i, estimator in enumerate(sorted(zip(self.estimator_weights_, self.estimators_), key=lambda x: x[0], reverse=True)): print("clf n_", i+1, " ==>\tweight: ", estimator[0], sep="", end="") if hasattr(estimator[1], "tree_") and isinstance(estimator[1].tree_, dict): print(" \tjson_string: ", estimator[1].tree_, sep="") else: print() if self.n_estimators_ == 0: raise NotFittedError("No tree selected") self.classes_ = unique_labels(y) return self def compute_dual_ratsch(self, predictions): # primal is maximization r_ = cvxpy.Variable() u_ = cvxpy.Variable(self.n_instances) obj = cvxpy.Minimize(r_) constr = [predictions[:, i] @ u_ <= r_ for i in range(predictions.shape[1])] constr.append(-u_ <= 0) if self.regulator > 0: constr.append(u_ <= self.regulator) constr.append(cvxpy.sum(u_) == 1) problem = cvxpy.Problem(obj, constr) if self.quiet: old_stdout = sys.stdout sys.stdout = open(os.devnull, "w") opti = problem.solve(solver=self.solver) if self.quiet: sys.stdout = old_stdout return r_.value, u_.value, opti, [x.dual_value for x in problem.constraints[:predictions.shape[1]]] def compute_dual_aglin(self, predictions): # primal is maximization r_ = cvxpy.Variable() u_ = cvxpy.Variable(self.n_instances) v_ = cvxpy.Variable(self.n_instances) obj = cvxpy.Minimize(r_) constr = [-(predictions[:, t] @ u_) <= r_ for t in range(predictions.shape[1])] constr.append(u_ + v_ == -1) constr.append(cvxpy.sum(v_) == self.regulator) constr.append(-v_ <= 0) problem = cvxpy.Problem(obj, constr) if self.quiet: old_stdout = sys.stdout sys.stdout = open(os.devnull, "w") opti = problem.solve(solver=self.solver) if self.quiet: sys.stdout = old_stdout return r_.value, v_.value, opti, [x.dual_value for x in problem.constraints[:predictions.shape[1]]] def compute_dual_demiriz(self, predictions): # primal is minimization u_ = cvxpy.Variable(self.n_instances) obj = cvxpy.Maximize(cvxpy.sum(u_)) constr = [predictions[:, i] @ u_ <= 1 for i in range(predictions.shape[1])] constr.append(-u_ <= 0) constr.append(u_ <= self.regulator) problem = cvxpy.Problem(obj, constr) if self.quiet: old_stdout = sys.stdout sys.stdout = open(os.devnull, "w") opti = problem.solve(solver=self.solver) if self.quiet: sys.stdout = old_stdout return 1, u_.value, opti, [x.dual_value for x in problem.constraints[:predictions.shape[1]]] def compute_dual_mdboost(self, predictions): # primal is maximization r_ = cvxpy.Variable() u_ = cvxpy.Variable(self.n_instances) obj = cvxpy.Minimize(r_ + 1/(2*self.regulator) * cvxpy.quad_form((u_ - 1), self.A_inv)) constr = [predictions[:, i] @ u_ <= r_ for i in range(predictions.shape[1])] problem = cvxpy.Problem(obj, constr) if self.quiet: old_stdout = sys.stdout sys.stdout = open(os.devnull, "w") opti = problem.solve(solver=self.solver) if self.quiet: sys.stdout = old_stdout return r_.value, u_.value, opti, [x.dual_value for x in problem.constraints]
[docs] def softmax(self, X, copy=True): """ Calculate the softmax function. The softmax function is calculated by np.exp(X) / np.sum(np.exp(X), axis=1) This will cause overflow when large values are exponentiated. Hence the largest value in each row is subtracted from each data point to prevent this. Parameters ---------- X : array-like of float of shape (M, N) Argument to the logistic function. copy : bool, default=True Copy X or not. Returns ------- out : ndarray of shape (M, N) Softmax function evaluated at every point in x. """ if copy: X = np.copy(X) max_prob = np.max(X, axis=1).reshape((-1, 1)) X -= max_prob np.exp(X, X) sum_prob = np.sum(X, axis=1).reshape((-1, 1)) X /= sum_prob return X
def predict(self, X, y=None): if self.n_estimators_ == 0: # fit method has not been called print(self.estimators_) print(self.estimator_weights_) raise NotFittedError("Call fit method first" % {'name': type(self).__name__}) # Run a prediction on each estimator predict_per_clf = np.asarray([clf.predict(X) for clf in self.estimators_]).transpose() return np.apply_along_axis(lambda x: np.argmax(np.bincount(x, weights=self.estimator_weights_)), axis=1, arr=predict_per_clf.astype('int')) def predict_proba(self, X): classes = self.classes_[:, np.newaxis] pred = sum((np.array(estimator.predict(X)) == classes).T * w for estimator, w in zip(self.estimators_, self.estimator_weights_)) pred /= sum(self.estimator_weights_) pred[:, 0] *= -1 decision = pred.sum(axis=1) decision = np.vstack([-decision, decision]).T / 2 return self.softmax(decision, False) def get_nodes_count(self): if self.n_estimators_ == 0: # fit method has not been called raise NotFittedError("Call fit method first" % {'name': type(self).__name__}) return sum([clf.get_nodes_count() for clf in self.estimators_])