From 4866984e0093bd51064628ad4ba5bc203432fcd9 Mon Sep 17 00:00:00 2001
From: flatberg <flatberg@pvv.ntnu.no>
Date: Wed, 10 Oct 2007 12:19:21 +0000
Subject: [PATCH] Initial import

---
 COPYING                  |   0
 README                   |   0
 __init__.py              |  15 ++
 crossvalidation.py       | 264 ++++++++++++++++++++++++++++++++++
 engines.py               | 298 +++++++++++++++++++++++++++++++++++++++
 setup.py                 |  50 +++++++
 statistics.py            | 296 ++++++++++++++++++++++++++++++++++++++
 tests/test_lplsengine.py | 143 +++++++++++++++++++
 8 files changed, 1066 insertions(+)
 create mode 100644 COPYING
 create mode 100644 README
 create mode 100644 __init__.py
 create mode 100644 crossvalidation.py
 create mode 100644 engines.py
 create mode 100644 setup.py
 create mode 100644 statistics.py
 create mode 100644 tests/test_lplsengine.py

diff --git a/COPYING b/COPYING
new file mode 100644
index 0000000..e69de29
diff --git a/README b/README
new file mode 100644
index 0000000..e69de29
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..42ebe6c
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,15 @@
+from crossvalidation import lpls_val
+from statistics import lpls_qvals
+from engines import nipals_lpls as lpls
+#from tests import *
+
+__version__ = '0.0.1'
+
+def test(level=1, verbosity=1):
+    import os, sys
+    print 'lplslib is installed in %s' % (os.path.split(__file__)[0],)
+    print 'lplslib version %s' % (__version__,)
+    print 'Python version %s' % (sys.version.replace('\n', '',),)
+    from numpy.testing import NumpyTest
+    return NumpyTest().test(level, verbosity)
+
diff --git a/crossvalidation.py b/crossvalidation.py
new file mode 100644
index 0000000..000fbef
--- /dev/null
+++ b/crossvalidation.py
@@ -0,0 +1,264 @@
+"""This module implements some validation schemes for l-pls.
+
+The primary use is crossvalidation.
+"""
+__all__ = ['lpls_val', 'lpls_jk']
+__docformat__ = "restructuredtext en"
+
+from numpy import dot, empty, zeros, sqrt, atleast_2d, argmax, asarray, \
+     median, newaxis, array_split
+
+from engines import nipals_lpls as lpls
+
+
+def lpls_val(X, Y, Z, a_max=2, nsets=None, alpha=.5, mean_ctr=[2,0,2], verbose=True):
+    """Performs crossvalidation to estimate the generalisation error in lpls.
+
+    The L-PLS crossvalidation is estimated just like an ordinary pls
+    crossvalidation. That is, the generalisation error is estimated by
+    predicting samples (rows of X and Y) left out according to a cross
+    validation scheme.
+
+    *Parameters*:
+
+        X : {array}
+            Main data matrix (m, n)
+        Y : {array}
+            External row data (m, l)
+        Z : {array}
+            External column data (o, n)
+        a_max : {integer}, optional
+            Maximum number of components to calculate (0, min(m,n))
+        nsets : {integer}, optional
+            Number of crossvalidation sets
+        alpha : {float}, optional
+            Parameter to control the amount of influence from the Z-matrix.
+            0 is none, which returns a pls-solution, 1 is max
+        mean_ctr : {array-like}, optional
+            A three element array-like structure with elements in [-1,0,1,2],
+            that decides the type of centering used.
+            -1 : nothing
+            0 : row center
+            1 : column center
+            2 : double center
+        verbose : {boolean}, optional
+            Verbosity of console output. For use in debugging.
+
+    *Returns*:
+
+        rmsep : {array}
+            Root mean squared error of prediction
+        yhat : {array}
+            Estimated responses
+        aopt : {integer}
+            Estimated value of the optimal number of components
+
+    """
+
+    m, n = X.shape
+    k, l = Y.shape
+    o, p = Z.shape
+    assert m==k, "X (%d,%d) - Y (%d,%d) dim mismatch" %(m,n,k,l)
+    assert n==p, "X (%d,%d) - Z (%d,%d) dim mismatch" %(m,n,o,p)
+    if nsets is None:
+        nsets = m
+    if nsets > X.shape[0]:
+        print "nsets (%d) is larger than the number of samples (%d).\nnsets: %d -> %d" %(nsets, m, nsets, m)
+        nsets = m
+    assert (alpha >= 0 and alpha <= 1), "Alpha needs to be within [0,1], got: %.2f" %alpha
+
+    Yhat = empty((a_max, k, l), 'd')
+    # cv expects (number of samples, number of segments)
+    for cal, val in cv(k, nsets):
+        dat = lpls(X[cal], Y[cal], Z, a_max=a_max, alpha=alpha, mean_ctr=mean_ctr, verbose=verbose)
+        if mean_ctr[0] != 1:
+            xi = X[val,:] - dat['mnx']
+        else:
+            xi = X[val] - X[val].mean(1)[:,newaxis]
+        if mean_ctr[1] != 1: # Y-centering is the second element of mean_ctr
+            ym = dat['mny']
+        else:
+            ym = Y[val].mean(1)[:,newaxis]
+        for a in range(a_max):
+            Yhat[a,val,:] = atleast_2d(ym + dot(xi, dat['B'][a]))
+
+    # todo: need better support for classification error
+    y_is_class = Y.dtype.char.lower() in ['i', 'p', 'b', 'h', '?']
+    if y_is_class:
+        Yhat, err = class_error(Yhat, Y)
+        return Yhat, err
+
+    sep = (Y - Yhat)**2
+    rmsep = sqrt(sep.mean(1)).T
+    aopt = find_aopt_from_sep(rmsep)
+
+    return rmsep, Yhat, aopt
+
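# Editor's note: a minimal usage sketch for lpls_val (illustration only, not
# part of the patch; the shapes and component count below are made up):
#
#     from numpy.random import rand
#     from crossvalidation import lpls_val
#
#     X = rand(15, 20)        # main data (m, n)
#     Y = rand(15, 3)         # external row data (m, l)
#     Z = rand(6, 20)         # external column data (o, n)
#     rmsep, Yhat, aopt = lpls_val(X, Y, Z, a_max=4, nsets=5, verbose=False)
#     # rmsep has one row per Y-variable and one column per component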
+
+def lpls_jk(X, Y, Z, a_max, nsets=None, xz_alpha=.5, mean_ctr=[2,0,2], verbose=False):
+    """Returns jack-knifed segments of an lpls model.
+
+    Jack-knifing is a method to perturb the model parameters, hopefully
+    in a way that is representative of a typical perturbation from a
+    *future* sample. The mean and variance of the jack-knife segments
+    may be used to infer the parameter confidence in the model.
+
+    The segments returned are the X-block weights and the Z-block weights.
+
+
+    *Parameters*:
+
+        X : {array}
+            Main data matrix (m, n)
+        Y : {array}
+            External row data (m, l)
+        Z : {array}
+            External column data (o, n)
+        a_max : {integer}
+            Maximum number of components to calculate (0, min(m,n))
+        nsets : {integer}, optional
+            Number of jack-knife segments
+        xz_alpha : {float}, optional
+            Parameter to control the amount of influence from the Z-matrix.
+            0 is none, which returns a pls-solution, 1 is max
+        mean_ctr : {array-like}, optional
+            A three element array-like structure with elements in [-1,0,1,2],
+            that decides the type of centering used.
+            -1 : nothing
+            0 : row center
+            1 : column center
+            2 : double center
+        verbose : {boolean}, optional
+            Verbosity of console output. For use in debugging.
+
+    *Returns*:
+
+        Wx : {array}
+            X-block jack-knife segments
+        Wz : {array}
+            Z-block jack-knife segments
+    """
+
+    m, n = X.shape
+    k, l = Y.shape
+    o, p = Z.shape
+    assert(m==k)
+    assert(n==p)
+    if nsets is None:
+        nsets = m
+    WWx = empty((nsets, n, a_max), 'd')
+    WWz = empty((nsets, o, a_max), 'd')
+    #WWy = empty((nsets, l, a_max), 'd')
+    for i, (cal, val) in enumerate(cv(k, nsets)):
+        dat = lpls(X[cal], Y[cal], Z, a_max=a_max, alpha=xz_alpha, mean_ctr=mean_ctr,
+                   scale='loads', verbose=verbose)
+        WWx[i,:,:] = dat['W']
+        WWz[i,:,:] = dat['L']
+        #WWy[i,:,:] = dat['Q']
+
+    return WWx, WWz
+
+def find_aopt_from_sep(sep, method='vanilla'):
+    """Returns an estimate of the optimal number of components.
+
+    The estimate is based on the squared error of prediction from
+    crossvalidation. This is pretty much wild guessing and it is
+    recommended to inspect model parameters and prediction errors
+    closely before deciding on the optimal number of components.
+
+    *Parameters*:
+        sep : {array}
+            Squared error of prediction
+        method : ['vanilla', '75perc']
+            Method used to estimate the optimal number of components
+
+    *Returns*:
+        aopt : {integer}
+            A guess on the optimal number of components
+    """
+
+    if method=='vanilla':
+        # min rmsep (sqrt is monotone, so the argmin is unaffected)
+        rmsecv = sqrt(sep.mean(0))
+        return rmsecv.argmin() + 1
+
+    elif method=='75perc':
+        prct = .75 #percentile
+        ind = 1.*sep.shape[0]*prct
+        med = median(sep) # median along axis 0 (old numpy behaviour)
+        prc_75 = []
+        for col in sep.T:
+            col = sorted(col)
+            prc_75.append(col[int(ind)])
+        prc_75 = asarray(prc_75)
+        for i in range(1, sep.shape[1], 1):
+            if med[i-1]<prc_75[i]:
+                return i
+        return len(med)
+
+def cv(N, K, randomise=True, sequential=False):
+    """Generates K (training, validation) index pairs.
+
+    Each pair is a partition of arange(N), where validation is an iterable
+    of length ~N/K, *without* replacement.
+
+    *Parameters*:
+        N : {integer}
+            Total number of samples
+        K : {integer}
+            Number of segments
+        randomise : {boolean}
+            Use random sampling
+        sequential : {boolean}
+            Use sequential sampling
+
+    *Returns*:
+        training : {array-like}
+            training-indices
+        validation : {array-like}
+            validation-indices
+
+    *Notes*:
+        If randomise is true, a copy of the index is shuffled before
+        partitioning, otherwise its order is preserved in training and
+        validation.
+
+        Randomise overrides the sequential argument: if randomise is true,
+        sequential is set to False.
+        If sequential is true the index is partitioned in continuous blocks,
+        otherwise an interleaved ordering is used.
+
+    """
+    if K>N:
+        raise ValueError, "You cannot divide a list of %d samples into more than %d segments. You tried: %d" %(N,N,K)
+    index = xrange(N)
+    if randomise:
+        from random import shuffle
+        index = list(index)
+        shuffle(index)
+        sequential = False
+    if sequential:
+        for validation in array_split(index, K):
+            training = [i for i in index if i not in validation]
+            yield training, validation
+    else:
+        # partition on position (not value), so that a shuffled index
+        # actually yields random segments
+        for k in xrange(K):
+            training = [j for i, j in enumerate(index) if i % K != k]
+            validation = [j for i, j in enumerate(index) if i % K == k]
+            yield training, validation
+
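# Editor's note: a small illustration of the cv generator (not part of the
# patch); with randomise=False the deterministic interleaved split is used:
#
#     from crossvalidation import cv
#
#     for train, val in cv(6, 3, randomise=False):
#         print train, val
#     # [1, 2, 4, 5] [0, 3]
#     # [0, 2, 3, 5] [1, 4]
#     # [0, 1, 3, 4] [2, 5]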
+
+def class_error(Yhat, Y, method='vanilla'):
+    """ Not used.
+    """
+    a_max, k, l = Yhat.shape
+    # fixed: Yhat_c needs one slab per component, and the loop index is i
+    Yhat_c = zeros((a_max, k, l), dtype='d')
+    for a in range(a_max):
+        for i in range(k):
+            Yhat_c[a, i, argmax(Yhat[a,i,:])] = 1.0
+    err = 100*((Yhat_c + Y) == 2).sum(1)/Y.sum(0).astype('d')
+
+    return Yhat_c, err
+
+def class_errorII(T, Y, method='lda'):
+    """ Not used ...
+    """
+    pass
diff --git a/engines.py b/engines.py
new file mode 100644
index 0000000..fa44244
--- /dev/null
+++ b/engines.py
@@ -0,0 +1,298 @@
+"""This module contains algorithms for low-rank L-shaped models.
+
+"""
+
+__all__ = ['nipals_lpls']
+__docformat__ = "restructuredtext en"
+
+import warnings
+from math import sqrt as msqrt
+
+from numpy import dot, empty, zeros, ones, apply_along_axis, newaxis, finfo
+from numpy.linalg import inv
+
+
+def nipals_lpls(X, Y, Z, a_max, alpha=.7, mean_ctr=[2, 0, 1], scale='scores', verbose=False):
+    """ L-shaped Partial Least Squares Regression by the nipals algorithm.
+
+    An L-shaped low rank model approximates three matrices in an L-shaped
+    structure. That means that the main matrix (X) has one matrix associated
+    with its row space and one with its column space. A simultaneous low
+    rank estimate of these three matrices tries to discover common
+    directions/subspaces.
+
+    *Parameters*:
+
+        X : {array}
+            Main data matrix (m, n)
+        Y : {array}
+            External row data (m, l)
+        Z : {array}
+            External column data (o, n)
+        a_max : {integer}
+            Maximum number of components to calculate (0, min(m,n))
+        alpha : {float}, optional
+            Parameter to control the amount of influence from the Z-matrix.
+            0 is none, which returns a pls-solution, 1 is max
+        mean_ctr : {array-like}, optional
+            A three element array-like structure with elements in [-1,0,1,2],
+            that decides the type of centering used.
+            -1 : nothing
+            0 : row center
+            1 : column center
+            2 : double center
+        scale : {'scores', 'loads'}, optional
+            Option to decide on where the scale goes.
+        verbose : {boolean}, optional
+            Verbosity of console output. For use in debugging.
+
+    *Returns*:
+
+        T : {array}
+            X-scores
+        W : {array}
+            X-weights/Z-weights
+        P : {array}
+            X-loadings
+        Q : {array}
+            Y-loadings
+        U : {array}
+            X-Y relation
+        L : {array}
+            Z-scores
+        K : {array}
+            Z-loads
+        B : {array}
+            Regression coefficients X->Y
+        evx : {array}
+            X-explained variance
+        evy : {array}
+            Y-explained variance
+        evz : {array}
+            Z-explained variance
+        mnx : {array}
+            X location
+        mny : {array}
+            Y location
+        mnz : {array}
+            Z location
+
+    *References*
+        Saeboe et al., LPLS-regression: a method for improved prediction and
+        classification through inclusion of background information on
+        predictor variables, Journal of Chemometrics and Intelligent
+        Laboratory Systems
+
+        Martens et al., Regression of a data matrix on descriptors of
+        both its rows and of its columns via latent variables: L-PLSR,
+        Computational Statistics & Data Analysis, 2005
+
+    """
+    m, n = X.shape
+    k, l = Y.shape
+    u, o = Z.shape
+    max_rank = min(m, n)
+    assert (a_max>0 and a_max<max_rank), "Number of comp error:\
+    tried:%d, max_rank:%d" %(a_max,max_rank)
+
+    if mean_ctr is not None:
+        xctr, yctr, zctr = mean_ctr
+        X, mnX = center(X, xctr)
+        Y, mnY = center(Y, yctr)
+        Z, mnZ = center(Z, zctr)
+
+    varX = (X**2).sum()
+    varY = (Y**2).sum()
+    varZ = (Z**2).sum()
+
+    # initialize
+    U = empty((k, a_max))
+    Q = empty((l, a_max))
+    T = zeros((m, a_max))
+    W = empty((n, a_max))
+    P = empty((n, a_max))
+    K = empty((o, a_max))
+    L = empty((u, a_max))
+    B = empty((a_max, n, l))
+    E = X.copy()
+    F = Y.copy()
+    G = Z.copy()
+    #b0 = empty((a_max, 1, l))
+    var_x = empty((a_max,))
+    var_y = empty((a_max,))
+    var_z = empty((a_max,))
+
+    MAX_ITER = 450
+    LIM = finfo(X.dtype).resolution
+    is_rd = False
+    for a in range(a_max):
+        if verbose:
+            print "\nWorking on comp. %s" %a
+        u = F[:,:1]
+        diff = 1
+        niter = 0
+        while (diff>LIM and niter<MAX_ITER):
+            niter += 1
+            u1 = u.copy()
+            w = dot(E.T, u)
+            wn = msqrt(dot(w.T, w))
+            if wn < LIM:
+                print "Rank exhausted in X! Comp: %d " %a
+                is_rd = True
+                break
+            w = w/wn
+            #w = w/dot(w.T, w)
+            l = dot(G, w)
+            k = dot(G.T, l)
+            k = k/msqrt(dot(k.T, k))
+            #k = k/dot(k.T, k)
+            w = alpha*k + (1-alpha)*w
+            w = w/msqrt(dot(w.T, w))
+            t = dot(E, w)
+            c = dot(F.T, t)
+            c = c/msqrt(dot(c.T, c))
+            u = dot(F, c)
+            diff = dot((u-u1).T, (u-u1))
+
+        if verbose:
+            print "Converged after %s iterations" %niter
+            print "Error: %.2E" %diff
+
+        if is_rd:
+            print "Oh dear ... rank deficient, this should really not happen"
+            break
+        tt = dot(t.T, t)
+        p = dot(X.T, t)/tt
+        q = dot(Y.T, t)/tt
+        l = dot(Z, w)
+        #k = dot(Z.T, l)/dot(l.T, l)
+
+        U[:,a] = u.ravel()
+        W[:,a] = w.ravel()
+        P[:,a] = p.ravel()
+        T[:,a] = t.ravel()
+        Q[:,a] = q.ravel()
+        L[:,a] = l.ravel()
+        K[:,a] = k.ravel()
+
+        E = E - dot(t, p.T)
+        F = F - dot(t, q.T)
+        G = (G.T - dot(k, l.T)).T
+
+        var_x[a] = pow(E, 2).sum()
+        var_y[a] = pow(F, 2).sum()
+        var_z[a] = pow(G, 2).sum()
+
+        B[a] = dot(dot(W[:,:a+1], inv(dot(P[:,:a+1].T, W[:,:a+1]))), Q[:,:a+1].T)
+        #b0[a] = mnY - dot(mnX, B[a])
+
+    # variance explained
+    evx = 100.*(1 - var_x/varX)
+    evy = 100.*(1 - var_y/varY)
+    evz = 100.*(1 - var_z/varZ)
+
+    if scale=='loads':
+        tnorm = apply_along_axis(vnorm, 0, T)
+        T = T/tnorm
+        W = W*tnorm
+        Q = Q*tnorm
+        knorm = apply_along_axis(vnorm, 0, K)
+        L = L*knorm
+        K = K/knorm
+
+    return {'T':T, 'W':W, 'P':P, 'Q':Q, 'U':U, 'L':L, 'K':K, 'B':B,
+            'E':E, 'F':F, 'G':G, 'evx':evx, 'evy':evy, 'evz':evz,
+            'mnx': mnX, 'mny': mnY, 'mnz': mnZ}
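# Editor's note: a minimal sketch of calling the engine directly (not part of
# the patch; the shapes and settings are made up):
#
#     from numpy.random import rand
#     from engines import nipals_lpls
#
#     X, Y, Z = rand(15, 20), rand(15, 3), rand(6, 20)
#     dat = nipals_lpls(X, Y, Z, a_max=3, alpha=.5, verbose=False)
#     T, W, B = dat['T'], dat['W'], dat['B']   # scores, weights, coefficients
#     # dat['evx'], dat['evy'], dat['evz'] hold explained variance (%) per component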
+
+def vnorm(a):
+    """Returns the norm of a vector.
+
+    *Parameters*:
+
+        a : {array}
+            Input data, 1-dim, or column vector (m, 1)
+
+    *Returns*:
+
+        a_norm : {float}
+            Norm of the input vector
+
+    """
+    return msqrt(dot(a.T,a))
+
+def center(a, axis):
+    """ Matrix centering.
+
+    *Parameters*:
+
+        a : {array}
+            Input data
+        axis : {integer}
+            Which centering to perform.
+            0 = col center, 1 = row center, 2 = double center
+            -1 = nothing
+
+    *Returns*:
+
+        a_centered : {array}
+            Centered data matrix
+        mn : {array}
+            Location vector/matrix
+    """
+
+    # check if we have a vector
+    is_vec = len(a.shape)==1
+    if not is_vec:
+        is_vec = a.shape[0]==1 or a.shape[1]==1
+    if is_vec:
+        if axis==2:
+            warnings.warn("Double centering of vector ignored, using ordinary centering")
+        if axis==-1:
+            mn = 0
+        else:
+            mn = a.mean()
+        return a - mn, mn
+    if axis==-1:
+        mn = zeros((1,a.shape[1],))
+    elif axis==0:
+        mn = a.mean(0)[newaxis]
+    elif axis==1:
+        mn = a.mean(1)[:,newaxis]
+    elif axis==2:
+        mn = a.mean(0)[newaxis] + a.mean(1)[:,newaxis] - a.mean()
+        # for double centering the column means are returned as location,
+        # which is what is needed for centering future samples
+        return a - mn, a.mean(0)[newaxis]
+    else:
+        raise ValueError("input error: axis must be in [-1,0,1,2]")
+
+    return a - mn, mn
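# Editor's note: a quick illustration of the centering modes (not part of the
# patch):
#
#     from numpy import array
#     from engines import center
#
#     a = array([[1., 2.], [3., 4.]])
#     ac, mn = center(a, 0)   # column center: ac.mean(0) == [0, 0]
#     ar, mn = center(a, 1)   # row center:    ar.mean(1) == [0, 0]
#     ad, mn = center(a, 2)   # double center: rows and columns both mean 0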
+
+def _scale(a, axis):
+    """ Matrix scaling to unit variance.
+
+    *Parameters*:
+
+        a : {array}
+            Input data
+        axis : {integer}
+            Which scaling to perform.
+            0 = column, 1 = row, -1 = nothing
+
+    *Returns*:
+
+        a_scaled : {array}
+            Scaled data matrix
+        sc : {array}
+            Scaling vector/matrix
+    """
+
+    # fixed: scaling divides by the standard deviation (and a no-op needs ones)
+    if axis==-1:
+        sc = ones((a.shape[1],))
+    elif axis==0:
+        sc = a.std(0)
+    elif axis==1:
+        sc = a.std(1)[:,newaxis]
+    else:
+        raise ValueError("input error: axis must be in [-1,0,1]")
+
+    return a/sc, sc
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..60debf1
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+
+#from distutils.core import setup
+
+short_description=\
+"""Library routines for performing L-shaped matrix decomposition.
+"""
+
+long_description=\
+"""Library for performing L-shaped low rank models. An L-shaped decomposition
+is a situation where the matrices X (n, p), Y (n, o) and Z (k, p) are
+approximated by low rank bilinear models (X ~ TP', Y ~ TQ', Z ~ OW') in a way
+that common patterns between X-Y and X-Z are identified.
+"""
+
+classifiers = """\
+Development Status :: 4 - Beta
+Environment :: Console
+Intended Audience :: Developers
+Intended Audience :: Science/Research
+License :: OSI Approved :: GNU General Public License (GPL)
+Operating System :: OS Independent
+Programming Language :: Python
+Topic :: Scientific/Engineering
+Topic :: Software Development :: Libraries :: Python Modules
+"""
+
+import __init__
+
+def configuration(parent_package='',top_path=None):
+    from numpy.distutils.misc_util import Configuration
+    config = Configuration('lplslib', parent_package, top_path)
+    config.add_data_dir('tests')
+    #config.add_data_files(['lplslib',('COPYING','README')])
+    #config.add_subpackage('lplslib')
+    return config
+
+if __name__ == "__main__":
+    from numpy.distutils.core import setup
+    config = configuration(top_path='').todict()
+    config['author'] = 'Arnar Flatberg'
+    config['author_email'] = 'arnar.flatberg at gmail.com'
+    config['short_description'] = short_description
+    config['long_description'] = long_description
+    config['url'] = 'http://dev.pvv.org'
+    config['version'] = __init__.__version__
+    config['license'] = 'GPL v2'
+    config['platforms'] = ['Linux']
+    config['classifiers'] = filter(None, classifiers.split('\n'))
+    setup(**config)
diff --git a/statistics.py b/statistics.py
new file mode 100644
index 0000000..9253180
--- /dev/null
+++ b/statistics.py
@@ -0,0 +1,296 @@
+""" A collection of statistical utilities used in the library.
+"""
+__all__ = ['hotelling', 'lpls_qvals']
+__docformat__ = 'restructuredtext en'
+
+from math import sqrt as msqrt
+
+from numpy import dot, empty, zeros, eye, median, sign, arange, argsort, \
+     ravel, atleast_2d
+from numpy.linalg import svd, inv, det
+from numpy.random import shuffle
+
+from crossvalidation import lpls_jk
+from engines import nipals_lpls as lpls
+
+
+def hotelling(Pcv, P, p_center='median', cov_center=median,
+              alpha=0.3, crot=True, strict=False):
+    """Returns regularized Hotelling T^2.
+
+    Hotelling's T^2 is a generalization of Student's t statistic that is
+    used in multivariate hypothesis testing. In order to avoid that small
+    variance samples become significant, this version allows borrowing
+    variance from the pooled covariance.
+
+    *Parameters*:
+
+        Pcv : {array}
+            Crossvalidation segments of the parameter
+        P : {array}
+            Calibration model parameter
+        p_center : {'median', 'mean', 'cal_model'}, optional
+            Location method for sub-segments
+        cov_center : {py_func}, optional
+            Location function
+        alpha : {float}, optional
+            Regularisation towards the pooled covariance estimate.
+        crot : {boolean}, optional
+            Rotate sub-segments towards the calibration model.
+        strict : {boolean}, optional
+            Only rotate 90 degrees
+
+    *Returns*:
+
+        tsq : {array}
+            Hotelling's T^2 estimate
+
+    *Reference*:
+
+        Gidskehaug et al., A framework for significance analysis of
+        gene expression data using dimension reduction methods, BMC
+        Bioinformatics, 2007
+
+    *Notes*
+
+        The rotational freedom in the solution of bilinear models may
+        require a rotation onto the calibration model. One way of doing
+        that is procrustes rotation.
+
+    """
+    m, n = P.shape
+    n_sets, n, amax = Pcv.shape
+    T_sq = empty((n,), dtype='d')
+    Cov_i = zeros((n, amax, amax), dtype='d')
+
+    # rotate sub_models to full model
+    if crot:
+        for i, Pi in enumerate(Pcv):
+            Pcv[i] = procrustes(P, Pi, strict=strict)
+
+    # center of pnull
+    if p_center=='median':
+        P_ctr = median(Pcv)
+    elif p_center=='mean':
+        P_ctr = Pcv.mean(0)
+    else: # calibration model
+        P_ctr = P
+
+    for i in xrange(n):
+        Pi = Pcv[:,i,:] # (n_sets x amax)
+        Pi_ctr = P_ctr[i,:] # (1 x amax)
+        Pim = (Pi - Pi_ctr)*msqrt(n_sets-1)
+        Cov_i[i] = (1./n_sets)*dot(Pim.T, Pim)
+
+    Cov = cov_center(Cov_i)
+    reg_cov = (1. - alpha)*Cov_i + alpha*Cov
+    for i in xrange(n):
+        Pc = P_ctr[i,:]
+        sigma = reg_cov[i]
+        T_sq[i] = dot(dot(Pc, inv(sigma)), Pc)
+    return T_sq
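# Editor's note: a sketch of how hotelling is typically fed from the
# jack-knife routine (not part of the patch; the shapes are made up):
#
#     from numpy.random import rand
#     from crossvalidation import lpls_jk
#     from engines import nipals_lpls
#     from statistics import hotelling
#
#     X, Y, Z = rand(15, 20), rand(15, 3), rand(6, 20)
#     dat = nipals_lpls(X, Y, Z, a_max=2, scale='loads')
#     Wcv, Lcv = lpls_jk(X, Y, Z, 2)
#     tsq_x = hotelling(Wcv, dat['W'])   # one T^2 value per X-variable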
+
+def procrustes(a, b, strict=True, center=False, verbose=False):
+    """Orthogonal rotation of b onto a.
+
+    Procrustes rotation is an orthogonal rotation of one subspace
+    onto another by minimising the squared error.
+
+    *Parameters*:
+
+        a : {array}
+            Input array
+        b : {array}
+            Input array
+        strict : {boolean}
+            Only do flipping and shuffling
+        center : {boolean}
+            Center before rotation, translate back after
+        verbose : {boolean}
+            Show sum of squares
+
+    *Returns*:
+
+        b_rot : {array}
+            Rotated b-matrix
+
+    *Reference*:
+
+        Schonemann, A generalized solution of the orthogonal Procrustes
+        problem, Psychometrika, 1966
+    """
+
+    if center:
+        mn_a = a.mean(0)
+        a = a - mn_a
+        mn_b = b.mean(0)
+        b = b - mn_b
+
+    u, s, vt = svd(dot(b.T, a))
+    Cm = dot(u, vt) # Cm: orthogonal rotation matrix
+    if strict:
+        Cm = _ensure_strict(Cm)
+    b_rot = dot(b, Cm)
+    if verbose:
+        print Cm.round()
+        fit = sum(ravel(b - b_rot)**2)
+        print "Error: %.3E" %fit
+    if center:
+        return mn_b + b_rot
+    else:
+        return b_rot
+
+def _ensure_strict(C, only_flips=True):
+    """Ensure that a rotation matrix does only 90 degree rotations.
+
+    In multiplication with scores/loadings this allows flips and reordering.
+    If only_flips is True, only flips (sign switches) are allowed.
+
+    *Parameters*:
+
+        C : {array}
+            Rotation matrix
+        only_flips : {boolean}
+            Only accept columns to flip (switch signs)
+
+    *Returns*:
+
+        C_rot : {array}
+            Restricted rotation matrix
+
+    *Notes*:
+
+        This function is not ready for use. Use (only_flips=True)
+
+    """
+    if only_flips:
+        return eye(C.shape[0])*sign(C)
+    Cm = zeros(C.shape, dtype='d')
+    Cm[abs(C)>.6] = 1.
+    if det(Cm)>1:
+        raise NotImplementedError
+    return Cm*sign(C) # keep the signs of the dominant directions
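# Editor's note: a small self-check of procrustes (not part of the patch):
# rotating a randomly rotated copy of a matrix back onto the original should
# recover it (up to numerical noise).
#
#     from numpy import allclose, dot
#     from numpy.linalg import qr
#     from numpy.random import rand
#     from statistics import procrustes
#
#     a = rand(10, 3)
#     R, _ = qr(rand(3, 3))   # a random orthogonal matrix
#     b = dot(a, R)           # a rotated copy of a
#     b_rot = procrustes(a, b, strict=False)
#     assert allclose(a, b_rot)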
+
+def lpls_qvals(X, Y, Z, aopt=None, alpha=.3, zx_alpha=.5, n_iter=20,
+               sim_method='shuffle', p_center='med', cov_center=median,
+               crot=True, strict=False, mean_ctr=[2,0,2], nsets=None):
+
+    """Returns q-values for an l-pls model by permutation analysis.
+
+    The response (Y) is randomly permuted, and the number of false positives
+    is registered by comparison with the Hotelling T^2 statistics of the
+    calibration model.
+
+    *Parameters*:
+
+        X : {array}
+            Main data matrix (m, n)
+        Y : {array}
+            External row data (m, l)
+        Z : {array}
+            External column data (k, n)
+        aopt : {integer}
+            Optimal number of components
+        alpha : {float}, optional
+            Regularisation towards the pooled covariance estimate.
+        zx_alpha : {float}, optional
+            Parameter to control the amount of influence from the Z-matrix.
+            0 is none, which returns a pls-solution, 1 is max
+        n_iter : {integer}, optional
+            Number of permutations
+        sim_method : ['shuffle'], optional
+            Permutation method
+        p_center : {'median', 'mean', 'cal_model'}, optional
+            Location method for sub-segments
+        cov_center : {py_func}, optional
+            Location function
+        crot : {boolean}, optional
+            Rotate sub-segments towards the calibration model.
+        strict : {boolean}, optional
+            Only rotate 90 degrees
+        mean_ctr : {array-like}, optional
+            A three element array-like structure with elements in [-1,0,1,2],
+            that decides the type of centering used.
+            -1 : nothing
+            0 : row center
+            1 : column center
+            2 : double center
+        nsets : {integer}
+            Number of crossvalidation segments
+
+    *Returns*:
+
+        fdr_z : {array}
+            False discovery rate for the Z-block variables
+        fdr_x : {array}
+            False discovery rate for the X-block variables
+
+    *Reference*:
+
+        Gidskehaug et al., A framework for significance analysis of
+        gene expression data using dimension reduction methods, BMC
+        Bioinformatics, 2007
+    """
+
+    m, n = X.shape
+    k, nz = Z.shape
+    assert(n==nz)
+    try:
+        my, l = Y.shape
+    except ValueError:
+        # make Y a column vector
+        Y = atleast_2d(Y).T
+        my, l = Y.shape
+    assert(m==my)
+
+    pert_tsq_x = zeros((n, n_iter), dtype='d') # (nxvars x n_subsets)
+    pert_tsq_z = zeros((k, n_iter), dtype='d') # (nzvars x n_subsets)
+
+    # Full model
+    dat = lpls(X, Y, Z, aopt, scale='loads', mean_ctr=mean_ctr)
+    Wc, Lc = lpls_jk(X, Y, Z, aopt)
+    cal_tsq_x = hotelling(Wc, dat['W'], alpha=alpha)
+    cal_tsq_z = hotelling(Lc, dat['L'], alpha=alpha)
+
+    # Perturbations
+    index = arange(m)
+    for i in range(n_iter):
+        indi = index.copy()
+        shuffle(indi)
+        dat = lpls(X, Y[indi,:], Z, aopt, scale='loads', mean_ctr=mean_ctr)
+        Wi, Li = lpls_jk(X, Y[indi,:], Z, aopt, nsets=nsets)
+        pert_tsq_x[:,i] = hotelling(Wi, dat['W'], alpha=alpha)
+        pert_tsq_z[:,i] = hotelling(Li, dat['L'], alpha=alpha)
+
+    return _fdr(cal_tsq_z, pert_tsq_z, median), _fdr(cal_tsq_x, pert_tsq_x, median)
+
+def _fdr(tsq, tsqp, loc_method=median):
+    """Returns the false discovery rate.
+
+    FDR is a method used in multiple hypothesis testing to correct for
+    multiple comparisons. It controls the expected proportion of incorrectly
+    rejected null hypotheses (type I errors) in a list of rejected hypotheses.
+
+    *Parameters*:
+
+        tsq : {array}
+            Hotelling's T^2, calibration model
+        tsqp : {array}
+            Hotelling's T^2, submodels
+        loc_method : {py_func}
+            Location method
+
+    *Returns*:
+
+        fdr : {array}
+            False discovery rate
+
+    """
+    n, = tsq.shape
+    k, m = tsqp.shape
+    assert(n==k)
+    n_false = empty((n, m), 'd')
+    sort_index = argsort(tsq)[::-1]
+    r_index = argsort(sort_index)
+    for i in xrange(m):
+        for j in xrange(n):
+            n_false[j,i] = (tsqp[:,i]>tsq[j]).sum()
+    fp = loc_method(n_false.T)
+    n_signif = (arange(n) + 1.0)[r_index]
+    fd_rate = fp/n_signif
+    fd_rate[fd_rate>1] = 1
+    return fd_rate
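# Editor's note: a sketch of a permutation analysis run (not part of the
# patch; the shapes, component count and iteration count are made up, and a
# real run takes a while):
#
#     from numpy.random import rand
#     from statistics import lpls_qvals
#
#     X, Y, Z = rand(15, 20), rand(15, 3), rand(6, 20)
#     qvals_z, qvals_x = lpls_qvals(X, Y, Z, aopt=2, n_iter=10)
#     # qvals_x[i] estimates the false discovery rate when X-variable i
#     # (and everything more extreme) is called significant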
diff --git a/tests/test_lplsengine.py b/tests/test_lplsengine.py
new file mode 100644
index 0000000..12ab0c5
--- /dev/null
+++ b/tests/test_lplsengine.py
@@ -0,0 +1,143 @@
+"""Testing routines for the lpls engine.
+"""
+from math import sqrt as msqrt
+
+from numpy.testing import *
+set_package_path()
+from lplslib import lpls
+from numpy import dot, eye, random, asarray, empty
+from numpy.random import rand, randn
+from numpy.linalg import svd, norm
+restore_path()
+
+def blm_array(shape=(5,10), comp=3, noise=0, seed=1, dtype='d'):
+    assert(min(*shape)>=comp)
+    random.seed(seed)
+    t = rand(shape[0], comp)
+    p = rand(shape[1], comp)
+    x = dot(t, p.T)
+    if noise>0:
+        noise = noise*randn(*shape)
+    return x + noise
+
+
+class LplsTestCase(NumpyTestCase):
+    def setUp(self):
+        self.a = blm_array(shape=(5,10), noise=.1)
+        self.b = blm_array(shape=(5,3), noise=.1)
+        self.c = blm_array(shape=(10,10), noise=.1)
+        self.nc = 2
+
+    def check_single(self):
+        self.a = asarray(self.a, dtype='f')
+        self.b = asarray(self.b, dtype='f')
+        self.c = asarray(self.c, dtype='f')
+        self.do()
+
+    def check_double(self):
+        # fixed: assign back to self so that do() sees the double arrays
+        self.a = asarray(self.a, dtype='d')
+        self.b = asarray(self.b, dtype='d')
+        self.c = asarray(self.c, dtype='d')
+        self.do()
+
+    def do(self, *args):
+        pass
+        #raise NotImplementedError
+
+class testAlphaZero(LplsTestCase):
+    def do(self):
+        #dat = lpls(self.a, self.b, self.c, self.nc, alpha=0.0)
+        #assert_almost_equal(t1, t2)
+        pass
+
+class testAlphaOne(LplsTestCase):
+    pass
+
+class testZidentity(LplsTestCase):
+    def do(self):
+        I = eye(self.a.shape[1])
+        dat = lpls(self.a, self.b, I, 2, alpha=1.0)
+        dat2 = lpls(self.a, self.b, self.c, self.nc, alpha=0.0)
+        assert_almost_equal(dat['T'], dat2['T'])
+
+class testYidentity(LplsTestCase):
+    def do(self):
+        I = eye(self.b.shape[0], dtype=self.a.dtype)
+        T0 = lpls(self.a, I, self.c, self.nc, alpha=0.0, mean_ctr=[-1,-1,-1])['T']
+        u, s, vt = svd(self.a, 0)
+        T = u*s
+        assert_almost_equal(abs(T0), abs(T[:,:self.nc]), 5)
+
+class testWideX(LplsTestCase):
+    pass
+
+class testTallX(LplsTestCase):
+    pass
+
+class testWideY(LplsTestCase):
+    pass
+
+class testTallY(LplsTestCase):
+    pass
+
+class testWideZ(LplsTestCase):
+    pass
+
+class testTallZ(LplsTestCase):
+    pass
+
+class testRankDeficientX(LplsTestCase):
+    pass
+
+class testRankDeficientY(LplsTestCase):
+    pass
+
+class testRankDeficientZ(LplsTestCase):
+    pass
+
+class testCenterX(LplsTestCase):
+    def do(self):
+        T = lpls(self.a, self.b, self.c, self.nc, mean_ctr=[0,-1,-1])['T']
+        assert_almost_equal(T.mean(0), 0)
+        W = lpls(self.a, self.b, self.c, self.nc, alpha=0, mean_ctr=[1,-1,-1])['W']
+        assert_almost_equal(W.mean(0), 0)
+
+
+class testResiduals(NumpyTestCase):
+    def setUp(self):
+        self.a = blm_array(shape=(5,5), noise=0, comp=3)
+        self.b = self.a.copy()
+        self.c = self.a.copy().T
+        self.nc = 3
+
+    def check_single(self):
+        self.a = asarray(self.a, dtype='f')
+        self.b = asarray(self.b, dtype='f')
+        self.c = asarray(self.c, dtype='f')
+        self.do()
+
+    def check_double(self):
+        self.a = asarray(self.a, dtype='d')
+        self.b = asarray(self.b, dtype='d')
+        self.c = asarray(self.c, dtype='d')
+        self.do()
+
+    def do(self):
+        dat = lpls(self.a, self.b, self.c, self.nc, mean_ctr=[-1,-1,-1])
+
+
+class testOrthogonality(LplsTestCase):
+    def do(self):
+        dat = lpls(self.a, self.b, self.c, self.nc, mean_ctr=[0,0,0], scale='loads')
+        T, W, L, E, F = dat['T'], dat['W'], dat['L'], dat['E'], dat['F']
+        assert_almost_equal(dot(T.T, T), eye(T.shape[1]))
+        for i, w in enumerate(W.T):
+            W[:,i] = w/norm(w)
+        assert_almost_equal(dot(W.T, W), eye(W.shape[1]), 3)
+        assert_almost_equal(dot(T.T, E), 0, 3)
+        assert_almost_equal(dot(T.T, F), 0, 3)
+
+
+if __name__ == '__main__':
+    NumpyTest().run()