"""This module implements some validation schemes l-pls.
The primary use is crossvalidation.
"""
__all__ = ['pca_val', 'pls_val', 'lpls_val', 'pls_jk', 'lpls_jk']
__docformat__ = "restructuredtext en"
from numpy import dot, empty, zeros, sqrt, atleast_2d, argmax, asarray, median, \
     array_split, arange, isnan, any, newaxis, eye, diag, tile, ones
from numpy.random import shuffle

from engines import pls, pca, center
from engines import nipals_lpls as lpls

def pca_val(a, a_max, nsets=None, center_axis=[0], method='cv'):
"""Returns error estimate of crossvalidated PCA.
*Parameters*:
a : {array}
data matrix (n x m)
a_max : {integer}
Maximum number of components in model.
center_axis:
Centering
nsets : {integer}
number of segments
        method : {'cv', 'diag'}
            Crossvalidation method

    *Returns*:

        rmsep : {array}
            Root mean squared error of prediction for each component and x-variable (a_max + 1, m)
        xhat : {array}
            Crossvalidated predictions of a (a_max + 1, n, m)
        aopt : {integer}
            Estimate of the optimal number of components
        err : {array}
            Squared prediction errors (a_max + 1, n, m)
*Notes*:
- Crossvalidation of PCA is somewhat artificial as we do not have
any external information to predict. There are essentially two approaches
to this, one is to use a projection error, the other is to leave out
elements in the data matrix then record a missing-value estimator error.
    - The 'diag' method leaves out random blocks of diagonal elements, while
      'cv' leaves out whole samples.
"""
n, m = a.shape
if nsets == None:
nsets = n
err = zeros((a_max + 1, n, m), dtype=a.dtype)
xhat = zeros((a_max + 1, n, m), dtype=a.dtype)
if center_axis[0] == 1:
a = a - a.mean(1)[:,newaxis]
center_axis = [-1]
elif center_axis[0] == 2:
a = a - a.mean(1)[:,newaxis]
center_axis = [0]
if method == 'diag':
mn_a = dot(ones((n,1)), a.mean(0)[newaxis])
for i, val in enumerate(diag_cv(a.shape, nsets)):
true_values = a.take(val)
new_values = mn_a.take(val)
# impute with mean values
a.put(val, new_values)
dat = pca(a, a_max, mode='fast', center_axis=center_axis)
Ti, Pi, mnx = dat['T'], dat['P'], dat['mnx']
# expand mean values
if center_axis[0] == 1:
mnx = tile(mnx, (1, m))
else:
mnx = tile(mnx, (n, 1))
err[0,:,:].put(val, (true_values - mnx.take(val))**2)
for j in xrange(a_max):
# estimate the imputed values
a_pred = (mnx + dot(Ti[:,:j+1], Pi[:,:j+1].T)).take(val)
err[j+1,:,:].put(val, (true_values - a_pred)**2)
                xhat[j+1,:,:].put(val, a_pred)
# put original values back
a.put(val, true_values)
elif method == 'cv':
for i, (cal, val) in enumerate(cv(n, nsets)):
xcal = a[cal, :]
dat = pca(xcal, a_max, mode='fast', scale='scores', center_axis=center_axis)
P, mnx = dat['P'], dat['mnx']
xval = atleast_2d(a[val,:]) - mnx
err[0,val,:] = xval**2 #pc0
e = eye(m)
rmat = zeros((m, m))
for j, p in enumerate(P.T):
#d2 = diag(e) - (p**2).ravel()
p = p[:,newaxis]
e = e - dot(p, p.T)
d = diag(e)
es = e/atleast_2d(d)
xhat[j+1,val,:] = dot(xval, es)
err[j+1,val,:] = (dot(xval, es)**2)
rmsep = sqrt(err).mean(1) # take mean over samples
aopt = rmsep.mean(-1).argmin()
return rmsep, xhat, aopt, err
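
# Illustrative usage sketch (not part of the original module); the data sizes
# below are arbitrary assumptions.
#
#     from numpy.random import rand
#     a = rand(20, 50)                              # 20 samples, 50 variables
#     rmsep, xhat, aopt, err = pca_val(a, 5, nsets=10, method='cv')
#     # rmsep : (a_max + 1, 50), row 0 is the mean-only (PC0) model
#     print "suggested number of components:", aopt
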
def pls_val(X, Y, a_max=2, nsets=None, center_axis=[0,0], verbose=False):
"""Performs crossvalidation for generalisation error in pls.
*Parameters*:
X : {array}
Main data matrix (m, n)
Y : {array}
External row data (m, l)
a_max : {integer}, optional
Maximum number of components to calculate (0, min(m,n))
nsets : (integer), optional
Number of crossvalidation sets
center_axis : {array-like}, optional
            A two element array-like structure with elements in [-1, 0, 1, 2]
            that decides the type of centering used for X and Y.
-1 : nothing
0 : row center
1 : column center
2 : double center
verbose : {boolean}, optional
Verbosity of console output. For use in debugging.
*Returns*:
        rmsep : {array}
            Root mean squared error of prediction (for each y-variable)
        yhat : {array}
            Estimated responses
        error : {array}
            Prediction error (see `prediction_error`)
"""
dt = X.dtype
m, n = X.shape
k, l = Y.shape
assert m == k, "X (%d,%d) - Y (%d,%d) dim mismatch" %(m, n, k, l)
if nsets == None:
nsets = m
    if nsets > X.shape[0]:
        print "nsets (%d) is larger than the number of samples (%d).\nnsets: %d -> %d" %(nsets, m, nsets, m)
        nsets = m
    if n > 15*m:
        # fixme: fast shortcut for wide X (kernel pls) is not wired in; the
        # result of _w_pls_predict was overwritten below anyway, so fall
        # through to the ordinary crossvalidation loop.
        pass
    Yhat = empty((a_max, k, l), dtype=dt)
for cal, val in cv(k, nsets):
# do the training model
dat = pls(X[cal], Y[cal], aopt=a_max,center_axis=center_axis)
# center test data
xi = X[val,:] - dat['mnx']
ym = dat['mny']
# predictions
for a in range(a_max):
Yhat[a,val] = ym + dot(xi, dat['B'][a])
sep = (Y - Yhat)**2
rmsep = sqrt(sep.mean(1)).T
#aopt = find_aopt_from_sep(rmsep)
# todo: need a better support for classification error
error = prediction_error(Yhat, Y, method='squared')
return rmsep, Yhat, error
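
# Illustrative usage sketch (assumed sizes): crossvalidated pls with ten
# random segments.
#
#     from numpy.random import rand
#     X, Y = rand(30, 200), rand(30, 2)
#     rmsep, Yhat, error = pls_val(X, Y, a_max=4, nsets=10)
#     # rmsep : (l, a_max) root mean squared error per response
#     # Yhat  : (a_max, m, l) crossvalidated predictions
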
def lpls_val(X, Y, Z, a_max=2, nsets=None,alpha=.5, center_axis=[2,0,2], zorth=False, verbose=False):
"""Performs crossvalidation for generalisation error in lpls.
The L-PLS crossvalidation is estimated just like an ordinary pls
crossvalidation. That is, the generalisation error is estimated by
predicting samples (rows of X and Y) left out according to a cross
validation scheme.
*Parameters*:
X : {array}
Main data matrix (m, n)
Y : {array}
External row data (m, l)
Z : {array}
            External column data (o, n), sharing its columns with X
a_max : {integer}, optional
Maximum number of components to calculate (0, min(m,n))
nsets : (integer), optional
Number of crossvalidation sets
alpha : {float}, optional
Parameter to control the amount of influence from Z-matrix.
0 is none, which returns a pls-solution, 1 is max
center_axis : {array-like}, optional
A three element array-like structure with elements in [-1,0,1,2],
that decides the type of centering used.
-1 : nothing
0 : row center
1 : column center
2 : double center
zorth : {boolean}
If true, Require orthogonal latent components in Z.
verbose : {boolean}, optional
Verbosity of console output. For use in debugging.
*Returns*:
        rmsep : {array}
            Root mean squared error of prediction
        yhat : {array}
            Estimated responses
        error : {array}
            Prediction error (see `prediction_error`)
"""
m, n = X.shape
k, l = Y.shape
o, p = Z.shape
assert m == k, "X (%d,%d) - Y (%d,%d) dim mismatch" %(m, n, k, l)
assert n == p, "X (%d,%d) - Z (%d,%d) dim mismatch" %(m, n, o, p)
if nsets == None:
nsets = m
    if nsets > X.shape[0]:
        print "nsets (%d) is larger than the number of samples (%d).\nnsets: %d -> %d" %(nsets, m, nsets, m)
        nsets = m
assert (alpha >= 0 and alpha<=1), "Alpha needs to be within [0,1], got: %.2f" %alpha
Yhat = empty((a_max, k, l), 'd')
for cal, val in cv(k, nsets):
# do the training model
dat = lpls(X[cal], Y[cal], Z, a_max=a_max, alpha=alpha,
center_axis=center_axis, zorth=zorth, verbose=verbose)
# center test data
if center_axis[0] == 2:
xi = X[val,:] - dat['mnx'] - X[val,:].mean(1)[:,newaxis] + dat['mnx'].mean()
else:
xi = X[val,:] - dat['mnx']
if center_axis[1] == 2:
ym = dat['mny'] + Y[val,:].mean(1)[:,newaxis] - dat['mny'].mean()
else:
ym = dat['mny']
# predictions
for a in range(a_max):
Yhat[a,val,:] = atleast_2d(ym + dot(xi, dat['B'][a]))
# todo: need a better support for classification error
y_is_class = Y.dtype.char.lower() in ['i','p', 'b', 'h','?']
if y_is_class:
pass
#Yhat, err = class_error(Yhat, Y)
#return Yhat, err
sep = (Y - Yhat)**2
rmsep = sqrt(sep.mean(1)).T
#aopt = find_aopt_from_sep(rmsep)
# todo: need a better support for classification error
error = prediction_error(Yhat, Y, method='1/2')
return rmsep, Yhat, error
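
# Illustrative usage sketch (assumed sizes).  Note that Z shares its columns
# with the columns of X, i.e. Z is (o, n).
#
#     from numpy.random import rand
#     X, Y, Z = rand(30, 100), rand(30, 1), rand(12, 100)
#     rmsep, Yhat, error = lpls_val(X, Y, Z, a_max=3, nsets=10, alpha=.5)
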
def pca_jk(a, aopt, nsets=None, center_axis=[0], method='cv'):
"""Returns jack-knife segments from PCA.
*Parameters*:
a : {array}
data matrix (n x m)
aopt : {integer}
number of components in model.
center_axis:
Centering
nsets : {integer}
number of segments
        method : {'cv', 'diag', 'bs', 'bs_diag'}
            Perturbation method is one of:

            cv = leave samples out
            diag = impute diagonals
            bs = leave samples out with replacement (bootstrap)
            bs_diag = impute diagonals, sampled with replacement (bootstrap)

            Only 'cv' and 'diag' are currently implemented.

    *Returns*:

        Pcv : {array}
            Loadings collected in a three way matrix (n_segments, m, aopt)
"""
m, n = a.shape
if nsets == None:
nsets = m
Pcv = empty((nsets, a.shape[1], aopt), dtype=a.dtype)
if method == 'diag':
mn_a = .5*(a.mean(0) + a.mean(1)[:,newaxis])
for i, val in enumerate(diag_cv(a.shape, nsets)):
old_values = a.take(val)
new_values = mn_a.take(val)
# impute mean values
a.put(val, new_values)
dat = pca(a, aopt, mode='fast', scale='loads', center_axis=center_axis)
Pcv[i,:,:] = dat['P']
# put original values back
a.put(val, old_values)
elif method == 'cv':
for i, (cal, val) in enumerate(cv(m, nsets)):
Pcv[i,:,:] = pca(a[cal,:], aopt, mode='fast', scale='loads', center_axis = center_axis)['P']
else:
raise NotImplementedError(method)
return Pcv
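
# Illustrative sketch: jack-knifed PCA loadings using the diagonal-imputation
# perturbation instead of leaving whole samples out (sizes as assumed above).
#
#     Pcv = pca_jk(a, aopt=3, nsets=7, method='diag')
#     # Pcv : (nsets, n_variables, aopt), one loading matrix per perturbation
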
def pls_jk(X, Y, a_opt, nsets=None, center_axis=[0,0], verbose=False):
""" Returns jack-knife segements of W.
*Parameters*:
X : {array}
Main data matrix (m, n)
Y : {array}
External row data (m, l)
a_opt : {integer}
The number of components to calculate (0, min(m,n))
nsets : (integer), optional
Number of jack-knife segments
        center_axis : {array-like}, optional
- -1 : nothing
- 0 : row center
- 1 : column center
- 2 : double center
verbose : {boolean}, optional
Verbosity of console output. For use in debugging.
*Returns*:
        Wcv : {array}
            Loading-weights jack-knife segments
"""
m, n = X.shape
k, l = Y.shape
assert(m == k)
if nsets == None:
nsets = X.shape[0]
Wcv = empty((nsets, X.shape[1], a_opt), dtype=X.dtype)
for i, (cal, val) in enumerate(cv(k, nsets)):
if verbose:
print "Segment number: %d" %i
dat = pls(X[cal,:], Y[cal,:], a_opt, scale='loads', mode='fast', center_axis=center_axis)
Wcv[i,:,:] = dat['W']
return Wcv
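
# Illustrative sketch: jack-knifed pls loading-weights (leave-one-out when
# nsets is omitted).  The segments are the raw input for significance testing.
#
#     Wcv = pls_jk(X, Y, a_opt=3)
#     # Wcv : (nsets, n, a_opt), one set of loading-weights per left-out segment
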
def lpls_jk(X, Y, Z, a_opt, nsets=None, xz_alpha=.5, center_axis=[2,0,2], zorth=False, verbose=False):
"""Returns jack-knifed segments of lpls model.
    Jack-knifing is a method to perturb the model parameters, hopefully
    so that the perturbations are representative of what a typical *future*
    sample would cause. The mean and variance of the jack-knife segments may
    be used to infer the confidence of the parameters in the model.

    The segments returned are the X-block weights and Z-block weights.
*Parameters*:
X : {array}
Main data matrix (m, n)
Y : {array}
External row data (m, l)
Z : {array}
            External column data (o, n), sharing its columns with X
a_opt : {integer}, optional
The number of components to calculate (0, min(m,n))
nsets : (integer), optional
Number of jack-knife segments
xz_alpha : {float}, optional
Parameter to control the amount of influence from Z-matrix.
0 is none, which returns a pls-solution, 1 is max
center_axis : {array-like}, optional
A three element array-like structure with elements in [-1,0,1,2],
that decides the type of centering used.
-1 : nothing
0 : row center
1 : column center
2 : double center
verbose : {boolean}, optional
Verbosity of console output. For use in debugging.
*Returns*:
        Wx : {array}
            X-block jack-knife segments
        Wz : {array}
            Z-block jack-knife segments
"""
m, n = X.shape
k, l = Y.shape
o, p = Z.shape
assert(m == k)
assert(n == p)
if nsets == None:
nsets = m
WWx = empty((nsets, n, a_opt), 'd')
WWz = empty((nsets, o, a_opt), 'd')
#WWy = empty((nsets, l, a_opt), 'd')
for i, (cal, val) in enumerate(cv(k, nsets)):
dat = lpls(X[cal], Y[cal], Z, a_max=a_opt,alpha=xz_alpha, center_axis=center_axis,
scale='loads', zorth=zorth, verbose=verbose)
WWx[i,:,:] = dat['W']
WWz[i,:,:] = dat['L']
#WWy[i,:,:] = dat['Q']
return WWx, WWz
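
# Illustrative sketch of turning jack-knife segments into a rough parameter
# confidence measure, as hinted at in the docstring above.  The pseudo-t
# ratio is an assumption, not something this module defines.
#
#     Wx, Wz = lpls_jk(X, Y, Z, a_opt=3)
#     wx_mean, wx_std = Wx.mean(0), Wx.std(0)
#     pseudo_t = wx_mean/(wx_std + 1e-12)           # large |t| -> stable weight
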
def find_aopt_from_sep(err, method='vanilla'):
"""Returns an estimate of optimal number of components.
    The estimate is based on the error of prediction from
    crossvalidation. This is pretty much wild guessing, and it is
    recommended to inspect model parameters and prediction errors
    closely before deciding on the optimal number of components.
*Parameters*:
err : {array}
Error of prediction
        method : {'vanilla', '75perc'}
            Method used to estimate the optimal number of components
*Returns*:
aopt : {integer}
A guess on the optimal number of components
"""
if method == 'vanilla':
# min rmsep
rmsecv = sqrt(err.mean(0))
return rmsecv.argmin() + 1
elif method == '75perc':
raise NotImplementedError
#prct = .75 #percentile
#ind = 1.*err.shape[0]*prct
#med = median(err)
#prc_75 = []
#for col in err.T:
# col = sorted(col)
# prc_75.append(col[int(ind)])
#prc_75 = asarray(prc_75)
#for i in range(1, err.shape[1], 1):
# if med[i-1]<prc_75[i]:
# return i
#return len(med)
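
# Illustrative sketch: the 'vanilla' rule simply picks the component count
# with the smallest root mean error, e.g. on the rmsep returned by pls_val.
#
#     aopt = find_aopt_from_sep(rmsep)
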
def cv(N, K, randomise=True, sequential=False):
"""Generates K (training, validation) index pairs.
Each pair is a partition of arange(N), where validation is an iterable
of length ~N/K, *without* replacement.
*Parameters*:
N : {integer}
Total number of samples
        K : {integer}
            Number of segments (folds)
randomise : {boolean}
Use random sampling
sequential : {boolean}
Use sequential sampling
*Returns*:
training : {array-like}
training-indices
validation : {array-like}
validation-indices
    *Notes*:

        If randomise is true, a copy of the index is shuffled before
        partitioning, otherwise its order is preserved in training and
        validation. Randomise overrides the sequential argument: if randomise
        is true, sequential is set to False.

        If sequential is true the index is partitioned into contiguous blocks,
        otherwise an interleaved ordering is used.
"""
if K > N:
        raise ValueError("You cannot divide a list of %d samples into more than %d segments. You tried: %d" %(N, N, K))
index = xrange(N)
if randomise:
from random import shuffle
index = list(index)
shuffle(index)
sequential = False
if sequential:
for validation in array_split(index, K):
training = [i for i in index if i not in validation]
yield training, validation
else:
for k in xrange(K):
training = [i for i in index if i % K != k]
validation = [i for i in index if i % K == k]
yield training, validation
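
# Illustrative sketch of the two partitioning modes for N = 6 samples in
# K = 3 segments (randomise=False):
#
#     interleaved (default) : validation sets [0, 3], [1, 4], [2, 5]
#     sequential            : validation sets [0, 1], [2, 3], [4, 5]
#
#     for cal, val in cv(6, 3, randomise=False):
#         print cal, val
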
def diag_cv(shape, nsets=9, randomise=True):
"""Generates K (training, validation) index pairs.
*Parameters*:
shape : {tuple}
Array shape
nsets : {integer}
Number of cv sets
randomise : {boolean}
Randomise diagonal index
    *Returns*:

        validation : {list}
            Validation indices into the raveled (row-major) array

    *Notes*:

        The index is based on the full raveled (row-major ordering) index.
        Elements are extracted along diagonals to ensure balanced removal
        along both axes.
"""
try:
m, n = shape
except:
raise ValueError("shape needs to be a two-tuple")
    if nsets > m or nsets > n:
        msg = "You may not use more subsets than min(n_rows, n_cols)"
        raise ValueError(msg)
nm = n*m
index = arange(nm)
    n_ind = arange(n + 1)
    if randomise:
        shuffle(n_ind)  # random order of diagonal starting points
start_inds = array_split(n_ind, nsets)
for v in range(nsets):
validation = set()
for start in start_inds[v]:
ind = arange(start, nm, n+1)
validation.update(ind)
#training = [j for j in index if j not in validation]
yield list(validation)
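
# Illustrative sketch: diag_cv yields flat (raveled) indices, meant to be used
# with put/take as in pca_val and pca_jk above.
#
#     for val in diag_cv(a.shape, nsets=5):
#         held_out = a.take(val)                    # true values
#         a.put(val, 0.0)                           # blank them out
#         ...                                       # refit and predict held_out
#         a.put(val, held_out)                      # restore the original values
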
def prediction_error(y_hat, y, method='squared'):
"""Loss function on multiclass Y.
Assumes y is a binary dummy class matrix (samples, classes)
"""
k, l = y.shape
a_max, kk, ll = y_hat.shape
error = empty((a_max, l))
for a in range(a_max):
yha = y_hat[a, :, :]
if method == 'hinge':
err = zeros((k, l))
for j in range(l):
for i in range(k):
if y[i,j] == 1:
if yha[i, j] >= 1:
err[i,j] = 0
else:
err[i,j] = abs(y[i,j] - yha[i,j])
elif y[i,j] == 0:
if yha[i, j] <= 0:
err[i,j] = 0
else:
err[i,j] = abs(y[i,j] - yha[i,j])
elif method == 'smooth_hinge':
err = zeros((k, l))
for j in range(l):
for i in range(k):
if y[i,j] == 1:
if yha[i, j] >= 1:
err[i,j] = 0
elif yha[i,j] < 1 and yha[i,j] > 0:
err[i,j] = abs(y[i,j] - yha[i,j])
else:
err[i,j] = 1
elif y[i,j] == 0:
if yha[i, j] <= 0:
err[i,j] = 0
elif yha[i,j] < 1 and yha[i,j] > 0:
err[i,j] = abs(y[i,j] - yha[i,j])
else:
err[i,j] = 1
elif method == 'abs':
err = abs(y - yha)
elif method == 'squared':
err = (y - yha)**2
elif method == '0/1':
pred = zeros((k, l))
for i, row in enumerate(yha):
largest = row.argsort()[-1]
pred[i, largest] = 1.
err = abs(y - pred)
elif method == '1/2':
yh = yha.copy()
yh[yha>.5] = 1
yh[yha<.5] = 0
err = abs(y - yh)
else:
raise ValueError("Option: %s (method) not valid" %method)
error[a,:] = err.mean(0)
return error
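
# Illustrative sketch: classification error with a dummy (one-hot) Y and the
# '0/1' rule, which assigns every sample to its largest predicted class.
#
#     err = prediction_error(Yhat, Y, method='0/1')
#     # err : (a_max, n_classes) mean absolute 0/1 assignment error per class
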
def _wkernel_pls_val(X, Y, a_max, n_blocks=None):
"""Returns pls crossvalidated predictions tailored for wide X.
The error of cross validation is calculated
based on random block cross-validation. With number of blocks equal to
number of samples [default] gives leave-one-out cv.
    The pls model is based on the simpls algorithm for wide X, and is quite
    fast for very high dimensional X data.
*Parameters*:
X : ndarray
column centered data matrix of size (samples x variables)
Y : ndarray
column centered response matrix of size (samples x responses)
a_max : scalar
Maximum number of components
n_blocks : scalar
Number of blocks in cross validation
*Returns*:
rmsep : ndarray
Root Mean Square Error of cross-validated Predictions
aopt : scalar
Guestimate of the optimal number of components
SeeAlso:
- pls_cv_val : Same output, not optimised for wide X
- w_simpls : Simpls algorithm for wide X
Notes
-----
Based (cowardly translated) on m-files from the Chemoact toolbox
X, Y inputs need to be centered (fixme: check)
Examples
--------
>>> import numpy as n
>>> X = n.array([[1., 2., 3.],[]])
>>> Y = n.array([[1., 2., 3.],[]])
>>> w_pls(X, Y, 1)
[4,5,6], 1
"""
dt = X.dtype
k, l = atleast_2d(Y).shape
if k == 1:
Y = Y.T
k, l = Y.shape
PRESS = zeros((l, a_max + 1), dtype=dt)
if n_blocks==None:
n_blocks = Y.shape[0]
XXt = dot(X, X.T)
for cal, val in cv(k, n_blocks):
        # Y is assumed centered overall, so the calibration-set mean equals
        # minus the validation-set sum divided by the calibration size
        ym = -sum(Y[val,:], 0)[newaxis]/(1.0*Y[cal,:].shape[0])
PRESS[:,0] = PRESS[:,0] + ((Y[val,:] - ym)**2).sum(0)
        dat = w_simpls(XXt[cal,:][:,cal], Y[cal,:], a_max)
        Q, U, H = dat['Q'], dat['U'], dat['H']
        That = dot(XXt[val,:][:,cal], dot(U, inv(triu(dot(H.T, U))) ))
Yhat = zeros((a_max, k, l), dtype=dt)
for j in range(l):
TQ = dot(That, triu(dot(Q[j,:][:,newaxis], ones((1,a_max)))) )
            Yhat[:,:,j] = TQ
            # fixme: Yout and Din (and w_simpls/inv/triu above) are unresolved
            # names carried over from the m-file translation
            E = Yout[:,j][:,newaxis] - TQ
            E = E + sum(E, 0)/Din.shape[0]
PRESS[j,1:] = PRESS[j,1:] + sum(E**2, 0)
#Yhat = Yin - dot(That,Q.T)
msep = PRESS/(Y.shape[0])
#aopt = find_aopt_from_sep(msep)
return Yhat, sqrt(msep)
def _w_pls(aat, b, aopt):
""" Pls for wide matrices.
Fast pls for crossval, used in calc rmsep for wide X
There is no P or W. T is normalised
aat = centered kernel matrix
b = centered y
"""
bb = b.copy()
k, l = m_shape(b)
m, m = m_shape(aat)
U = empty((m, aopt)) # W
T = empty((m, aopt))
R = empty((m, aopt)) # R
PROJ = empty((m, aopt)) # P?
for i in range(aopt):
if has_sym:
s, q = symeig(dot(dot(b.T, aat), b), range=(l,l),overwrite=True)
else:
q, s, vh = svd(dot(dot(b.T, aat), b), full_matrices=0)
q = q[:,:1]
u = dot(b , q) #y-factor scores
U[:,i] = u.ravel()
t = dot(aat, u)
t = t/vnorm(t)
T[:,i] = t.ravel()
r = dot(aat, t)#score-weights
#r = r/vnorm(r)
R[:,i] = r.ravel()
PROJ[:,: i+1] = dot(T[:,:i+1], inv(dot(T[:,:i+1].T, R[:,:i+1])) )
if i<aopt:
b = b - dot(PROJ[:,:i+1], dot(R[:,:i+1].T, b) )
C = dot(bb.T, T)