"""Module contain algorithms for low-rank models.
There is almost no typechecking of any kind here, just focus on speed
import math
from scipy.linalg import svd,inv
from scipy import dot,empty,eye,newaxis,zeros,sqrt,diag,\
rand,sum,cumsum,matrix, expand_dims,minimum,where
from symeig import symeig
has_sym = False
def pca(a, aopt,scale='scores',mode='normal',center_axis=0):
""" Principal Component Analysis.
Performs PCA on given matrix and returns results in a dictionary.
a : array
Data measurement matrix, (samples x variables)
aopt : int
Number of components to use, aopt<=min(samples, variables)
results : dict
keys -- values, T -- scores, P -- loadings, E -- residuals,
lev --leverages, ssq -- sum of squares, expvar -- cumulative
explained variance, aopt -- number of components used
mode : str
Amount of info retained, ('fast', 'normal', 'detailed')
center_axis : int
Center along given axis. If neg.: no centering (-inf,..., matrix modes)
- pcr : other blm
- pls : other blm
- lpls : other blm
Uses kernel speed-up if m>>n or m<<n.
If residuals turn rank deficient, a lower number of component than given
in input will be used. The number of components used is given in
>>> import scipy,engines
>>> a=scipy.asarray([[1,2,3],[2,4,5]])
>>> dat=engines.pca(a, 2)
>>> dat['expvarx']
array([0.,99.8561562, 100.])
if center_axis>=0:
a = a - expand_dims(a.mean(center_axis), center_axis)
m, n = a.shape
if m>(n+100) or n>(m+100):
u, e, v = esvd(a)
s = sqrt(e)
u, s, vt = svd(a, 0)
v = vt.T
e = s**2
tol = 1e-10
eff_rank = sum(s>s[0]*tol)
aopt = minimum(aopt, eff_rank)
T = u*s
s = s[:aopt]
T = T[:,:aopt]
P = v[:,:aopt]
if scale=='loads':
T = T/s
P = P*s
if mode == 'fast':
return {'T':T, 'P':P, 'aopt':aopt}
if mode=='detailed':
E = empty((aopt, m, n))
ssq = []
lev = []
for ai in range(aopt):
E[ai,:,:] = a - dot(T[:,:ai+1], P[:,:ai+1].T)
ssq.append([(E[ai,:,:]**2).sum(0), (E[ai,:,:]**2).sum(1)])
if scale=='loads':
lev.append([((s*T)**2).sum(1), (P**2).sum(1)])
lev.append([(T**2).sum(1), ((s*P)**2).sum(1)])
# residuals
E = a - dot(T, P.T)
SEP = E**2
ssq = [SEP.sum(0), SEP.sum(1)]
# leverages
if scale=='loads':
lev = [(1./m)+(T**2).sum(1), (1./n)+((P/s)**2).sum(1)]
lev = [(1./m)+((T/s)**2).sum(1), (1./n)+(P**2).sum(1)]
# variances
expvarx = r_[0, 100*e.cumsum()/e.sum()][:aopt+1]
return {'T':T, 'P':P, 'E':E, 'expvarx':expvarx, 'levx':lev, 'ssqx':ssq, 'aopt':aopt}
def pcr(a, b, aopt, scale='scores',mode='normal',center_axis=0):
""" Principal Component Regression.
Performs PCR on given matrix and returns results in a dictionary.
a : array
Data measurement matrix, (samples x variables)
b : array
Data response matrix, (samples x responses)
aopt : int
Number of components to use, aopt<=min(samples, variables)
results : dict
keys -- values, T -- scores, P -- loadings, E -- residuals,
levx -- leverages, ssqx -- sum of squares, expvarx -- cumulative
explained variance, aopt -- number of components used
mode : str
Amount of info retained, ('fast', 'normal', 'detailed')
center_axis : int
Center along given axis. If neg.: no centering (-inf,..., matrix modes)
- pca : other blm
- pls : other blm
- lpls : other blm
Uses kernel speed-up if m>>n or m<<n.
If residuals turn rank deficient, a lower number of component than given
in input will be used. The number of components used is given in results-dict.
>>> import scipy,engines
>>> a=scipy.asarray([[1,2,3],[2,4,5]])
>>> b=scipy.asarray([[1,1],[2,3]])
>>> dat=engines.pcr(a, 2)
>>> dat['expvarx']
array([0.,99.8561562, 100.])
k, l = m_shape(b)
if center_axis>=0:
b = b - expand_dims(b.mean(center_axis), center_axis)
dat = pca(a, aopt=aopt, scale=scale, mode=mode, center_axis=center_axis)
T = dat['T']
weights = apply_along_axis(vnorm, 0, T)**2
if scale=='loads':
Q = dot(b.T, T*weights)
Q = dot(b.T, T/weights)
if mode=='fast':
return dat
if mode=='detailed':
F = empty((aopt, k, l))
for i in range(aopt):
F[i,:,:] = b - dot(T[:,:i+1], Q[:,:i+1].T)
F = b - dot(T, Q.T)
expvary = r_[0, 100*((T**2).sum(0)*(Q**2).sum(0)/(b**2).sum()).cumsum()[:aopt]]
#fixme: Y-var leverages
dat.update({'Q':Q, 'F':F, 'expvary':expvary})
return dat
def pls(a, b, aopt=2, scale='scores', mode='normal', center_axis=0, ab=None):
"""Partial Least Squares Regression.
Performs PLS on given matrix and returns results in a dictionary.
a : array
Data measurement matrix, (samples x variables)
b : array
Data response matrix, (samples x responses)
aopt : int
Number of components to use, aopt<=min(samples, variables)
results : dict
keys -- values, T -- scores, P -- loadings, E -- residuals,
levx -- leverages, ssqx -- sum of squares, expvarx -- cumulative
explained variance of descriptors, expvary -- cumulative explained
variance of responses, aopt -- number of components used
mode : str
Amount of info retained, ('fast', 'normal', 'detailed')
center_axis : int
Center along given axis. If neg.: no centering (-inf,..., matrix modes)
- pca : other blm
- pcr : other blm
- lpls : other blm
Uses kernel speed-up if m>>n or m<<n.
If residuals turn rank deficient, a lower number of component than given
in input will be used. The number of components used is given in results-dict.
>>> import scipy,engines
>>> a=scipy.asarray([[1,2,3],[2,4,5]])
>>> b=scipy.asarray([[1,1],[2,3]])
>>> dat=engines.pls(a, b, 2)
>>> dat['expvarx']
array([0.,99.8561562, 100.])
m, n = m_shape(a)
if ab!=None:
mm, l = m_shape(ab)
k, l = m_shape(b)
if center_axis>=0:
a = a - expand_dims(a.mean(center_axis), center_axis)
b = b - expand_dims(b.mean(center_axis), center_axis)
W = empty((n, aopt))
P = empty((n, aopt))
R = empty((n, aopt))
Q = empty((l, aopt))
T = empty((m, aopt))
B = empty((aopt, n, l))
if ab==None:
ab = dot(a.T, b)
for i in range(aopt):
if ab.shape[1]==1: #pls 1
w = ab.reshape(n, l)
w = w/vnorm(w)
elif n<l: # more yvars than xvars
if has_sym:
s, u = symeig(dot(ab, ab.T),range=[l,l],overwrite=True)
u, s, vh = svd(dot(ab, ab.T))
w = u[:,0]
else: # standard wide xdata
if has_sym:
s, q = symeig(dot(ab.T, ab),range=[l,l],overwrite=True)
q, s, vh = svd(dot(ab.T, ab))
q = q[:,:1]
w = dot(ab, q)
w = w/vnorm(w)
r = w.copy()
if i>0:
for j in range(0, i, 1):
r = r - dot(P[:,j].T, w)*R[:,j][:,newaxis]
print vnorm(r)
t = dot(a, r)
tt = vnorm(t)**2
p = dot(a.T, t)/tt
q = dot(r.T, ab).T/tt
ab = ab - dot(p, q.T)*tt
T[:,i] = t.ravel()
W[:,i] = w.ravel()
if mode=='fast' and i==aopt-1:
if scale=='loads':
tnorm = apply_along_axis(vnorm, 0, T)
T = T/tnorm
W = W*tnorm
return {'T':T, 'W':W}
P[:,i] = p.ravel()
R[:,i] = r.ravel()
Q[:,i] = q.ravel()
B[i] = dot(R[:,:i+1], Q[:,:i+1].T)
if mode=='detailed':
E = empty((aopt, m, n))
F = empty((aopt, k, l))
for i in range(1, aopt+1, 1):
E[i-1] = a - dot(T[:,:i], P[:,:i].T)
F[i-1] = b - dot(T[:,:i], Q[:,:i].T)
E = a - dot(T[:,:aopt], P[:,:aopt].T)
F = b - dot(T[:,:aopt], Q[:,:aopt].T)
if scale=='loads':
tnorm = apply_along_axis(vnorm, 0, T)
T = T/tnorm
W = W*tnorm
Q = Q*tnorm
P = P*tnorm
return {'B':B, 'Q':Q, 'P':P, 'T':T, 'W':W, 'R':R, 'E':E, 'F':F}
def w_simpls(aat, b, aopt):
""" Simpls for wide matrices.
Fast pls for crossval, used in calc rmsep for wide X
There is no P or W. T is normalised
bb = b.copy()
m, m = aat.shape
U = empty((m, aopt)) # W
T = empty((m, aopt))
H = empty((m, aopt)) # R
PROJ = empty((m, aopt)) # P?
for i in range(aopt):
q, s, vh = svd(dot(dot(b.T, aat), b), full_matrices=0)
u = dot(b, q[:,:1]) #y-factor scores
U[:,i] = u.ravel()
t = dot(aat, u)
t = t/vnorm(t)
T[:,i] = t.ravel()
h = dot(aat, t) #score-weights
H[:,i] = h.ravel()
PROJ[:,:i+1] = dot(T[:,:i+1], inv(dot(T[:,:i+1].T, H[:,:i+1])) )
if i<aopt:
b = b - dot(PROJ[:,:i+1], dot(H[:,:i+1].T,b) )
C = dot(bb.T, T)
return {'T':T, 'U':U, 'Q':C, 'H':H}
def w_pls(aat, b, aopt):
""" Pls for wide matrices.
Fast pls for crossval, used in calc rmsep for wide X
There is no P or W. T is normalised
aat = centered kernel matrix
b = centered y
bb = b.copy()
k, l = m_shape(b)
m, m = m_shape(aat)
U = empty((m, aopt)) # W
T = empty((m, aopt))
R = empty((m, aopt)) # R
PROJ = empty((m, aopt)) # P?
for i in range(aopt):
if has_sym:
s, q = symeig(dot(dot(b.T, aat), b), range=(l,l),overwrite=True)
q, s, vh = svd(dot(dot(b.T, aat), b), full_matrices=0)
q = q[:,:1]
u = dot(b , q) #y-factor scores
U[:,i] = u.ravel()
t = dot(aat, u)
print "Norm of t: %s" %vnorm(t)
print "s: %s" %s
t = t/vnorm(t)
T[:,i] = t.ravel()
r = dot(aat, t)#score-weights
#r = r/vnorm(r)
print "Norm R: %s" %vnorm(r)
R[:,i] = r.ravel()
PROJ[:,: i+1] = dot(T[:,:i+1], inv(dot(T[:,:i+1].T, R[:,:i+1])) )
if i<aopt:
b = b - dot(PROJ[:,:i+1], dot(R[:,:i+1].T, b) )
C = dot(bb.T, T)
return {'T':T, 'U':U, 'Q':C, 'R':R}
def bridge(a, b, aopt, scale='scores', mode='normal', r=0):
"""Undeflated Ridged svd(X'Y)
m, n = m_shape(a)
k, l = m_shape(b)
u, s, vt = svd(b, full_matrices=0)
g0 = dot(u*s, u.T)
g = (1 - r)*g0 + r*eye(m)
ag = dot(a.T, g)
u, s, vt = svd(ag, full_matrices=0)
W = u[:,:aopt]
K = vt[:aopt,:].T
T = dot(a, W)
tnorm = apply_along_axis(vnorm, 0, T) # norm of T-columns
if mode == 'fast':
if scale=='loads':
T = T/tnorm
W = W*tnorm
return {'T':T, 'W':W}
U = dot(g0, K) #fixme check this
Q = dot(b.T, dot(T, inv(dot(T.T, T)) ))
B = zeros((aopt, n, l), dtype='f')
for i in range(aopt):
B[i] = dot(W[:,:i+1], Q[:,:i+1].T)
if mode == 'detailed':
E = empty((aopt, m, n))
F = empty((aopt, k, l))
for i in range(aopt):
E[i] = a - dot(T[:,:i+1], W[:,:i+1].T)
F[i] = b - dot(a, B[i])
else: #normal
F = b - dot(a, B[-1])
E = a - dot(T, W.T)
# leverages
# fixme: probably need an orthogonal basis for row-space leverage
# T (scores) are not orthogonal
# Using a qr decomp to get an orthonormal basis for row-space
#Tq = qr(T)[0]
#s_lev,v_lev = leverage(aopt,Tq,W)
# explained variance
#var_x, exp_var_x = variances(a,T,W)
#qnorm = apply_along_axis(norm, 0, Q)
#var_y, exp_var_y = variances(b,U,Q/qnorm)
if scale=='loads':
T = T/tnorm
W = W*tnorm
Q = Q*tnorm
return {'B':B, 'W':W, 'T':T, 'Q':Q, 'E':E, 'F':F, 'U':U, 'P':W}
def nipals_lpls(X, Y, Z, a_max, alpha=.7, mean_ctr=[2, 0, 1], mode='normal', scale='scores', verbose=False):
""" L-shaped Partial Least Sqaures Regression by the nipals algorithm.
X : data matrix (m, n)
Y : data matrix (m, l)
Z : data matrix (n, o)
T : X-scores
W : X-weights/Z-weights
P : X-loadings
Q : Y-loadings
U : X-Y relation
L : Z-scores
K : Z-loads
B : Regression coefficients X->Y
b0: Regression coefficient intercept
evx : X-explained variance
evy : Y-explained variance
evz : Z-explained variance
if mean_ctr!=None:
xctr, yctr, zctr = mean_ctr
X, mnX = center(X, xctr)
Y, mnY = center(Y, xctr)
Z, mnZ = center(Z, zctr)
varX = pow(X, 2).sum()
varY = pow(Y, 2).sum()
varZ = pow(Z, 2).sum()
m, n = X.shape
k, l = Y.shape
u, o = Z.shape
# initialize
U = empty((k, a_max))
Q = empty((l, a_max))
T = empty((m, a_max))
W = empty((n, a_max))
P = empty((n, a_max))
K = empty((o, a_max))
L = empty((u, a_max))
B = empty((a_max, n, l))
b0 = empty((a_max, m, l))
var_x = empty((a_max,))
var_y = empty((a_max,))
var_z = empty((a_max,))
for a in range(a_max):
if verbose:
print "\n Working on comp. %s" %a
u = Y[:,:1]
diff = 1
MAX_ITER = 200
lim = 1e-16
niter = 0
while (diff>lim and niter<MAX_ITER):
niter += 1
u1 = u.copy()
w = dot(X.T, u)
w = w/sqrt(dot(w.T, w))
l = dot(Z, w)
k = dot(Z.T, l)
k = k/sqrt(dot(k.T, k))
w = alpha*k + (1-alpha)*w
w = w/sqrt(dot(w.T, w))
t = dot(X, w)
c = dot(Y.T, t)
c = c/sqrt(dot(c.T, c))
u = dot(Y, c)
diff = abs(u1 - u).max()
if verbose:
print "Converged after %s iterations" %niter
tt = dot(t.T, t)
p = dot(X.T, t)/tt
q = dot(Y.T, t)/tt
l = dot(Z, w)
U[:,a] = u.ravel()
W[:,a] = w.ravel()
P[:,a] = p.ravel()
T[:,a] = t.ravel()
Q[:,a] = q.ravel()
L[:,a] = l.ravel()
K[:,a] = k.ravel()
X = X - dot(t, p.T)
Y = Y - dot(t, q.T)
Z = (Z.T - dot(w, l.T)).T
var_x[a] = pow(X, 2).sum()
var_y[a] = pow(Y, 2).sum()
var_z[a] = pow(Z, 2).sum()
B[a] = dot(dot(W[:,:a+1], inv(dot(P[:,:a+1].T, W[:,:a+1]))), Q[:,:a+1].T)
b0[a] = mnY - dot(mnX, B[a])
# variance explained
evx = 100.0*(1 - var_x/varX)
evy = 100.0*(1 - var_y/varY)
evz = 100.0*(1 - var_z/varZ)
if scale=='loads':
tnorm = apply_along_axis(vnorm, 0, T)
T = T/tnorm
W = W*tnorm
Q = Q*tnorm
knorm = apply_along_axis(vnorm, 0, K)
L = L*knorm
K = K/knorm
return {'T':T, 'W':W, 'P':P, 'Q':Q, 'U':U, 'L':L, 'K':K, 'B':B, 'b0':b0, 'evx':evx, 'evy':evy, 'evz':evz}
def nipals_pls(X, Y, a_max, alpha=.7, ax_center=0, mode='normal', scale='scores', verbose=False):
"""Partial Least Sqaures Regression by the nipals algorithm.
X : data matrix (m, n)
Y : data matrix (m, l)
T : X-scores
W : X-weights/Z-weights
P : X-loadings
Q : Y-loadings
U : X-Y relation
B : Regression coefficients X->Y
b0: Regression coefficient intercept
evx : X-explained variance
evy : Y-explained variance
evz : Z-explained variance
if ax_center>=0:
mn_x = expand_dims(X.mean(ax_center), ax_center)
mn_y = expand_dims(Y.mean(ax_center), ax_center)
X = X - mn_x
Y = Y - mn_y
varX = pow(X, 2).sum()
varY = pow(Y, 2).sum()
m, n = X.shape
k, l = Y.shape
# initialize
U = empty((k, a_max))
Q = empty((l, a_max))
T = empty((m, a_max))
W = empty((n, a_max))
P = empty((n, a_max))
B = empty((a_max, n, l))
b0 = empty((a_max, m, l))
var_x = empty((a_max,))
var_y = empty((a_max,))
t1 = X[:,:1]
for a in range(a_max):
if verbose:
print "\n Working on comp. %s" %a
u = Y[:,:1]
diff = 1
MAX_ITER = 100
lim = 1e-16
niter = 0
while (diff>lim and niter<MAX_ITER):
niter += 1
#u1 = u.copy()
w = dot(X.T, u)
w = w/sqrt(dot(w.T, w))
#l = dot(Z, w)
#k = dot(Z.T, l)
#k = k/sqrt(dot(k.T, k))
#w = alpha*k + (1-alpha)*w
#w = w/sqrt(dot(w.T, w))
t = dot(X, w)
q = dot(Y.T, t)
q = q/sqrt(dot(q.T, q))
u = dot(Y, q)
diff = vnorm(t1 - t)
t1 = t.copy()
if verbose:
print "Converged after %s iterations" %niter
#tt = dot(t.T, t)
#p = dot(X.T, t)/tt
#q = dot(Y.T, t)/tt
#l = dot(Z, w)
p = dot(X.T, t)/dot(t.T, t)
p_norm = vnorm(p)
t = t*p_norm
w = w*p_norm
p = p/p_norm
U[:,a] = u.ravel()
W[:,a] = w.ravel()
P[:,a] = p.ravel()
T[:,a] = t.ravel()
Q[:,a] = q.ravel()
X = X - dot(t, p.T)
Y = Y - dot(t, q.T)
var_x[a] = pow(X, 2).sum()
var_y[a] = pow(Y, 2).sum()
B[a] = dot(dot(W[:,:a+1], inv(dot(P[:,:a+1].T, W[:,:a+1]))), Q[:,:a+1].T)
b0[a] = mn_y - dot(mn_x, B[a])
# variance explained
evx = 100.0*(1 - var_x/varX)
evy = 100.0*(1 - var_y/varY)
if scale=='loads':
tnorm = apply_along_axis(vnorm, 0, T)
T = T/tnorm
W = W*tnorm
Q = Q*tnorm
return {'T':T, 'W':W, 'P':P, 'Q':Q, 'U':U, 'B':B, 'b0':b0, 'evx':evx, 'evy':evy}
########### Helper routines #########
def m_shape(array):
return matrix(array).shape
def esvd(data, amax=None):
"""SVD with the option of economy sized calculation
Calculate subspaces of X'X or XX' depending on the shape
of the matrix.
Good for extreme fat or thin matrices
Numpy supports this by setting full_matrices=0
m, n = data.shape
if m>=n:
kernel = dot(data.T, data)
if has_sym:
if not amax:
amax = n
pcrange = [n-amax, n]
s, v = symeig(kernel, range=pcrange, overwrite=True)
s = s[::-1]
v = v[:,arange(n, -1, -1)]
u, s, vt = svd(kernel)
v = vt.T
u = dot(data, v)
for i in xrange(n):
s[i] = vnorm(u[:,i])
u[:,i] = u[:,i]/s[i]
kernel = dot(data, data.T)
if has_sym:
if not amax:
amax = m
pcrange = [m-amax, m]
s, u = symeig(kernel, range=pcrange, overwrite=True)
u, s, vt = svd(kernel)
v = dot(u.T, data)
for i in xrange(m):
s[i] = vnorm(v[i,:])
v[i,:] = v[i,:]/s[i]
return u, s, v.T
def vnorm(x):
# assume column arrays (or vectors)
return math.sqrt(dot(x.T, x))
def center(a, axis):
# 0 = col center, 1 = row center, 2 = double center
# -1 = nothing
if axis==-1:
mn = zeros((a.shape[1],))
elif axis==0:
mn = a.mean(0)
elif axis==1:
mn = a.mean(1)[:,newaxis]
elif axis==2:
mn = a.mean(0) + a.mean(1)[:,newaxis] - a.mean()
raise IOError("input error: axis must be in [-1,0,1,2]")
return a - mn, mn