Source code for direpack.sprm.sprm

from __future__ import absolute_import, division, print_function
from __future__ import unicode_literals
from sklearn.base import RegressorMixin, BaseEstimator, TransformerMixin
from sklearn.utils.metaestimators import _BaseComposition
from scipy.stats import norm, chi2
import copy
import numpy as np
import pandas as ps
import warnings
from ..preprocessing.robcent import VersatileScaler
from .snipls import snipls
from ..utils.utils import MyException, _predict_check_input, _check_input
from ._m_support_functions import *
from ..preprocessing._preproc_utilities import scale_data, mad


[docs]class sprm(_BaseComposition, BaseEstimator, TransformerMixin, RegressorMixin): """ SPRM Sparse Partial Robust M Regression Algorithm first outlined in: Sparse partial robust M regression, Irene Hoffmann, Sven Serneels, Peter Filzmoser, Christophe Croux, Chemometrics and Intelligent Laboratory Systems, 149 (2015), 50-59. Parameters ----------- eta : float. Sparsity parameter in [0,1) n_components : int min 1. Note that if applied on data, n_components shall take a value <= min(x_data.shape) fun : str downweighting function. 'Hampel' (recommended), 'Fair' or 'Huber' probp1 : float probability cutoff for start of downweighting (e.g. 0.95) probp2 : float probability cutoff for start of steep downweighting (e.g. 0.975, only relevant if fun='Hampel') probp3 : float probability cutoff for start of outlier omission (e.g. 0.999, only relevant if fun='Hampel') centre : str type of centring (`'mean'`, `'median'`, `'l1median'`, or `'kstepLTS'`, the latter recommended statistically, if too slow, switch to `'median'`) scale : str type of scaling ('std','mad', 'scaleTau2' [recommended] or 'None') verbose : booleans specifying verbose mode maxit : int maximal number of iterations in M algorithm tol : float tolerance for convergence in M algorithm start_cutoff_mode : str, values:'specific' will set starting value cutoffs specific to X and y (preferred); any other value will set X and y stating cutoffs identically. The latter yields identical results to the SPRM R implementation available from CRAN. start_X_init: str, values: 'pcapp' will include a PCA/broken stick projection to calculate the staring weights, else just based on X; any other value will calculate the X starting values based on the X matrix itself. This is less stable for very flat data (p >> n), yet yields identical results to the SPRM R implementation available from CRAN. columns : (def false) Either boolean, list, numpy array or pandas Index if False, no column names supplied; if True, if X data are supplied as a pandas data frame, will extract column names from the frame throws an error for other data input types if a list, array or Index (will only take length x_data.shape[1]), the column names of the x_data supplied in this list, will be printed in verbose mode. copy : (def True) boolean, whether to copy data Attributes --------------- Attributes always provided - `x_weights_`: X block PLS weighting vectors (usually denoted W) - `x_loadings_`: X block PLS loading vectors (usually denoted P) - `C_`: vector of inner relationship between response and latent variablesblock re - `x_scores_`: X block PLS score vectors (usually denoted T) - `coef_`: vector of regression coefficients - `intercept_`: intercept - `coef_scaled_`: vector of scaled regression coeeficients (when scaling option used) - `intercept_scaled_`: scaled intercept - `residuals_`: vector of regression residuals - `x_ev_`: X block explained variance per component - `y_ev_`: y block explained variance - `fitted_`: fitted response - `x_Rweights_`: X block SIMPLS style weighting vectors (usually denoted R) - `x_caseweights_`: X block case weights - `y_caseweights_`: y block case weights - `caseweights_`: combined case weights - `colret_`: names of variables retained in the sparse model - `x_loc_`: X block location estimate - `y_loc_`: y location estimate - `x_sca_`: X block scale estimate - `y_sca_`: y scale estimate - `non_zero_scale_vars_`: indicator vector of variables in X with nonzero scale """
[docs] def __init__( self, n_components=1, eta=0.5, fun="Hampel", probp1=0.95, probp2=0.975, probp3=0.999, centre="median", scale="mad", verbose=True, maxit=100, tol=0.01, start_cutoff_mode="specific", start_X_init="pcapp", columns=False, copy=True, ): self.n_components = int(n_components) self.eta = float(eta) self.fun = fun self.probp1 = probp1 self.probp2 = probp2 self.probp3 = probp3 self.centre = centre self.scale = scale self.verbose = verbose self.maxit = maxit self.tol = tol self.start_cutoff_mode = start_cutoff_mode self.start_X_init = start_X_init self.columns = columns self.copy = copy self.probctx_ = "irrelevant" self.probcty_ = "irrelevant" self.hampelbx_ = "irrelevant" self.hampelby__ = "irrelevant" self.hampelrx_ = "irrelevant" self.hampelry_ = "irrelevant" self.non_zero_scale_vars_ = None
def fit(self, X, y): """ Fit a SPRM model. Parameters ------------ X : numpy array Input data. y : vector or 1D matrix Response data """ if self.copy: self.X = copy.deepcopy(X) self.y = copy.deepcopy(y) (n, p) = X.shape if not (type(self.n_components) == int) | (self.n_components <= 0): raise MyException("Number of components has to be a positive integer") if (self.n_components > n) | (self.n_components > p): raise MyException("The number of components is too large.") if self.n_components <= 0: raise MyException("The number of components has to be positive.") if not (type(self.eta) == float): raise MyException( "Sparsity parameter eta has to be a floating point number" ) if (self.eta < 0) | (self.eta >= 1): raise MyException("eta has to come from the interval [0,1)") if not (self.fun in ("Hampel", "Huber", "Fair")): raise MyException( "Invalid weighting function. Choose Hampel, Huber or Fair for parameter fun." ) if (self.probp1 > 1) | (self.probp1 <= 0): raise MyException("probp1 is a probability. Choose a value between 0 and 1") if self.fun == "Hampel": if not ( (self.probp1 < self.probp2) & (self.probp2 < self.probp3) & (self.probp3 <= 1) ): raise MyException( "Wrong choise of parameters for Hampel function. Use 0<probp1<hampelp2<hampelp3<=1" ) if type(self.columns) is list: self.columns = np.array(self.columns) elif type(self.columns) is bool: if type(X) != ps.core.frame.DataFrame and self.columns: raise ( MyException( "Columns set to true can only extract column names for data frame input" ) ) if type(X) == ps.core.frame.DataFrame: if type(self.columns) is bool and self.columns: self.columns = X.columns X = _check_input(X) y = _check_input(y) ny = y.shape[0] if ny != n: raise MyException("Number of cases in y and X must be identical.") if self.scale == "scaleTau2": if (mad(X) == 0).any(): # kstepLTS divides by MAD. Zero MAD leads to nan -> detect zero MAD, scale by MAD and remove zero MAD variable. self.scale = "mad" warnings.warn("Scale set to `mad`") scaling = VersatileScaler(center=self.centre, scale=self.scale) Xs = scaling.fit_transform(X).astype("float64") mX = scaling.col_loc_ sX = scaling.col_sca_ ys = scaling.fit_transform(y).astype("float64") my = scaling.col_loc_ sy = scaling.col_sca_ setattr(self, "x_loc_", mX) setattr(self, "y_loc_", my) setattr(self, "x_sca_", sX) setattr(self, "y_sca_", sy) ys = np.array(ys).reshape(-1) zero_scale = np.where(sX < 1e-5)[0] vars_to_keep = np.arange(0, p) if len(zero_scale) > 0: if type(self.columns) != bool: warntext = ( "Zero scale variables with indices " + str(self.columns[zero_scale]) + " detected and removed" ) else: warntext = ( "Zero scale variables with indices " + str(zero_scale) + " detected and removed" ) warnings.warn(warntext) vars_to_keep = np.setdiff1d(np.arange(0, p), zero_scale) Xs = Xs[:, vars_to_keep] X = X[:, vars_to_keep] sX = sX[vars_to_keep] if type(self.columns) != bool: self.columns = self.columns[vars_to_keep] p = len(vars_to_keep) if self.n_components > p: raise MyException( "Upon removal of zero scale variables, the number of components is too large. Please reduce to {p} maximally." ) self.non_zero_scale_vars_ = vars_to_keep if self.start_X_init == "pcapp": U, S, V = np.linalg.svd(Xs) spc = np.square(S) spc /= np.sum(spc) relcomp = max(np.where(spc - brokenstick(min(p, n))[:, 0] <= 0)[0][0], 1) Urc = np.array(U[:, 0:relcomp]) Us = scaling.fit_transform(Urc) else: Us = Xs wx = np.sqrt(np.array(np.sum(np.square(Us), 1), dtype=np.float64)) wx = wx / np.median(wx) if [self.centre, self.scale] == ["median", "mad"]: wy = np.array(abs(ys), dtype=np.float64) else: wy = (y - np.median(y, axis=0)) / ( 1.4826 * np.median(abs(y - np.median(y, axis=0)), axis=0) ) self.probcty_ = norm.ppf(self.probp1) if self.start_cutoff_mode == "specific": self.probctx_ = chi2.ppf(self.probp1, relcomp) else: self.probctx_ = self.probcty_ if self.fun == "Fair": wx = Fair(wx, self.probctx_) wy = Fair(wy, self.probcty_) if self.fun == "Huber": wx = Huber(wx, self.probctx_) wy = Huber(wy, self.probcty_) if self.fun == "Hampel": self.hampelby_ = norm.ppf(self.probp2) self.hampelry_ = norm.ppf(self.probp3) if self.start_cutoff_mode == "specific": self.hampelbx_ = chi2.ppf(self.probp2, relcomp) self.hampelrx_ = chi2.ppf(self.probp3, relcomp) else: self.hampelbx_ = self.hampelby_ self.hampelrx_ = self.hampelry_ wx = Hampel(wx, self.probctx_, self.hampelbx_, self.hampelrx_) wy = Hampel(wy, self.probcty_, self.hampelby_, self.hampelry_) wx = np.array(wx).reshape(-1) wy = np.array(wy).reshape(-1) w = (wx * wy).astype("float64") if (w < 1e-06).any(): w0 = np.where(w < 1e-06)[0] w[w0] = 1e-06 we = np.array(w, dtype=np.float64) else: we = np.array(w, dtype=np.float64) wte = wx wye = wy WEmat = np.array([np.sqrt(we) for i in range(1, p + 1)], ndmin=1).T Xw = np.multiply(Xs, WEmat).astype("float64") yw = ys * np.sqrt(we) scalingt = copy.deepcopy(scaling) loops = 1 rold = 1e-5 difference = 1 # Begin at iteration res_snipls = snipls( self.eta, self.n_components, self.verbose, self.columns, "mean", "None", self.copy, ) while (difference > self.tol) & (loops < self.maxit): res_snipls.fit(Xw, yw) T = np.divide(res_snipls.x_scores_, WEmat[:, 0 : (self.n_components)]) b = res_snipls.coef_ yp = res_snipls.fitted_ r = ys - yp if len(r) / 2 > np.sum(r == 0): r = abs(r) / (1.4826 * np.median(abs(r))) else: r = abs(r) / (1.4826 * np.median(abs(r[r != 0]))) scalet = self.scale if scalet == "None": scalingt.set_params(scale="mad") dt = scalingt.fit_transform(T) wtn = np.sqrt(np.array(np.sum(np.square(dt), 1), dtype=np.float64)) wtn = wtn / np.median(wtn) wtn = wtn.reshape(-1) wye = r.reshape(-1) wte = wtn if self.fun == "Fair": wte = Fair(wtn, self.probctx_) wye = Fair(wye, self.probcty_) if self.fun == "Huber": wte = Huber(wtn, self.probctx_) wye = Huber(wye, self.probcty_) if self.fun == "Hampel": self.probctx_ = chi2.ppf(self.probp1, self.n_components) self.hampelbx_ = chi2.ppf(self.probp2, self.n_components) self.hampelrx_ = chi2.ppf(self.probp3, self.n_components) wte = Hampel(wtn, self.probctx_, self.hampelbx_, self.hampelrx_) wye = Hampel(wye, self.probcty_, self.hampelby_, self.hampelry_) b2sum = np.sum(np.power(b, 2)) difference = abs(b2sum - rold) / rold rold = b2sum wte = np.array(wte).reshape(-1) we = (wye * wte).astype("float64") w0 = [] if any(we < 1e-06): w0 = np.where(we < 1e-06)[0] we[w0] = 1e-06 we = np.array(we, dtype=np.float64) if len(w0) >= (n / 2): break WEmat = np.array([np.sqrt(we) for i in range(1, p + 1)], ndmin=1).T Xw = np.multiply(Xs, WEmat).astype("float64") yw = ys * np.sqrt(we) loops += 1 if difference > self.maxit: print( "Warning: Method did not converge. The scaled difference between norms of the coefficient vectors is " + str(round(difference, 4)) ) plotprec = False if plotprec: print(str(loops - 1)) w = we w[w0] = 0 wt = wte wt[w0] = 0 wy = wye wy[w0] = 0 P = res_snipls.x_loadings_ W = res_snipls.x_weights_ R = res_snipls.x_Rweights_ Xrw = np.array(np.multiply(Xs, np.sqrt(WEmat)).astype("float64")) scaling.set_params(scale="None") Xrw = scaling.fit_transform(Xrw) T = np.matmul(Xs, R) if self.verbose: print( "Final Model: Variables retained for " + str(self.n_components) + " latent variables: \n" + str(res_snipls.colret_) + "\n" ) b_rescaled = np.multiply(np.reshape(sy / sX, (p, 1)), b) yp_rescaled = np.matmul(X, b_rescaled) if self.centre == "mean": intercept = np.mean(y - yp_rescaled, axis=0) else: intercept = np.median(y - yp_rescaled, axis=0) # This median calculation produces slightly different result in R and Py yfit = yp_rescaled + intercept if self.scale != "None": if self.centre == "mean": b0 = np.mean(ys.astype("float64") - np.matmul(Xs.astype("float64"), b)) else: b0 = np.median( np.array(ys.astype("float64") - np.matmul(Xs.astype("float64"), b)) ) # yfit2 = (np.matmul(Xrc.Xs.astype("float64"),b) + b0)*yrc.col_sca + yrc.col_loc # already more generally included else: if self.centre == "mean": b0 = np.mean(y - np.matmul(X, b)) else: b0 = np.median(np.array(y - np.matmul(X, b))) # yfit = np.matmul(X,b) + intercept yfit = yfit r = y - yfit setattr(self, "x_weights_", W) setattr(self, "x_loadings_", P) setattr(self, "C_", res_snipls.C_) setattr(self, "x_scores_", T) setattr(self, "coef_", b_rescaled) setattr(self, "intercept_", intercept) setattr(self, "coef_scaled_", b) setattr(self, "intercept_scaled_", b0) setattr(self, "residuals_", r) setattr(self, "x_ev_", res_snipls.x_ev_) setattr(self, "y_ev_", res_snipls.y_ev_) setattr(self, "fitted_", yfit) setattr(self, "x_Rweights_", R) setattr(self, "x_caseweights_", wte) setattr(self, "y_caseweights_", wye) setattr(self, "caseweights_", we) setattr(self, "colret_", res_snipls.colret_) setattr(self, "scaling_", scaling) setattr(self, "scalingt_", scalingt) return self pass def predict(self, Xn): """ Predict using a SPRM model. Parameters ------------ Xn : numpy array or data frame Input data. """ n, p, Xn = _predict_check_input(Xn) if p != self.X.shape[1]: raise ( ValueError( "New data must have seame number of columns as the ones the model has been trained with" ) ) Xn = Xn[:, self.non_zero_scale_vars_] return np.matmul(Xn, self.coef_) + self.intercept_ def transform(self, Xn): """ Transform input data. Parameters ------------ Xn : numpy array or data frame Input data. """ n, p, Xn = _predict_check_input(Xn) if p != self.X.shape[1]: raise ( ValueError( "New data must have seame number of columns as the ones the model has been trained with" ) ) Xn = Xn[:, self.non_zero_scale_vars_] Xnc = scale_data( Xn, self.x_loc_[self.non_zero_scale_vars_], self.x_sca_[self.non_zero_scale_vars_], ) return np.matmul(Xnc, self.x_Rweights_) def weightnewx(self, Xn): """ Calculate case weights for new data based on the projection in the SPRM score space """ n, p, Xn = _predict_check_input(Xn) (n, p) = Xn.shape if p != self.X.shape[1]: raise ( ValueError( "New data must have seame number of columns as the ones the model has been trained with" ) ) Tn = self.transform(Xn) scaling = self.scalingt_ scalet = self.scale if scalet == "None": scaling.set_params(scale="mad") if isinstance(Tn, np.matrix): Tn = np.array(Tn) dtn = scaling.fit_transform(Tn) wtn = np.sqrt(np.array(np.sum(np.square(dtn), 1), dtype=np.float64)) wtn = wtn / np.median(wtn) wtn = wtn.reshape(-1) if self.fun == "Fair": wtn = Fair(wtn, self.probctx_) if self.fun == "Huber": wtn = Huber(wtn, self.probctx_) if self.fun == "Hampel": wtn = Hampel(wtn, self.probctx_, self.hampelbx_, self.hampelrx_) return wtn def valscore(self, Xn, yn, scoring): """ Specific score function for validation data """ n, p, Xn = _predict_check_input(Xn) (n, p) = Xn.shape if p != self.X.shape[1]: raise ( ValueError( "New data must have seame number of columns as the ones the model has been trained with" ) ) if scoring == "weighted": return RegressorMixin.score(self, Xn, yn, sample_weight=self.caseweights_) elif scoring == "normal": return RegressorMixin.score(self, Xn, yn) else: raise (ValueError('Scoring flag must be set to "weighted" or "normal".'))