# -*- coding: utf-8 -*-
import numpy as np
from scipy.linalg import inv, sqrtm
import copy
from sklearn.utils.metaestimators import _BaseComposition
from sklearn.base import RegressorMixin, BaseEstimator, TransformerMixin, defaultdict
from ..preprocessing.robcent import VersatileScaler
from ._sudire_utils import *
from scipy.linalg import orth
import warnings
import statsmodels.robust.scale as srs
from scipy.stats import trim_mean
import dcor as dc
# import Ball
import statsmodels.api as sm
import inspect
from ..ipopt_temp.ipopt_wrapper import minimize_ipopt
from ..ipopt_temp.jacobian import (
FunctionWithApproxJacobianCentral,
FunctionWithApproxJacobian,
)
from ..dicomo._dicomo_utils import *
from ..utils.utils import *
[docs]class sudire(_BaseComposition, BaseEstimator, TransformerMixin, RegressorMixin):
"""SUDIRE Sufficient Dimension Reduction
The class allows for Sufficient Dimension Reduction using a variety of
methods. If the method requires optimization of a function,
This optimization is done through the Interior Point Optimizer (IPOPT)
algorithm.
Parameters
----------
sudiremeth: function or class. sudiremeth in this package can also be used,
but user defined functions can be processed. Built in options are :
save : Sliced Average Variance Estimation
sir : Slices Inverse Regression
dr : Directional Regression
iht : Iterative Hessian Transformations
dcov-sdr : SDR via Distance Covariance
mdd-sdr : SDR via Martingale Difference Divergence.
bcov-sdr : SDR via ball covariance
n_components : int
dimension of the central subspace.
trimming : float
trimming percentage to be entered as pct/100
optimizer_options : dict
with options to pass on to the optimizer.Includes:
max_iter : int
Maximal number of iterations.
tol: float
relative convergence tolerance
constr_viol_tol : float
Desired threshold for the constraint violation.
optimizer_constraints : dict or list of dicts
further constraints to be passed on to the
optimizer function.
optimizer_arguments: dict
extra arguments to be passed to the sudiremeth
function during optimization.
optimizer_start : numpy array
starting value for the optimization.
center : str
how to center the data. options accepted are options from sprm.preprocessing
center_data : bool
If True, the data will be centered before the dimension reduction
scale_data : bool
if set to False, convergence to correct optimum is not a given.
Will throw a warning.
compression : bool
Use internal data compresion step for flat data.
n_slices : int
The number of slices for SAVE, SIR, DR
is_distance_mat : bool
if the inputed matrices for x and y are distance matrices.
dmetric : str
distance metric used internally. Defaults to 'euclidean'
fit_ols : bool
if True, an OLS model is fitted after the dimension reduction.
copy : bool
Whether to make a deep copy of the input data or not.
verbose : bool
Set to True prints the iteration number.
return_scaling_object: bool.
If True, the scaling object will be return after the
dimension reduction.
Attributes
----------
Attributes always provided
- `x_loadings_`: Estimated basis of the central subspace
- `x_scores_`: The projected X data.
- `x_loc_`: location estimate for X
- `x_sca_`: scale estimate for X
- ` ols_obj` : fitted OLS objected
- `y_loc_`: y location estimate
- `y_sca_`: y scale estimate
Attributes created only when corresponding input flags are `True`:
- `whitening_`: whitened data matrix (usually denoted K)
- `scaling_object_`: scaling object from `VersatileScaler`
"""
[docs] def __init__(
self,
sudiremeth="dcov-sdr",
n_components=2,
trimming=0,
optimizer_options={"max_iter": 1000},
optimizer_constraints=None,
optimizer_arguments=None,
optimizer_start=None,
center_data=True,
center="mean",
scale_data=True,
whiten_data=False,
compression=False,
n_slices=6,
dmetric="euclidean",
fit_ols=True,
copy=True,
response_type="continuous",
verbose=True,
return_scaling_object=True,
):
# Called arguments
self.sudiremeth = sudiremeth
self.n_components = n_components
self.trimming = trimming
self.optimizer_options = optimizer_options
self.optimizer_constraints = optimizer_constraints
self.optimizer_arguments = optimizer_arguments
self.optimizer_start = optimizer_start
self.center = center
self.center_data = center_data
self.scale_data = scale_data
self.whiten_data = whiten_data
self.compression = compression
self.n_slices = n_slices
self.dmetric = dmetric
self.fit_ols = fit_ols
self.copy = copy
self.response_type = response_type
self.verbose = verbose
self.return_scaling_object = return_scaling_object
# Other global parameters
self.licenter = ["mean", "median"]
if not (self.center in self.licenter):
raise (
ValueError(
'Only location estimator classes allowed are: "mean", "median"'
)
)
self.limeths = [
"sir",
"save",
"dr",
"dcov-sdr",
"bcov-sdr",
"mdd-sdr",
"phd",
"iht",
]
if not (self.sudiremeth in self.limeths) and not callable(self.sudiremeth):
raise (
ValueError(
'Only SDR methods allowed are : "sir", "save", "dr", "dcov-sdr","bcov-sdr", "mdd-sdr", "iht","phd"'
)
)
def fit(self, X, y, *args, **kwargs):
"""
Fit a Sufficient Dimension Reduction Model.
Parameters
----------
X : matrix or data frame
Input data of predictors
y : vector or 1D matrix
Response data
args or kwargs :
Further parameters to user defined sudiremeth can be passed here
Returns
-------
self
-------
"""
# Collect optional fit arguments
if "dmetric" not in kwargs:
dmetric = "euclidean"
else:
dmetric = kwargs.get("dmetric")
if "biascorr" not in kwargs:
biascorr = False
else:
biascorr = kwargs.get("biascorr")
if "flag" not in kwargs:
flag = "two-block"
else:
flag = kwargs.get("flag")
if "is_distance_mat" not in kwargs:
is_distance_mat = False
else:
is_distance_mat = kwargs.pop("is_distance_mat")
# Initiate some parameters and data frames
if self.copy:
X0 = copy.deepcopy(X)
self.X0 = X0
else:
X0 = X
X = convert_X_input(X0)
n, p = X0.shape
trimming = self.trimming
# Check dimensions
if self.n_components > min(n, p):
raise (
MyException(
"number of components cannot exceed number of variables or sample size"
)
)
# Pre-processing adjustment if whitening
if self.whiten_data:
self.center_data = True
self.scale_data = False
self.compression = False
print("All results produced are for whitened data")
# Store original X data mean and varcov matrix.
varmatx = np.cov(X, rowvar=0)
meanx = X.mean(axis=0)
N2 = inv(sqrtm(varmatx))
# Data Compression for flat tables if required
if (p > n) and self.compression:
V, S, U = np.linalg.svd(X.T, full_matrices=False)
X = np.matmul(U.T, np.diag(S))
n, p = X.shape
if (srs.mad(X) == 0).any():
warnings.warn(
"Due to low scales in data, compression would induce zero scales."
+ "\n"
+ "Proceeding without compression."
)
dimensions = False
if copy:
X = copy.deepcopy(X0)
else:
X = X0
else:
dimensions = True
else:
dimensions = False
# Centring and scaling
# centering :
if self.center_data:
if self.center != "mean":
centring = VersatileScaler(
center=self.center, scale="None", trimming=self.trimming
)
Xs = centring.fit_transform(X0)
mX = centring.col_loc_
sX = centring.col_sca_
else:
Xs = X - trim_mean(X, self.trimming, axis=0)
mX = trim_mean(X, self.trimming, axis=0)
sX = np.sqrt(np.diag(varmatx))
else:
Xs = X0
mX = np.zeros((1, p))
sX = np.ones((1, p))
if self.scale_data:
if self.center == "mean":
scale = "std"
N2 = inv(sqrtm(varmatx))
Xs = np.matmul(Xs, N2)
elif (self.center == "median") or (self.center == "l1median"):
scale = "mad"
centring = VersatileScaler(
center=self.center, scale=scale, trimming=trimming
)
Xs = centring.fit_transform(X0)
mX = centring.col_loc_
sX = centring.col_sca_
else:
raise (
MyException(
'centering options have to be either "mean", "median", or "l1median"'
)
)
else:
scale = "None"
warnings.warn("Without scaling, convergence to optima is not given")
# Initiate centring object and scale X data
if self.whiten_data:
V, S, U = np.linalg.svd(Xs.T, full_matrices=False)
del U
K = (V / S)[:, :p]
del S
Xs = np.matmul(Xs, K)
Xs *= np.sqrt(p)
# Pre-process y data
ny = y.shape[0]
y = convert_y_input(y)
if len(y.shape) < 2:
y = np.array(y).reshape((ny, 1))
if ny != n:
raise (MyException("X and y number of rows must agree"))
# Pre-process y data when available
if flag != "one-block":
ny = y.shape[0]
y = convert_y_input(y)
if len(y.shape) < 2:
y = np.array(y).reshape((ny, 1))
if ny != n:
raise (MyException("X and y number of rows must agree"))
if self.copy:
y0 = copy.deepcopy(y)
self.y0 = y0
if self.center_data:
ys = y # dont center y
my = np.mean(y, axis=0)
sy = np.sqrt(np.var(y, axis=0))
else:
ys = y
my = 0
sy = 1
ys = ys.astype("float64")
else:
ys = None
if self.sudiremeth == "sir":
P = SIR(
Xs,
ys,
self.n_slices,
self.n_components,
self.response_type,
self.center_data,
self.scale_data,
)
if self.scale_data:
P = np.matmul(N2, P)
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
elif self.sudiremeth == "save":
P = SAVE(
Xs,
ys,
self.n_slices,
self.n_components,
self.response_type,
self.center_data,
self.scale_data,
)
if self.scale_data:
P = np.matmul(N2, P)
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
elif self.sudiremeth == "dr":
P = DR(
Xs,
ys,
self.n_slices,
self.n_components,
self.response_type,
self.center_data,
self.scale_data,
)
if self.scale_data:
P = np.matmul(N2, P)
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
elif self.sudiremeth == "phd":
P = PHD(Xs, ys, self.n_components, self.center_data, self.scale_data)
if self.scale_data:
P = np.matmul(N2, P)
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
elif self.sudiremeth == "iht":
P = IHT(Xs, ys, self.n_components, self.center_data, self.scale_data)
if self.scale_data:
P = np.matmul(N2, P)
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
else: # SDR obtained through optimization of some function
## choose starting value for DCOV-SDR
if self.optimizer_start is None:
save_start = SAVE(
Xs, ys, 3, self.n_components, self.center_data, self.scale_data
)
if self.scale_data:
save_start = np.matmul(N2, save_start)
sir_start = SIR(
Xs, ys, 6, self.n_components, self.center_data, self.scale_data
)
if self.scale_data:
sir_start = np.matmul(N2, sir_start)
beta_save = orth(save_start)
dc_save = dc.distance_covariance_sqr(np.matmul(Xs, beta_save), y)
beta_sir = orth(sir_start)
dc_sir = dc.distance_covariance_sqr(np.matmul(Xs, beta_sir), y)
DR_start = DR(
Xs, y, 6, self.n_components, self.center_data, self.scale_data
)
if self.scale_data:
DR_start = np.matmul(N2, DR_start)
beta_DR = orth(DR_start)
dc_DR = dc.distance_covariance_sqr(np.matmul(Xs, beta_DR), y)
if dc_save >= dc_sir and dc_save >= dc_DR:
self.optimizer_start = save_start.flatten(order="F")
elif dc_sir >= dc_save and dc_sir >= dc_DR:
self.optimizer_start = sir_start.flatten(order="F")
else:
self.optimizer_start = DR_start.flatten(order="F")
## add constraints for the optimization
if self.optimizer_constraints is None:
const_x = []
const_z = []
for i in range(0, self.n_components):
for j in range(0, self.n_components):
const_x.append(
{
"type": "eq",
"fun": const_xscale,
"args": (Xs, self.n_components, i, j),
}
) # change const2 to const_func
const_z.append(
{
"type": "eq",
"fun": const_zscale,
"args": (Xs, self.n_components, i, j),
}
)
if self.scale_data:
self.optimizer_constraints = tuple(const_z)
else:
self.optimizer_constraints = tuple(const_x)
if self.optimizer_arguments is None:
N2 = inv(sqrtm(varmatx))
self.optimizer_arguments = (
Xs,
ys,
self.n_components,
N2,
is_distance_mat,
self.trimming,
self.center,
dmetric,
biascorr,
)
# perform DCOV-SDR optimization
res = minimize_ipopt(
dcov_trim,
self.optimizer_start,
args=self.optimizer_arguments,
constraints=self.optimizer_constraints,
options=self.optimizer_options,
)
if self.scale_data:
dcov_res = np.matmul(
N2, np.reshape(res.x, (-1, self.n_components), order="F")
)
else:
dcov_res = np.reshape(res.x, (-1, self.n_components), order="F")
if self.sudiremeth == "dcov-sdr":
P = dcov_res
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
elif self.sudiremeth == "bcov-sdr":
self.optimizer_start = dcov_res.flatten(order="F")
res_ball = minimize_ipopt(
ballcov_func,
self.optimizer_start,
args=self.optimizer_arguments,
constraints=self.optimizer_constraints,
options=self.optimizer_options,
)
if self.scale_data:
P = np.matmul(
N2, np.reshape(res_ball.x, (-1, self.n_components), order="F")
)
else:
P = np.reshape(res_ball.x, (-1, self.n_components), order="F")
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
elif self.sudiremeth == "mdd-sdr":
self.optimizer_start = dcov_res.flatten(order="F")
res_mdd = minimize_ipopt(
mdd_trim,
self.optimizer_start,
args=self.optimizer_arguments,
constraints=self.optimizer_constraints,
options=self.optimizer_options,
)
if self.scale_data:
P = np.matmul(
N2, np.reshape(res_mdd.x, (-1, self.n_components), order="F")
)
else:
P = np.reshape(res_mdd.x, (-1, self.n_components), order="F")
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
else: ## user defined function
self.optimizer_start = dcov_res.flatten(order="F")
opt_res = minimize_ipopt(
self.sudiremeth,
self.optimizer_start,
args=self.optimizer_arguments,
constraints=self.optimizer_constraints,
options=self.optimizer_options,
)
if self.scale_data:
P = np.matmul(
N2, np.reshape(opt_res.x, (-1, self.n_components), order="F")
)
else:
P = np.reshape(opt_res.x, (-1, self.n_components), order="F")
projMat = np.matmul(np.matmul(P, inv(np.matmul(P.T, P))), P.T)
T = np.matmul(self.X0, P)
# perform OLS regression
T_reg = sm.add_constant(T) # adding a constant
ols_obj = sm.OLS(ys, T_reg).fit()
# Re-adjust estimates to original dimensions if data have been compressed
if dimensions:
P = np.matmul(V[:, 0:p], P)
setattr(self, "x_loadings_", P)
setattr(self, "x_scores_", T)
setattr(self, "proj_mat_", projMat)
if self.whiten_data:
setattr(self, "whitening_", K)
setattr(self, "x_loc_", mX)
setattr(self, "x_sca_", sX)
setattr(self, "scaling_", scale)
setattr(self, "ols_obj_", ols_obj)
if self.return_scaling_object and self.center != "mean":
setattr(self, "scaling_object_", centring)
return self
def transform(self, Xn, distance_mat=False):
"""
Computes the dimension reduction of the data Xn based on the fitted sudire model.
Parameters
----------
Xn : matrix or data frame
Input data to be transformed
distance_mat : numpy array
distance matrix to represent similarity between observations.
args or kwargs:
Further parameters to user defined sufdiremeth can be passed here
Returns
-------
transformed_data : numpy array
the dimension reduced data
-------
"""
Xn = convert_X_input(Xn)
(n, p) = Xn.shape
(q, h) = self.x_loadings_.shape
if p != q:
raise (
ValueError(
"New data must have same number of columns as the ones the model has been trained with"
)
)
return np.matmul(Xn, self.x_loadings_)
def predict(self, Xn, is_distance_mat=False):
"""
predicts the response on new data Xn
Parameters
----------
Xn : matrix or data frame
Input data to be transformed
is_distance_mat : bool
if True, Xn is treated as a distance matrix
Returns
-------
predictions : numpy array
The predictions from the dimension reduction model
-------
"""
Xn = convert_X_input(Xn)
(n, p) = Xn.shape
(q, h) = self.x_loadings_.shape
if p != q:
raise (
ValueError(
"New data must have same number of columns as the ones the model has been trained with"
)
)
Xns = self.transform(Xn)
Xns = sm.add_constant(Xns) # adding a constant
ys = self.ols_obj_.predict(Xns)
return ys
@classmethod
def _get_param_names(cls):
"""Get parameter names for the estimator"""
# fetch the constructor or the original constructor before
# deprecation wrapping if any
init = getattr(cls.__init__, "deprecated_original", cls.__init__)
if init is object.__init__:
# No explicit constructor to introspect
return []
# introspect the constructor arguments to find the model parameters
# to represent
init_signature = inspect.signature(init)
# Consider the constructor parameters excluding 'self'
parameters = [
p
for p in init_signature.parameters.values()
if p.name != "self" and p.kind != p.VAR_KEYWORD
]
for p in parameters:
if p.kind == p.VAR_POSITIONAL:
raise RuntimeError(
"scikit-learn estimators should always "
"specify their parameters in the signature"
" of their __init__ (no varargs)."
" %s with constructor %s doesn't "
" follow this convention." % (cls, init_signature)
)
# Extract and sort argument names excluding 'self'
return sorted([p.name for p in parameters])
def get_params(self, deep=False):
"""Get parameters for this estimator.
Parameters
----------
deep : boolean, optional
If True, will return the parameters for this estimator and
contained subobjects that are estimators.
Returns
-------
params : mapping of string to any
Parameter names mapped to their values.
------
Copied from ScikitLlearn instead of imported to avoid 'deep=True'
"""
out = dict()
for key in self._get_param_names():
value = getattr(self, key, None)
if deep and hasattr(value, "get_params"):
deep_items = value.get_params().items()
out.update((key + "__" + k, val) for k, val in deep_items)
out[key] = value
return out
def set_params(self, **params):
"""Set the parameters of this estimator.
Copied from ScikitLearn, adapted to avoid calling 'deep=True'
Returns
-------
self
------
Copied from ScikitLlearn instead of imported to avoid 'deep=True'
"""
if not params:
# Simple optimization to gain speed (inspect is slow)
return self
valid_params = self.get_params()
nested_params = defaultdict(dict) # grouped by prefix
for key, value in params.items():
key, delim, sub_key = key.partition("__")
if key not in valid_params:
raise ValueError(
"Invalid parameter %s for estimator %s. "
"Check the list of available parameters "
"with `estimator.get_params().keys()`." % (key, self)
)
if delim:
nested_params[key][sub_key] = value
else:
setattr(self, key, value)
valid_params[key] = value
for key, sub_params in nested_params.items():
valid_params[key].set_params(**sub_params)
return self
def estimate_structural_dim(sudiremeth, Xn, y, B, *args, **kwargs):
"""
Estimates the dimension of the central subspace using
the sudiremeth. This approach is based on the bootstrap method of Sheng and Yin (2016)
Parameters
----------
sudiremeth : str
the SDR method to use in the estimation.
X : numpy array or dataframe
Input X data
Y : vector or 1d matrix
Input Y data as
B : int
Number of bootstrap replications
kwargs:
n_slices: number of slices for SIR/SAVE/DR
center_data, bool : if the data should be centered
scale_data, bool : if the data should be scaled
center, string : which centering('mean', 'median')
Returns
----------
h : int
representing the dimension of the central subspace
----------
"""
if "n_slices" not in kwargs:
n_slices = 6
else:
n_slices = kwargs.pop("n_slices")
if "center_data" not in kwargs:
center_data = True
else:
center_data = kwargs.pop("center_data")
if "scale_data" not in kwargs:
scale_data = True
else:
scale_data = kwargs.pop("scale_data")
if "center" not in kwargs:
center = "mean"
else:
center = kwargs.pop("center")
Xn = convert_X_input(Xn)
y = convert_y_input(y)
n, p = Xn.shape
diff_b = []
mean_diff = []
for k in range(1, p + 1):
print("possible dim", k)
sdr = sudire(
sudiremeth,
center_data=center_data,
scale_data=scale_data,
center=center,
n_slices=n_slices,
n_components=k,
)
sdr.fit(Xn, y=y)
projMat = sdr.proj_mat_
for b in range(B):
idx = np.random.randint(0, n, n)
X_b = Xn[idx, :].copy()
sdr_b = sudire(
sudiremeth,
center_data=center_data,
scale_data=scale_data,
center=center,
n_slices=n_slices,
n_components=k,
)
sdr_b.fit(X_b, y=y)
projMat_b = sdr_b.proj_mat_
uh, sh, vh = np.linalg.svd(projMat - projMat_b)
diff_b.append(np.nanmax(sh))
mean_diff.append(np.mean(diff_b))
return (np.argmin(mean_diff) + 1, mean_diff)