"""This module implements some validation schemes l-pls.
|
|
|
|
The primary use is crossvalidation.
|
|
"""
|
|
__all__ = ['pca_val', 'pls_val', 'lpls_val', 'pls_jk', 'lpls_jk']
|
|
__docformat__ = "restructuredtext en"
|
|
|
|
from numpy import dot,empty,zeros,sqrt,atleast_2d,argmax,asarray,median,\
|
|
array_split,arange, isnan, any,newaxis,eye,diag,tile,ones
|
|
from numpy.random import shuffle
|
|
|
|
from engines import pls, pca, center
|
|
from engines import nipals_lpls as lpls
|
|
|
|
def pca_val(a, a_max, nsets=None, center_axis=[0], method='cv'):
    """Returns error estimates of crossvalidated PCA.

    *Parameters*:

        a : {array}
            data matrix (n x m)
        a_max : {integer}
            Maximum number of components in model.
        nsets : {integer}
            number of segments
        center_axis : {array-like}
            Centering
        method : {'cv', 'diag'}
            'cv' leaves out rows, 'diag' imputes elements along diagonals

    *Returns*:

        rmsep : {array}
            Prediction error for each component and x-variable,
            averaged over samples (a_max + 1, m)
        xhat : {array}
            Crossvalidated predictions of a (a_max + 1, n, m)
        aopt : {integer}
            Estimate of the optimal number of components
        err : {array}
            Squared prediction errors (a_max + 1, n, m)

    *Notes*:

        - Crossvalidation of PCA is somewhat artificial as we do not have
          any external information to predict. There are essentially two
          approaches to this: one is to use a projection error, the other
          is to leave out elements of the data matrix and then record a
          missing-value estimation error.

        - The 'diag' method leaves out (imputes) blocks of elements along
          diagonals of the data matrix.

    """
    n, m = a.shape
    if nsets == None:
        nsets = n
    err = zeros((a_max + 1, n, m), dtype=a.dtype)
    xhat = zeros((a_max + 1, n, m), dtype=a.dtype)
    if center_axis[0] == 1:
        a = a - a.mean(1)[:,newaxis]
        center_axis = [-1]
    elif center_axis[0] == 2:
        a = a - a.mean(1)[:,newaxis]
        center_axis = [0]

    if method == 'diag':
        mn_a = dot(ones((n,1)), a.mean(0)[newaxis])
        for i, val in enumerate(diag_cv(a.shape, nsets)):
            true_values = a.take(val)
            new_values = mn_a.take(val)
            # impute with mean values
            a.put(val, new_values)
            dat = pca(a, a_max, mode='fast', center_axis=center_axis)
            Ti, Pi, mnx = dat['T'], dat['P'], dat['mnx']
            # expand mean values
            if center_axis[0] == 1:
                mnx = tile(mnx, (1, m))
            else:
                mnx = tile(mnx, (n, 1))
            err[0,:,:].put(val, (true_values - mnx.take(val))**2)

            for j in xrange(a_max):
                # estimate the imputed values
                a_pred = (mnx + dot(Ti[:,:j+1], Pi[:,:j+1].T)).take(val)
                err[j+1,:,:].put(val, (true_values - a_pred)**2)
                xhat[j,:,:].put(val, a_pred)
            # put original values back
            a.put(val, true_values)

    elif method == 'cv':
        for i, (cal, val) in enumerate(cv(n, nsets)):
            xcal = a[cal, :]
            dat = pca(xcal, a_max, mode='fast', scale='scores', center_axis=center_axis)
            P, mnx = dat['P'], dat['mnx']
            xval = atleast_2d(a[val,:]) - mnx
            err[0,val,:] = xval**2 # pc0
            e = eye(m)
            rmat = zeros((m, m))
            for j, p in enumerate(P.T):
                #d2 = diag(e) - (p**2).ravel()
                p = p[:,newaxis]
                e = e - dot(p, p.T)
                d = diag(e)
                es = e/atleast_2d(d)
                xhat[j+1,val,:] = dot(xval, es)
                err[j+1,val,:] = (dot(xval, es)**2)

    rmsep = sqrt(err).mean(1) # take mean over samples
    aopt = rmsep.mean(-1).argmin()

    return rmsep, xhat, aopt, err

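# Example (added sketch, not part of the original module): a minimal run of
# pca_val on a small random data set.  The data and the chosen nsets are
# made up for illustration only; shapes follow from the code above.
#
#   >>> import numpy as np
#   >>> a = np.random.rand(20, 8)
#   >>> rmsep, xhat, aopt, err = pca_val(a, a_max=4, nsets=5, method='cv')
#   >>> rmsep.shape              # (a_max + 1, n_variables)
#   (5, 8)
#   >>> 0 <= aopt <= 4
#   True
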
def pls_val(X, Y, a_max=2, nsets=None, center_axis=[0,0], verbose=False):
    """Performs crossvalidation to estimate the generalisation error in pls.

    *Parameters*:

        X : {array}
            Main data matrix (m, n)
        Y : {array}
            External row data (m, l)
        a_max : {integer}, optional
            Maximum number of components to calculate (0, min(m,n))
        nsets : {integer}, optional
            Number of crossvalidation sets
        center_axis : {array-like}, optional
            A two element array-like structure with elements in [-1,0,1,2],
            that decides the type of centering used.
            -1 : nothing
            0 : row center
            1 : column center
            2 : double center
        verbose : {boolean}, optional
            Verbosity of console output. For use in debugging.

    *Returns*:

        rmsep : {array}
            Root mean squared error of prediction (for each y-variable)
        Yhat : {array}
            Estimated responses (a_max, m, l)
        error : {array}
            Prediction error per component and y-variable (a_max, l)

    """
    dt = X.dtype
    m, n = X.shape
    k, l = Y.shape
    assert m == k, "X (%d,%d) - Y (%d,%d) dim mismatch" %(m, n, k, l)
    if nsets == None:
        nsets = m
    if nsets > X.shape[0]:
        print "nsets (%d) is larger than the number of samples (%d).\nnsets: %d -> %d" %(nsets, m, nsets, m)
        nsets = m
    if n > 15*m:
        # boosting (wide x)
        # fixme: _w_pls_predict is not defined in this module and its result
        # would be overwritten below, so the wide-X shortcut is disabled.
        #Yhat = _w_pls_predict(X, Y, a_max)
        pass

    Yhat = empty((a_max, k, l), dtype=dt)
    for cal, val in cv(k, nsets):
        # do the training model
        dat = pls(X[cal], Y[cal], aopt=a_max, center_axis=center_axis)

        # center test data
        xi = X[val,:] - dat['mnx']
        ym = dat['mny']

        # predictions
        for a in range(a_max):
            Yhat[a,val] = ym + dot(xi, dat['B'][a])

    sep = (Y - Yhat)**2
    rmsep = sqrt(sep.mean(1)).T
    #aopt = find_aopt_from_sep(rmsep)

    # todo: need a better support for classification error
    error = prediction_error(Yhat, Y, method='squared')

    return rmsep, Yhat, error

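# Example (added sketch, not part of the original module): crossvalidated
# PLS predictions for a small random regression problem.  The data are made
# up; the returned shapes follow from the code above.
#
#   >>> import numpy as np
#   >>> X = np.random.rand(15, 6)
#   >>> Y = np.random.rand(15, 2)
#   >>> rmsep, Yhat, error = pls_val(X, Y, a_max=3, nsets=5)
#   >>> Yhat.shape               # (a_max, samples, y-variables)
#   (3, 15, 2)
#   >>> error.shape              # (a_max, y-variables)
#   (3, 2)
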
def lpls_val(X, Y, Z, a_max=2, nsets=None, alpha=.5, center_axis=[2,0,2], zorth=False, verbose=False):
    """Performs crossvalidation to estimate the generalisation error in lpls.

    The L-PLS crossvalidation is estimated just like an ordinary pls
    crossvalidation. That is, the generalisation error is estimated by
    predicting samples (rows of X and Y) left out according to a cross
    validation scheme.

    *Parameters*:

        X : {array}
            Main data matrix (m, n)
        Y : {array}
            External row data (m, l)
        Z : {array}
            External column data (o, n)
        a_max : {integer}, optional
            Maximum number of components to calculate (0, min(m,n))
        nsets : {integer}, optional
            Number of crossvalidation sets
        alpha : {float}, optional
            Parameter to control the amount of influence from the Z-matrix.
            0 is none, which returns a pls solution, 1 is max
        center_axis : {array-like}, optional
            A three element array-like structure with elements in [-1,0,1,2],
            that decides the type of centering used.
            -1 : nothing
            0 : row center
            1 : column center
            2 : double center
        zorth : {boolean}, optional
            If True, require orthogonal latent components in Z.
        verbose : {boolean}, optional
            Verbosity of console output. For use in debugging.

    *Returns*:

        rmsep : {array}
            Root mean squared error of prediction
        Yhat : {array}
            Estimated responses (a_max, m, l)
        error : {array}
            Prediction error per component and y-variable (a_max, l)

    """

    m, n = X.shape
    k, l = Y.shape
    o, p = Z.shape
    assert m == k, "X (%d,%d) - Y (%d,%d) dim mismatch" %(m, n, k, l)
    assert n == p, "X (%d,%d) - Z (%d,%d) dim mismatch" %(m, n, o, p)
    if nsets == None:
        nsets = m
    if nsets > X.shape[0]:
        print "nsets (%d) is larger than the number of samples (%d).\nnsets: %d -> %d" %(nsets, m, nsets, m)
        nsets = m
    assert (alpha >= 0 and alpha <= 1), "Alpha needs to be within [0,1], got: %.2f" %alpha

    Yhat = empty((a_max, k, l), 'd')
    for cal, val in cv(k, nsets):
        # do the training model
        dat = lpls(X[cal], Y[cal], Z, a_max=a_max, alpha=alpha,
                   center_axis=center_axis, zorth=zorth, verbose=verbose)

        # center test data
        if center_axis[0] == 2:
            xi = X[val,:] - dat['mnx'] - X[val,:].mean(1)[:,newaxis] + dat['mnx'].mean()
        else:
            xi = X[val,:] - dat['mnx']
        if center_axis[1] == 2:
            ym = dat['mny'] + Y[val,:].mean(1)[:,newaxis] - dat['mny'].mean()
        else:
            ym = dat['mny']
        # predictions
        for a in range(a_max):
            Yhat[a,val,:] = atleast_2d(ym + dot(xi, dat['B'][a]))

    # todo: need a better support for classification error
    y_is_class = Y.dtype.char.lower() in ['i', 'p', 'b', 'h', '?']
    if y_is_class:
        pass
        #Yhat, err = class_error(Yhat, Y)
        #return Yhat, err
    sep = (Y - Yhat)**2
    rmsep = sqrt(sep.mean(1)).T
    #aopt = find_aopt_from_sep(rmsep)

    # todo: need a better support for classification error
    error = prediction_error(Yhat, Y, method='1/2')

    return rmsep, Yhat, error

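# Example (added sketch, not part of the original module): L-PLS
# crossvalidation with a small random Z-block.  Z must have the same number
# of columns as X, i.e. shape (o, n); all matrices here are made up.
#
#   >>> import numpy as np
#   >>> X = np.random.rand(15, 6)
#   >>> Y = np.random.rand(15, 2)
#   >>> Z = np.random.rand(4, 6)
#   >>> rmsep, Yhat, error = lpls_val(X, Y, Z, a_max=3, nsets=5, alpha=.5)
#   >>> Yhat.shape
#   (3, 15, 2)
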
def pca_jk(a, aopt, nsets=None, center_axis=[0], method='cv'):
    """Returns jack-knife segments from PCA.

    *Parameters*:

        a : {array}
            data matrix (m, n)
        aopt : {integer}
            number of components in model.
        nsets : {integer}
            number of segments
        center_axis : {array-like}
            Centering
        method : {'cv', 'diag', 'bs', 'bs_diag'}
            Perturbation method is one of:
            cv = leave samples out
            diag = impute diagonals
            bs = leave samples out with replacement (bootstrap)
            bs_diag = impute diagonals
            (only 'cv' and 'diag' are currently implemented)

    *Returns*:

        Pcv : {array}
            Loadings collected in a three way matrix (nsets, n, aopt)

    """
    m, n = a.shape
    if nsets == None:
        nsets = m

    Pcv = empty((nsets, a.shape[1], aopt), dtype=a.dtype)
    if method == 'diag':
        mn_a = .5*(a.mean(0) + a.mean(1)[:,newaxis])
        for i, val in enumerate(diag_cv(a.shape, nsets)):
            old_values = a.take(val)
            new_values = mn_a.take(val)
            # impute mean values
            a.put(val, new_values)
            dat = pca(a, aopt, mode='fast', scale='loads', center_axis=center_axis)
            Pcv[i,:,:] = dat['P']
            # put original values back
            a.put(val, old_values)
    elif method == 'cv':
        for i, (cal, val) in enumerate(cv(m, nsets)):
            Pcv[i,:,:] = pca(a[cal,:], aopt, mode='fast', scale='loads', center_axis=center_axis)['P']
    else:
        raise NotImplementedError(method)

    return Pcv

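# Example (added sketch, not part of the original module): jack-knife
# loading segments and their per-variable spread, useful for judging
# loading stability.  Data and segment count are made up.
#
#   >>> import numpy as np
#   >>> a = np.random.rand(20, 8)
#   >>> Pcv = pca_jk(a, aopt=3, nsets=10)
#   >>> Pcv.shape                # (nsets, variables, aopt)
#   (10, 8, 3)
#   >>> loading_std = Pcv.std(0) # spread across segments
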
def pls_jk(X, Y, a_opt, nsets=None, center_axis=[0,0], verbose=False):
    """Returns jack-knife segments of W.

    *Parameters*:

        X : {array}
            Main data matrix (m, n)
        Y : {array}
            External row data (m, l)
        a_opt : {integer}
            The number of components to calculate (0, min(m,n))
        nsets : {integer}, optional
            Number of jack-knife segments
        center_axis : {array-like}, optional
            - -1 : nothing
            - 0 : row center
            - 1 : column center
            - 2 : double center
        verbose : {boolean}, optional
            Verbosity of console output. For use in debugging.

    *Returns*:

        Wcv : {array}
            Loading-weights jack-knife segments (nsets, n, a_opt)

    """
    m, n = X.shape
    k, l = Y.shape
    assert(m == k)
    if nsets == None:
        nsets = X.shape[0]
    Wcv = empty((nsets, X.shape[1], a_opt), dtype=X.dtype)
    for i, (cal, val) in enumerate(cv(k, nsets)):
        if verbose:
            print "Segment number: %d" %i
        dat = pls(X[cal,:], Y[cal,:], a_opt, scale='loads', mode='fast', center_axis=center_axis)
        Wcv[i,:,:] = dat['W']

    return Wcv

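# Example (added sketch, not part of the original module): jack-knife
# segments of the loading weights W, e.g. as input to a stability or
# significance assessment of the weights.  Data are made up.
#
#   >>> import numpy as np
#   >>> X = np.random.rand(15, 6)
#   >>> Y = np.random.rand(15, 2)
#   >>> Wcv = pls_jk(X, Y, a_opt=2, nsets=5)
#   >>> Wcv.shape                # (nsets, x-variables, a_opt)
#   (5, 6, 2)
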
def lpls_jk(X, Y, Z, a_opt, nsets=None, xz_alpha=.5, center_axis=[2,0,2], zorth=False, verbose=False):
    """Returns jack-knifed segments of an lpls model.

    Jack-knifing is a method to perturb the model parameters, hopefully
    in a way that is representative of a typical perturbation from a
    *future* sample. The mean and variance of the jack-knife segments may
    be used to infer the parameter confidence in the model.

    The segments returned are the X-block weights and Z-block weights.

    *Parameters*:

        X : {array}
            Main data matrix (m, n)
        Y : {array}
            External row data (m, l)
        Z : {array}
            External column data (o, n)
        a_opt : {integer}, optional
            The number of components to calculate (0, min(m,n))
        nsets : {integer}, optional
            Number of jack-knife segments
        xz_alpha : {float}, optional
            Parameter to control the amount of influence from the Z-matrix.
            0 is none, which returns a pls solution, 1 is max
        center_axis : {array-like}, optional
            A three element array-like structure with elements in [-1,0,1,2],
            that decides the type of centering used.
            -1 : nothing
            0 : row center
            1 : column center
            2 : double center
        zorth : {boolean}, optional
            If True, require orthogonal latent components in Z.
        verbose : {boolean}, optional
            Verbosity of console output. For use in debugging.

    *Returns*:

        Wx : {array}
            X-block weight jack-knife segments (nsets, n, a_opt)
        Wz : {array}
            Z-block weight jack-knife segments (nsets, o, a_opt)
    """

    m, n = X.shape
    k, l = Y.shape
    o, p = Z.shape
    assert(m == k)
    assert(n == p)
    if nsets == None:
        nsets = m
    WWx = empty((nsets, n, a_opt), 'd')
    WWz = empty((nsets, o, a_opt), 'd')
    #WWy = empty((nsets, l, a_opt), 'd')
    for i, (cal, val) in enumerate(cv(k, nsets)):
        dat = lpls(X[cal], Y[cal], Z, a_max=a_opt, alpha=xz_alpha, center_axis=center_axis,
                   scale='loads', zorth=zorth, verbose=verbose)
        WWx[i,:,:] = dat['W']
        WWz[i,:,:] = dat['L']
        #WWy[i,:,:] = dat['Q']

    return WWx, WWz

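# Example (added sketch, not part of the original module): jack-knife
# segments for both the X-block and Z-block weights of an L-PLS model.
# All inputs are made up.
#
#   >>> import numpy as np
#   >>> X = np.random.rand(15, 6)
#   >>> Y = np.random.rand(15, 2)
#   >>> Z = np.random.rand(4, 6)
#   >>> Wx, Wz = lpls_jk(X, Y, Z, a_opt=2, nsets=5)
#   >>> Wx.shape, Wz.shape
#   ((5, 6, 2), (5, 4, 2))
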
def find_aopt_from_sep(err, method='vanilla'):
    """Returns an estimate of the optimal number of components.

    The estimate is based on the error of prediction from
    crossvalidation. This is pretty much wild guessing and it is
    recommended to inspect model parameters and prediction errors
    closely before deciding on the optimal number of components.

    *Parameters*:

        err : {array}
            Error of prediction (segments, components)
        method : {'vanilla', '75perc'}
            Method used to estimate the optimal number of components

    *Returns*:

        aopt : {integer}
            A guess on the optimal number of components
    """

    if method == 'vanilla':
        # min rmsep
        rmsecv = sqrt(err.mean(0))
        return rmsecv.argmin() + 1

    elif method == '75perc':
        raise NotImplementedError
        #prct = .75 #percentile
        #ind = 1.*err.shape[0]*prct
        #med = median(err)
        #prc_75 = []
        #for col in err.T:
        #    col = sorted(col)
        #    prc_75.append(col[int(ind)])
        #prc_75 = asarray(prc_75)
        #for i in range(1, err.shape[1], 1):
        #    if med[i-1]<prc_75[i]:
        #        return i
        #return len(med)

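# Example (added sketch, not part of the original module): picking the
# component number that minimises the mean crossvalidated error.  The
# error matrix below (segments x components) is made up.
#
#   >>> import numpy as np
#   >>> err = np.array([[4., 2., 1., 1.5],
#   ...                 [5., 2.5, 1.2, 1.4]])
#   >>> find_aopt_from_sep(err)
#   3
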
def cv(N, K, randomise=True, sequential=False):
    """Generates K (training, validation) index pairs.

    Each pair is a partition of arange(N), where validation is an iterable
    of length ~N/K, *without* replacement.

    *Parameters*:

        N : {integer}
            Total number of samples
        K : {integer}
            Number of segments (subsets)
        randomise : {boolean}
            Use random sampling
        sequential : {boolean}
            Use sequential sampling

    *Returns*:

        training : {array-like}
            training indices
        validation : {array-like}
            validation indices

    *Notes*:

        If randomise is true, a copy of the index is shuffled before
        partitioning, otherwise its order is preserved in training and
        validation.

        Randomise overrides the sequential argument: if randomise is true,
        sequential is set to False.

        If sequential is true the index is partitioned into contiguous
        blocks, otherwise an interleaved ordering is used.

    """
    if K > N:
        raise ValueError, "You cannot divide a list of %d samples into more than %d segments. You tried: %s" %(N, N, K)
    index = xrange(N)
    if randomise:
        from random import shuffle
        index = list(index)
        shuffle(index)
        sequential = False
    if sequential:
        for validation in array_split(index, K):
            training = [i for i in index if i not in validation]
            yield training, validation
    else:
        for k in xrange(K):
            training = [i for i in index if i % K != k]
            validation = [i for i in index if i % K == k]
            yield training, validation

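# Example (added sketch, not part of the original module): with
# randomise=False the partitioning is deterministic and interleaved, which
# makes the behaviour easy to see.
#
#   >>> pairs = list(cv(6, 3, randomise=False))
#   >>> pairs[0]
#   ([1, 2, 4, 5], [0, 3])
#   >>> pairs[1]
#   ([0, 2, 3, 5], [1, 4])
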
def diag_cv(shape, nsets=9, randomise=True):
    """Generates validation index sets along diagonals.

    *Parameters*:

        shape : {tuple}
            Array shape
        nsets : {integer}
            Number of cv sets
        randomise : {boolean}
            Randomise diagonal index

    *Returns*:

        validation : {array-like}
            validation indices into the raveled array

    *Notes*:

        The index is based on the full index (raveled, row-major ordering).
        It extracts along diagonals to ensure balanced removal along both
        axes.

    """
    try:
        m, n = shape
    except:
        raise ValueError("shape needs to be a two-tuple")
    if nsets > m or nsets > n:
        msg = "You may not use more subsets than min(n_rows, n_cols)"
        raise ValueError, msg

    nsets = min(m, n)  # note: this currently overrides the requested nsets
    nm = n*m
    index = arange(nm)
    n_ind = arange(n + 1)
    #shuffle(n_ind) # random start diag (disabled, so randomise has no effect)
    start_inds = array_split(n_ind, nsets)
    for v in range(nsets):
        validation = set()
        for start in start_inds[v]:
            ind = arange(start, nm, n + 1)
            validation.update(ind)
        #training = [j for j in index if j not in validation]
        yield list(validation)

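# Example (added sketch, not part of the original module): validation
# indices into a raveled 4 x 4 array are drawn along diagonals.  The exact
# grouping depends on nsets, which the code above forces to min(m, n).
#
#   >>> segs = [sorted(v) for v in diag_cv((4, 4), nsets=4)]
#   >>> len(segs)
#   4
#   >>> segs[0]
#   [0, 1, 5, 6, 10, 11, 15]
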
def prediction_error(y_hat, y, method='squared'):
    """Loss function on multiclass Y.

    Assumes y is a binary dummy class matrix (samples, classes) and y_hat
    holds the predicted responses for each number of components
    (a_max, samples, classes). Returns the mean loss per class for each
    number of components (a_max, classes).
    """
    k, l = y.shape
    a_max, kk, ll = y_hat.shape
    error = empty((a_max, l))
    for a in range(a_max):
        yha = y_hat[a, :, :]
        if method == 'hinge':
            err = zeros((k, l))
            for j in range(l):
                for i in range(k):
                    if y[i,j] == 1:
                        if yha[i, j] >= 1:
                            err[i,j] = 0
                        else:
                            err[i,j] = abs(y[i,j] - yha[i,j])
                    elif y[i,j] == 0:
                        if yha[i, j] <= 0:
                            err[i,j] = 0
                        else:
                            err[i,j] = abs(y[i,j] - yha[i,j])

        elif method == 'smooth_hinge':
            err = zeros((k, l))
            for j in range(l):
                for i in range(k):
                    if y[i,j] == 1:
                        if yha[i, j] >= 1:
                            err[i,j] = 0
                        elif yha[i,j] < 1 and yha[i,j] > 0:
                            err[i,j] = abs(y[i,j] - yha[i,j])
                        else:
                            err[i,j] = 1

                    elif y[i,j] == 0:
                        if yha[i, j] <= 0:
                            err[i,j] = 0
                        elif yha[i,j] < 1 and yha[i,j] > 0:
                            err[i,j] = abs(y[i,j] - yha[i,j])
                        else:
                            err[i,j] = 1

        elif method == 'abs':
            err = abs(y - yha)
        elif method == 'squared':
            err = (y - yha)**2
        elif method == '0/1':
            # predict the class with the largest response
            pred = zeros((k, l))
            for i, row in enumerate(yha):
                largest = row.argsort()[-1]
                pred[i, largest] = 1.
            err = abs(y - pred)
        elif method == '1/2':
            # threshold the responses at 0.5
            yh = yha.copy()
            yh[yha > .5] = 1
            yh[yha < .5] = 0
            err = abs(y - yh)
        else:
            raise ValueError("Option: %s (method) not valid" %method)

        error[a,:] = err.mean(0)

    return error

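# Example (added sketch, not part of the original module): 0/1 loss on a
# tiny two-class dummy matrix with a single-component prediction.  The
# numbers are made up.
#
#   >>> import numpy as np
#   >>> y = np.array([[1., 0.], [0., 1.]])
#   >>> y_hat = np.array([[[.9, .1], [.6, .4]]])   # (a_max=1, 2, 2)
#   >>> err = prediction_error(y_hat, y, method='0/1')
#   >>> err.shape                # (a_max, classes)
#   (1, 2)
#   >>> float(err.sum())         # one of the two samples is misclassified
#   1.0
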
def _wkernel_pls_val(X, Y, a_max, n_blocks=None):
    """Returns pls crossvalidated predictions tailored for wide X.

    The error of cross validation is calculated based on random block
    cross-validation. With the number of blocks equal to the number of
    samples [default] this gives leave-one-out cv.
    The pls model is based on the simpls algorithm for wide X, and is quite
    fast on very high dimensional X data.

    *Parameters*:

        X : ndarray
            column centered data matrix of size (samples x variables)
        Y : ndarray
            column centered response matrix of size (samples x responses)
        a_max : scalar
            Maximum number of components
        n_blocks : scalar
            Number of blocks in cross validation

    *Returns*:

        rmsep : ndarray
            Root Mean Square Error of cross-validated Predictions
        aopt : scalar
            Guestimate of the optimal number of components

    SeeAlso:

        - pls_cv_val : Same output, not optimised for wide X
        - w_simpls : Simpls algorithm for wide X

    Notes
    -----

    Based (cowardly translated) on m-files from the Chemoact toolbox.
    X, Y inputs need to be centered (fixme: check)

    Examples
    --------

    >>> import numpy as n
    >>> X = n.array([[1., 2., 3.],[]])
    >>> Y = n.array([[1., 2., 3.],[]])
    >>> w_pls(X, Y, 1)
    [4,5,6], 1
    """
    # fixme: this helper appears unfinished -- w_simpls, inv, triu, Yout and
    # Din are not defined or imported in this module, and the Yhat indexing
    # below looks suspect.
    dt = X.dtype
    k, l = atleast_2d(Y).shape
    if k == 1:
        Y = Y.T
        k, l = Y.shape
    PRESS = zeros((l, a_max + 1), dtype=dt)
    if n_blocks == None:
        n_blocks = Y.shape[0]
    XXt = dot(X, X.T)
    for cal, val in cv(k, n_blocks):
        ym = -sum(Y[val,:], 0)[newaxis]/(1.0*Y[cal,:].shape[0])
        PRESS[:,0] = PRESS[:,0] + ((Y[val,:] - ym)**2).sum(0)

        dat = w_simpls(XXt[cal,:][:,cal], Y[cal,:], a_max)
        Q, U, H = dat['Q'], dat['U'], dat['H']
        That = dot(XXt[val,:][:,cal], dot(U, inv(triu(dot(H.T, U))) ))

        Yhat = zeros((a_max, k, l), dtype=dt)
        for j in range(l):
            TQ = dot(That, triu(dot(Q[j,:][:,newaxis], ones((1,a_max)))) )
            Yhat[:,:,l] = TQ  # fixme: index l is out of bounds, probably meant j
            E = Yout[:,j][:,newaxis] - TQ
            E = E + sum(E, 0)/Din.shape[0]
            PRESS[j,1:] = PRESS[j,1:] + sum(E**2, 0)
    #Yhat = Yin - dot(That,Q.T)
    msep = PRESS/(Y.shape[0])
    #aopt = find_aopt_from_sep(msep)
    return Yhat, sqrt(msep)

def _w_pls(aat, b, aopt):
    """Pls for wide matrices.

    Fast pls for crossval, used in calc rmsep for wide X.
    There is no P or W. T is normalised.

    aat = centered kernel matrix
    b = centered y
    """
    # fixme: m_shape, has_sym, symeig, svd, vnorm and inv are not
    # defined or imported in this module.
    bb = b.copy()
    k, l = m_shape(b)
    m, m = m_shape(aat)
    U = empty((m, aopt)) # W
    T = empty((m, aopt))
    R = empty((m, aopt)) # R
    PROJ = empty((m, aopt)) # P?

    for i in range(aopt):
        if has_sym:
            s, q = symeig(dot(dot(b.T, aat), b), range=(l,l), overwrite=True)
        else:
            q, s, vh = svd(dot(dot(b.T, aat), b), full_matrices=0)
            q = q[:,:1]
        u = dot(b, q) # y-factor scores
        U[:,i] = u.ravel()
        t = dot(aat, u)

        t = t/vnorm(t)
        T[:,i] = t.ravel()
        r = dot(aat, t) # score-weights
        #r = r/vnorm(r)
        R[:,i] = r.ravel()
        PROJ[:,:i+1] = dot(T[:,:i+1], inv(dot(T[:,:i+1].T, R[:,:i+1])) )
        if i < aopt:
            b = b - dot(PROJ[:,:i+1], dot(R[:,:i+1].T, b) )
    C = dot(bb.T, T)