"""Module contain algorithms for low-rank models. There is almost no typechecking of any kind here, just focus on speed """ import math from scipy.linalg import svd,inv from scipy import dot,empty,eye,newaxis,zeros,sqrt,diag,\ apply_along_axis,mean,ones,randn,empty_like,outer,c_,\ rand,sum,cumsum,matrix has_sym=True try: import symmeig except: has_sym = False def pca(a, aopt, scale='scores', mode='normal'): """ Principal Component Analysis model mode: -- fast : returns smallest dim scaled (T for n<=m, P for n>m ) -- normal : returns all model params and residuals after aopt comp -- detailed : returns all model params and all residuals """ m, n = a.shape #print "rows: %s cols: %s" %(m,n) if m>(n+100) or n>(m+100): u, s, v = esvd(a) else: u, s, vt = svd(a, 0) v = vt.T eigvals = (1./m)*s T = u*s T = T[:,:aopt] P = v[:,:aopt] if scale=='loads': tnorm = apply_along_axis(vnorm, 0, T) T = T/tnorm P = P*tnorm if mode == 'fast': return {'T':T, 'P':P} if mode=='detailed': """Detailed mode returns residual matrix for all comp. That is E, is a three-mode matrix: (amax, m, n) """ E = empty((aopt, m, n)) for ai in range(aopt): e = a - dot(T[:,:ai+1], P[:,:ai+1].T) E[ai,:,:] = e.copy() else: E = a - dot(T,P.T) return {'T':T, 'P':P, 'E':E} def pcr(a, b, aopt=2, scale='scores', mode='normal'): """Principal Component Regression. Returns """ m, n = m_shape(a) B = empty((aopt, n, l)) dat = pca(a, aopt=aopt, scale=scale, mode='normal', center_axis=0) T = dat['T'] weigths = apply_along_axis(vnorm, 0, T) if scale=='loads': # fixme: check weights Q = dot(b.T, T*weights) else: Q = dot(b.T, T/weights**2) if mode=='fast': return {'T', T:, 'P':P, 'Q':Q} if mode=='detailed': for i in range(1, aopt+1, 1): F[i,:,:] = b - dot(T[:,i],Q[:,:i].T) else: F = b - dot(T, Q.T) #fixme: explained variance in Y + Y-var leverages dat.update({'Q',Q, 'F':F}) return dat def pls(a, b, aopt=2, scale='scores', mode='normal', ab=None): """Partial Least Squares Regression. Applies plsr to given matrices and returns results in a dictionary. Fast pls for calibration. Only inefficient for many Y-vars. """ m, n = a.shape if ab!=None: mm, ll = m_shape(ab) else: k, l = m_shape(b) assert(m==mm) assert(l==ll) W = empty((n, aopt)) P = empty((n, aopt)) R = empty((n, aopt)) Q = empty((l, aopt)) T = empty((m, aopt)) B = empty((aopt, n, l)) if ab==None: ab = dot(a.T, b) for i in range(aopt): if ab.shape[1]==1: w = ab.reshape(n, l) else: u, s, vh = svd(dot(ab.T, ab)) w = dot(ab, u[:,:1]) w = w/vnorm(w) r = w.copy() if i>0: # recursive estimate to for j in range(0, i, 1): r = r - dot(P[:,j].T, w)*R[:,j][:,newaxis] t = dot(a, r) tt = vnorm(t)**2 p = dot(a.T, t)/tt q = dot(r.T, ab).T/tt ab = ab - dot(p, q.T)*tt T[:,i] = t.ravel() W[:,i] = w.ravel() P[:,i] = p.ravel() R[:,i] = r.ravel() if mode=='fast' and i==aopt-1: if scale=='loads': tnorm = apply_along_axis(vnorm, 0, T) T = T/tnorm W = W*tnorm return {'T':T, 'W':W} Q[:,i] = q.ravel() B[i] = dot(R[:,:i+1], Q[:,:i+1].T) if mode=='detailed': E = empty((aopt, m, n)) F = empty((aopt, k, l)) for i in range(1, aopt+1, 1): E[i-1] = a - dot(T[:,:i], P[:,:i].T) F[i-1] = b - dot(T[:,:i], Q[:,:i].T) else: E = a - dot(T[:,:aopt], P[:,:aopt].T) F = b - dot(T[:,:aopt], Q[:,:aopt].T) if scale=='loads': tnorm = apply_along_axis(vnorm, 0, T) T = T/tnorm W = W*tnorm Q = Q*tnorm P = P*tnorm return {'B':B, 'Q':Q, 'P':P, 'T':T, 'W':W, 'R':R, 'E':E, 'F':F} def w_simpls(aat, b, aopt): """ Simpls for wide matrices. Fast pls for crossval, used in calc rmsep for wide X There is no P or W. T is normalised """ bb = b.copy() m, m = aat.shape U = empty((m, aopt)) T = empty((m, aopt)) H = empty((m, aopt)) #just like W in simpls PROJ = empty((m, aopt)) #just like R in simpls for i in range(aopt): u, s, vh = svd(dot(dot(b.T, aat), b), full_matrices=0) u = dot(b, u[:,:1]) #y-factor scores U[:,i] = u.ravel() t = dot(aat, u) t = t/vnorm(t) T[:,i] = t.ravel() h = dot(aat, t) #score-weights H[:,i] = h.ravel() PROJ[:,:i+1] = dot(T[:,:i+1], inv(dot(T[:,:i+1].T, H[:,:i+1])) ) if i<aopt: b = b - dot(PROJ[:,:i+1], dot(H[:,:i+1].T,b) ) C = dot(bb.T, T) return {'T':T, 'U':U, 'Q':C, 'H':H} def bridge(a, b, aopt, scale='scores', mode='normal', r=0): """Undeflated Ridged svd(X'Y) """ m, n = m_shape(a) k, l = m_shape(b) u, s, vt = svd(b, full_matrices=0) g0 = dot(u*s, u.T) g = (1 - r)*g0 + r*eye(m) ag = dot(a.T, g) u, s, vt = svd(ag, full_matrices=0) W = u[:,:aopt] K = vt[:aopt,:].T T = dot(a, W) tnorm = apply_along_axis(vnorm, 0, T) # norm of T-columns if mode == 'fast': if scale=='loads': T = T/tnorm W = W*tnorm return {'T':T, 'W':W} U = dot(g0, K) #fixme check this Q = dot(b.T, dot(T, inv(dot(T.T, T)) )) B = zeros((aopt, n, l), dtype='f') for i in range(aopt): B[i] = dot(W[:,:i+1], Q[:,:i+1].T) if mode == 'detailed': E = empty((aopt, m, n)) F = empty((aopt, k, l)) for i in range(aopt): E[i] = a - dot(T[:,:i+1], W[:,:i+1].T) F[i] = b - dot(a, B[i]) else: #normal F = b - dot(a, B[-1]) E = a - dot(T, W.T) # leverages # fixme: probably need an orthogonal basis for row-space leverage # T (scores) are not orthogonal # Using a qr decomp to get an orthonormal basis for row-space #Tq = qr(T)[0] #s_lev,v_lev = leverage(aopt,Tq,W) # explained variance #var_x, exp_var_x = variances(a,T,W) #qnorm = apply_along_axis(norm, 0, Q) #var_y, exp_var_y = variances(b,U,Q/qnorm) if scale=='loads': T = T/tnorm W = W*tnorm Q = Q*tnorm return {'B':B, 'W':W, 'T':T, 'Q':Q, 'E':E, 'F':F, 'U':U, 'P':W} def nipals_lpls(X, Y, Z, a_max, alpha=.7, mean_ctr=[2, 0, 1], mode='normal', scale='scores', verbose=False): """ L-shaped Partial Least Sqaures Regression by the nipals algorithm. (X!Z)->Y :input: X : data matrix (m, n) Y : data matrix (m, l) Z : data matrix (n, o) :output: T : X-scores W : X-weights/Z-weights P : X-loadings Q : Y-loadings U : X-Y relation L : Z-scores K : Z-loads B : Regression coefficients X->Y b0: Regression coefficient intercept evx : X-explained variance evy : Y-explained variance evz : Z-explained variance :Notes: """ if mean_ctr!=None: xctr, yctr, zctr = mean_ctr X, mnX = center(X, xctr) Y, mnY = center(Y, xctr) Z, mnZ = center(Z, zctr) print Z.mean(1) varX = pow(X, 2).sum() varY = pow(Y, 2).sum() varZ = pow(Z, 2).sum() m, n = X.shape k, l = Y.shape u, o = Z.shape # initialize U = empty((k, a_max)) Q = empty((l, a_max)) T = empty((m, a_max)) W = empty((n, a_max)) P = empty((n, a_max)) K = empty((o, a_max)) L = empty((u, a_max)) var_x = empty((a_max,)) var_y = empty((a_max,)) var_z = empty((a_max,)) for a in range(a_max): if verbose: print "\n Working on comp. %s" %a u = Y[:,:1] diff = 1 MAX_ITER = 100 lim = 1e-5 niter = 0 while (diff>lim and niter<MAX_ITER): niter += 1 u1 = u.copy() w = dot(X.T, u) w = w/sqrt(dot(w.T, w)) l = dot(Z, w) k = dot(Z.T, l) k = k/sqrt(dot(k.T, k)) w = alpha*k + (1-alpha)*w w = w/sqrt(dot(w.T, w)) t = dot(X, w) c = dot(Y.T, t) c = c/sqrt(dot(c.T, c)) u = dot(Y, c) diff = abs(u1 - u).max() if verbose: print "Converged after %s iterations" %niter tt = dot(t.T, t) p = dot(X.T, t)/tt q = dot(Y.T, t)/tt l = dot(Z, w) U[:,a] = u.ravel() W[:,a] = w.ravel() P[:,a] = p.ravel() T[:,a] = t.ravel() Q[:,a] = q.ravel() L[:,a] = l.ravel() K[:,a] = k.ravel() X = X - dot(t, p.T) Y = Y - dot(t, q.T) Z = (Z.T - dot(w, l.T)).T var_x[a] = pow(X, 2).sum() var_y[a] = pow(Y, 2).sum() var_z[a] = pow(Z, 2).sum() B = dot(dot(W, inv(dot(P.T, W))), Q.T) b0 = mnY - dot(mnX, B) # variance explained evx = 100.0*(1 - var_x/varX) evy = 100.0*(1 - var_y/varY) evz = 100.0*(1 - var_z/varZ) if scale=='loads': tnorm = apply_along_axis(vnorm, 0, T) T = T/tnorm W = W*tnorm Q = Q*tnorm knorm = apply_along_axis(vnorm, 0, K) L = L*knorm K = K/knorm return {'T':T, 'W':W, 'P':P, 'Q':Q, 'U':U, 'L':L, 'K':K, 'B':B, 'b0':b0, 'evx':evx, 'evy':evy, 'evz':evz} ########### Helper routines ######### def m_shape(array): return matrix(array).shape def esvd(data): """SVD with the option of economy sized calculation Calculate subspaces of X'X or XX' depending on the shape of the matrix. Good for extreme fat or thin matrices :notes: Numpy supports this by setting full_matrices=0 """ m, n = data.shape if m>=n: kernel = dot(data.T, data) u, s, vt = svd(kernel) u = dot(data, vt.T) v = vt.T for i in xrange(n): s[i] = vnorm(u[:,i]) u[:,i] = u[:,i]/s[i] else: kernel = dot(data, data.T) #data = (data + data.T)/2.0 u, s, vt = svd(kernel) v = dot(u.T, data) for i in xrange(m): s[i] = vnorm(v[i,:]) v[i,:] = v[i,:]/s[i] return u, s, v.T def vnorm(x): # assume column arrays (or vectors) return math.sqrt(dot(x.T, x)) def center(a, axis): # 0 = col center, 1 = row center, 2 = double center # -1 = nothing if axis==-1: mn = zeros((a.shape[1],)) elif axis==0: mn = a.mean(0) elif axis==1: mn = a.mean(1)[:,newaxis] elif axis==2: mn = a.mean(0) + a.mean(1)[:,newaxis] - a.mean() else: raise IOError("input error: axis must be in [-1,0,1,2]") return a - mn, mn