import sys from pylab import * import matplotlib from scipy import * from scipy.linalg import inv,norm sys.path.append("../../fluents/lib") import select_generators def nipals_lpls(X, Y, Z, a_max, alpha=.7, mean_ctr=[2, 0, 1], scale='scores', verbose=True): """ L-shaped Partial Least Sqaures Regression by the nipals algorithm. (X!Z)->Y :input: X : data matrix (m, n) Y : data matrix (m, l) Z : data matrix (n, o) :output: T : X-scores W : X-weights/Z-weights P : X-loadings Q : Y-loadings U : X-Y relation L : Z-scores K : Z-loads B : Regression coefficients X->Y b0: Regression coefficient intercept evx : X-explained variance evy : Y-explained variance evz : Z-explained variance :Notes: """ if mean_ctr: xctr, yctr, zctr = mean_ctr X, mnX = center(X, xctr) Y, mnY = center(Y, xctr) Z, mnZ = center(Z, zctr) varX = pow(X, 2).sum() varY = pow(Y, 2).sum() varZ = pow(Z, 2).sum() m, n = X.shape k, l = Y.shape u, o = Z.shape # initialize U = empty((k, a_max)) Q = empty((l, a_max)) T = empty((m, a_max)) W = empty((n, a_max)) P = empty((n, a_max)) K = empty((o, a_max)) L = empty((u, a_max)) B = empty((a_max, n, l)) b0 = empty((a_max, m, l)) var_x = empty((a_max,)) var_y = empty((a_max,)) var_z = empty((a_max,)) for a in range(a_max): if verbose: print "\n Working on comp. %s" %a u = Y[:,:1] diff = 1 MAX_ITER = 100 lim = 1e-7 niter = 0 while (diff>lim and niterY :input: X : data matrix (m, n) Y : data matrix (m, l) Z : data matrix (n, o) :output: T : X-scores W : X-weights/Z-weights P : X-loadings Q : Y-loadings U : X-Y relation L : Z-scores K : Z-loads B : Regression coefficients X->Y b0: Regression coefficient intercept evx : X-explained variance evy : Y-explained variance evz : Z-explained variance :Notes: Not quite there ,,,,,,,,,,,,,, """ if mean_ctr: xctr, yctr, zctr = mean_ctr X, mnX = center(X, xctr) Y, mnY = center(Y, xctr) Z, mnZ = center(Z, zctr) varX = pow(X, 2).sum() varY = pow(Y, 2).sum() varZ = pow(Z, 2).sum() m, n = X.shape k, l = Y.shape u, o = Z.shape # initialize U = empty((k, a_max)) Q = empty((l, a_max)) T = empty((m, a_max)) W = empty((n, a_max)) P = empty((n, a_max)) K = empty((o, a_max)) L = empty((u, a_max)) var_x = empty((a_max,)) var_y = empty((a_max,)) var_z = empty((a_max,)) for a in range(a_max): if verbose: print "\n Working on comp. %s" %a xyz = dot(dot(Z,X.T),Y) u,s,vt = linalg.svd(xyz, 0) w = u[:,o] t = dot(X, w) tt = dot(t.T, t) p = dot(X.T, t)/tt q = dot(Y.T, t)/tt l = dot(Z.T, w) W[:,a] = w.ravel() P[:,a] = p.ravel() T[:,a] = t.ravel() Q[:,a] = q.ravel() L[:,a] = l.ravel() K[:,a] = k.ravel() X = X - dot(t, p.T) Y = Y - dot(t, q.T) Z = (Z.T - dot(w, l.T)).T var_x[a] = pow(X, 2).sum() var_y[a] = pow(Y, 2).sum() var_z[a] = pow(Z, 2).sum() B = dot(dot(W, inv(dot(P.T, W))), Q.T) b0 = mnY - dot(mnX, B) # variance explained evx = 100.0*(1 - var_x/varX) evy = 100.0*(1 - var_y/varY) evz = 100.0*(1 - var_z/varZ) return T, W, P, Q, U, L, K, B, b0, evx, evy, evz def lplsr(X, Y, Z, a_max, mean_ctr=[2,0,1]): """ Haralds LPLS. """ if mean_ctr!=None: xctr, yctr, zctr = mean_ctr X, mnX = center(X, xctr) Y, mnY = center(Y, yctr) Z, mnZ = center(Z, zctr) varX = pow(X, 2).sum() varY = pow(Y, 2).sum() varZ = pow(Z, 2).sum() m, n = X.shape k, l = Y.shape u, o = Z.shape # initialize Wy = empty((l, a_max)) Py = empty((l, a_max)) Ty = empty((m, a_max)) Tz = empty((o, a_max)) Wz = empty((u, a_max)) Pz = empty((u, a_max)) var_x = empty((a_max,)) var_y = empty((a_max,)) var_z = empty((a_max,)) # residuals Ey = Y.copy() Ez = Z.copy() Ex = X.copy() for i in range(a_max): YtXZ = dot(Ey.T, dot(X, Ez.T)) U, S, V = linalg.svd(YtXZ) wy = U[:,0] print wy wz = V[0,:] ty = dot(Ey, wy) tz = dot(Ez.T, wz) py = dot(Ey.T, ty)/dot(ty.T,ty) pz = dot(Ez, tz)/dot(tz.T,tz) Wy[:,i] = wy Wz[:,i] = wz Ty[:,i] = ty Tz[:,i] = tz Py[:,i] = py Pz[:,i] = pz Ey = Ey - outer(ty, py.T) Ez = (Ez.T - outer(tz, pz.T)).T var_y[i] = pow(Ey, 2).sum() var_z[i] = pow(Ez, 2).sum() tyd = apply_along_axis(norm, 0, Ty) tzd = apply_along_axis(norm, 0, Tz) Tyu = Ty/tyd Tzu = Tz/tzd C = dot(dot(Tyu.T, X), Tzu) for i in range(a_max): Ex = Ex - dot(dot(Ty[:,:i+1],C[:i+1,:i+1]), Tz[:,:i+1].T) var_x[i] = pow(Ex,2).sum() # variance explained print "var_x:" print var_x print "varX total:" print varX evx = 100.0*(1 - var_x/varX) evy = 100.0*(1 - var_y/varY) evz = 100.0*(1 - var_z/varZ) return Ty, Tz, Wy, Wz, Py, Pz, C, Ey, Ez, Ex, evx, evy, evz def bifpls(X, Y, Z, a_max, alpha): """Swedssihsh LPLS by nipals. """ u = X[:,0] Ey = Y.copy() Ez = Z.copy() for i in range(100): w = dot(X.T,u) w = w/vnorm(w) t = dot(X, w) q = dot(Ey, t.T)/dot(t.T,t) qnorm = vnorm(q) q = q/qnorm v = dot(Ez, q) s = dot(Ez.T, v)/dot(v.T,v) v = v*vnorm(s) s = s/vnorm(s) c = qnorm*(alpha*q + (1-alpha)*s) u = dot(Ey, c)/dot(s.T,s) p = dot(X.T, t)/dot(t.T,t) v2 = dot(Ez, s)/dot(s.T,s) Ey = Ey - dot(t, p.T) Ez = Ez - dot(v2, c.T) # variance explained evx = 100.0*(1 - var_x/varX) evy = 100.0*(1 - var_y/varY) evz = 100.0*(1 - var_z/varZ) def center(a, axis): # 0 = col center, 1 = row center, 2 = double center # -1 = nothing if axis==-1: mn = zeros((a.shape[1],)) return a - mn, mn elif axis==0: mn = a.mean(0) return a - mn, mn elif axis==1: mn = a.mean(1)[:,newaxis] return a - mn , mn elif axis==2: mn = a.mean(0) + a.mean(1)[:,newaxis] - a.mean() return a - mn, mn else: raise IOError("input error: axis must be in [-1,0,1,2]") def correlation_loadings(D, T, P, test=True): """ Returns correlation loadings. :input: - D: [nsamps, nvars], data (non-centered data) - T: [nsamps, a_max], Scores - P: [nvars, a_max], Loadings :ouput: - Rloads: [nvars, a_max], Correlation loadings - rmseVars: [nvars], scaling coeff. for each var in D :notes: - FIXME: Calculation is not valid .... using corrceof instead """ nsamps, nvars = D.shape nsampsT, a_max = T.shape nvarsP, a_maxP = P.shape if nsamps!=nsampsT: raise IOError("D/T mismatch") if a_max!=a_maxP: raise IOError("a_max mismatch") if nvars!=nvarsP: raise IOError("D/P mismatch") #init Rloads = empty((nvars, a_max), 'd') stdvar = stats.std(D, 0) rmseVars = sqrt(nsamps-1)*stdvar # center D = D - D.mean(0) TT = diag(dot(T.T, T)) sTT = sqrt(TT) for a in range(a_max): Rloads[:,a] = sTT[a]*P[:,a]/rmseVars R = empty_like(Rloads) for a in range(a_max): for k in range(nvars): r = corrcoef(D[:,k], T[:,a]) R[k,a] = r[0,1] #Rloads = R return Rloads, R, rmseVars def cv_lpls(X, Y, Z, a_max=2, nsets=None,alpha=.5, mean_ctr=[2,0,1]): """Performs crossvalidation to get generalisation error in lpls""" cv_iter = select_generators.pls_gen(X, Y, n_blocks=nsets,center=False,index_out=True) k, l = Y.shape Yhat = empty((a_max,k,l), 'd') for i, (xcal,xi,ycal,yi,ind) in enumerate(cv_iter): T, W, P, Q, U, L, K, B, b0, evx, evy, evz = nipals_lpls(xcal,ycal,Z, a_max=a_max, alpha=alpha, mean_ctr=mean_ctr, verbose=False) for a in range(a_max): Yhat[a,ind,:] = b0[a][0][0] + dot(xi, B[a]) Yhat_class = zeros_like(Yhat) for a in range(a_max): for i in range(k): Yhat_class[a,i,argmax(Yhat[a,i,:])]=1.0 class_err = 100*((Yhat_class+Y)==2).sum(1)/Y.sum(0).astype('d') sep = (Y - Yhat)**2 rmsep = sqrt(sep.mean(1)) return rmsep, Yhat, class_err def jk_lpls(X, Y, Z, a_max, nsets=None, alpha=.5, mean_ctr=[2,0,1]): cv_iter = select_generators.pls_gen(X, Y, n_blocks=nsets,center=False,index_out=False) m, n = X.shape k, l = Y.shape o, p = Z.shape if nsets==None: nsets = m WWx = empty((nsets, n, a_max), 'd') WWz = empty((nsets, o, a_max), 'd') WWy = empty((nsets, l, a_max), 'd') for i, (xcal,xi,ycal,yi) in enumerate(cv_iter): T, W, P, Q, U, L, K, B, b0, evx, evy, evz = nipals_lpls(xcal,ycal,Z, a_max=a_max, alpha=alpha, mean_ctr=mean_ctr, scale='loads', verbose=False) WWx[i,:,:] = W WWz[i,:,:] = L WWy[i,:,:] = Q return WWx, WWz, WWy