291 lines
8.2 KiB
Python
291 lines
8.2 KiB
Python
"""Bilinear models"""
|
|
|
|
from numpy import dot, empty, expand_dims, ones

from engines import pca
|
|
|
|
def mean_center(x, axis=0):
    """Return the additive offset that mean-centers ``x`` along ``axis``.

    The result is the negated mean of ``x`` over ``axis`` with that axis
    re-inserted as a length-one dimension, so adding it to ``x`` removes
    the mean. The centred data itself is not returned.
    """
    offset = -x.mean(axis)
    return expand_dims(offset, axis)
|
|
|
|
def scale(x, axis=0):
    """Return unit scaling weights for ``x``, shaped to broadcast along ``axis``.

    One weight is produced per entry of ``x.shape[axis - 1]`` and the
    vector is expanded with a length-one dimension at ``axis``.
    """
    n_weights = x.shape[axis - 1]
    weights = ones((n_weights,))
    # Standard-deviation scaling (1./x.std(axis)) is disabled, so every
    # weight is currently 1.
    return expand_dims(weights, axis)
|
|
|
|
|
|
class Model(object):
    """Base class for models that keep their state in instance attributes.

    All underscored attributes are properties: an attribute ``_foo`` is
    expected to be managed by a property ``foo`` defined on the subclass.
    """

    def __init__(self, name="johndoe"):
        """Store the model name and start with an empty options dict."""
        self.name = name
        self.options = {}

    def save(self, filename='pca.ml'):
        """Not implemented; does nothing."""
        pass

    def load(self):
        """Not implemented; does nothing."""
        pass

    def clear(self):
        """Delete every underscored attribute except the ``_core*`` ones.

        Deletion goes through the public name (``_foo`` -> ``del self.foo``)
        so the matching property deleter runs. An AttributeError is raised
        when no deleter exists for that public name.
        """
        # Iterate over a snapshot: deleters remove entries from __dict__
        # while the loop is running.
        for param in list(self.__dict__):
            if param.startswith("_") and param[1:5] != "core":
                delattr(self, param[1:])

    def clear_core(self):
        """Delete every underscored attribute directly, ``_core*`` included."""
        for param in list(self.__dict__):
            if param.startswith("_"):
                delattr(self, param)
|
|
|
|
|
|
class PCA(Model):
    """Principal component analysis with lazily evaluated, cached results.

    Most derived quantities are properties: the value is computed on first
    access, stored in an underscore-prefixed instance attribute, and
    computed again once that attribute has been deleted.
    """

    def __init__(self, x, amax=10):
        """Store the data matrix and the component limits.

        x    -- data matrix; its ``shape`` is read by the ``amax`` setter
        amax -- requested maximum number of components; the ``amax`` setter
                clamps it to ``min(x.shape)``
        """
        Model.__init__(self, name="PCA")
        self._x = x
        self.amax = amax
        # Start from the clamped value so aopt never exceeds amax.
        self.aopt = self.amax

    def _fit(self):
        """Call the pca engine once and cache scores, loadings and singular values."""
        result = pca(self.xw, self.amax)
        self._core_scores = result['T']
        self._loadings = result['P']
        self._singvals = result['eigvals']
        # tot_var is intentionally left alone here: its getter derives the
        # value from xw instead of using a fixed placeholder.

    # properties

    @property
    def amax(self):
        """maximum number of components"""
        return self._amax

    @amax.setter
    def amax(self, a):
        assert(a > 0)
        a = min(a, min(self.x.shape))
        if hasattr(self, "_amax") and a > self._amax:
            # fixme: do a model update
            raise NotImplementedError
        self._amax = a

    @amax.deleter
    def amax(self):
        # Deleting amax is a no-op.
        pass

    @property
    def tot_var(self):
        """total variance (sum of squares of xw unless set explicitly)"""
        if not hasattr(self, "_tot_var"):
            self._tot_var = (self.xw**2).sum()
        return self._tot_var

    @tot_var.setter
    def tot_var(self, tv):
        self._tot_var = tv

    @tot_var.deleter
    def tot_var(self):
        del self._tot_var

    @property
    def scores(self):
        """pca scores"""
        if not hasattr(self, "_core_scores"):
            self._fit()
        return self._core_scores[:, :self.amax]

    @scores.setter
    def scores(self, t):
        self._core_scores = t

    @scores.deleter
    def scores(self):
        del self._core_scores

    @property
    def loadings(self):
        """pca loadings"""
        if not hasattr(self, "_loadings"):
            self._fit()
        return self._loadings[:, :self.amax]

    @loadings.setter
    def loadings(self, p):
        self._loadings = p

    @loadings.deleter
    def loadings(self):
        del self._loadings

    @property
    def singvals(self):
        """Singular values"""
        if not hasattr(self, "_singvals"):
            self._fit()
        return self._singvals[:self.amax]

    @singvals.setter
    def singvals(self, w):
        self._singvals = w

    @singvals.deleter
    def singvals(self):
        del self._singvals

    @property
    def x(self):
        """x is readonly, may not be deleted"""
        return self._x

    @x.deleter
    def x(self):
        # Deleting x is a no-op.
        pass

    @property
    def xadd(self):
        """column means (stored negated, as returned by mean_center)"""
        if not hasattr(self, "_xadd"):
            self._xadd = mean_center(self.x, axis=0)
        return self._xadd

    @xadd.setter
    def xadd(self, mnx):
        # The centred data depends on the offset, so drop its cache.
        if hasattr(self, "_xc"):
            del self._xc
        self._xadd = mnx

    @xadd.deleter
    def xadd(self):
        del self._xadd
        if hasattr(self, "_xc"):
            del self._xc

    @property
    def xc(self):
        """mean_centered input data"""
        if not hasattr(self, "_xc"):
            self._xc = self.x + self.xadd
        return self._xc

    @xc.setter
    def xc(self, xc):
        self._xc = xc

    @xc.deleter
    def xc(self):
        if hasattr(self, "_xc"):
            del self._xc

    @property
    def xw(self):
        """scaled input data"""
        if not hasattr(self, "_xw"):
            if self.x.shape == self.row_metric.shape:
                # NOTE(review): a matrix product of two arrays with the
                # same (n, m) shape only works when n == m -- confirm the
                # intended shape of a full row metric.
                self._xw = dot(self.xc, self.row_metric)
            else:
                self._xw = self.xc * self.row_metric
        return self._xw

    @xw.setter
    def xw(self, xw):
        self._xw = xw

    @xw.deleter
    def xw(self):
        del self._xw

    @property
    def explained_variance(self):
        """explained variance (percent of tot_var per component)"""
        if not hasattr(self, "_explained_variance"):
            self._explained_variance = 100*(self.singvals**2)/self.tot_var
        return self._explained_variance[:self.amax]

    @explained_variance.setter
    def explained_variance(self, ev):
        self._explained_variance = ev

    @explained_variance.deleter
    def explained_variance(self):
        del self._explained_variance

    @property
    def residuals(self):
        """residuals (one slab per number of components, 1..amax)"""
        if not hasattr(self, "_residuals"):
            n_comp = self.amax
            # Read each property once instead of on every iteration.
            xw = self.xw
            scores = self.scores
            loadings = self.loadings
            res = empty((n_comp, self.x.shape[0], self.x.shape[1]))
            for a in range(n_comp):
                res[a, :, :] = xw - dot(scores[:, :a+1], loadings[:, :a+1].T)
            self._residuals = res
        return self._residuals

    @residuals.setter
    def residuals(self, e):
        self._residuals = e

    @residuals.deleter
    def residuals(self):
        del self._residuals

    @property
    def leverage(self):
        """objects leverage (one row per number of components, 1..amax)"""
        if not hasattr(self, "_leverage"):
            n_comp = self.amax
            u = self.scores/self.singvals
            lev = empty((n_comp, u.shape[0]))
            for i in range(n_comp):
                lev[i, :] = 1./u.shape[0] + (u[:, :i+1]**2).sum(1)
            self._leverage = lev
        return self._leverage[:self.amax, :]

    @leverage.setter
    def leverage(self, lev):
        self._leverage = lev

    @leverage.deleter
    def leverage(self):
        del self._leverage

    @property
    def row_metric(self):
        """row metric"""
        if not hasattr(self, "_row_metric"):
            self._row_metric = scale(self.xc, axis=0)
        return self._row_metric

    @row_metric.setter
    def row_metric(self, w):
        self._row_metric = w

    @row_metric.deleter
    def row_metric(self):
        del self._row_metric
        # NOTE(review): nothing in this class assigns _xd; possibly _xw
        # was meant -- confirm.
        if hasattr(self, "_xd"):
            del self._xd

    @property
    def column_metric(self):
        """column metric"""
        if not hasattr(self, "_column_metric"):
            self._column_metric = scale(self.xc, axis=1)
        return self._column_metric

    @column_metric.setter
    def column_metric(self, w):
        self._column_metric = w
        # update model

    @column_metric.deleter
    def column_metric(self):
        del self._column_metric
        # NOTE(review): nothing in this class assigns _xd; possibly _xw
        # was meant -- confirm.
        if hasattr(self, "_xd"):
            del self._xd

    def blm_update(self, a, b):
        """Not implemented; does nothing."""
        pass

    def append_columns(self, cols):
        """Not implemented; does nothing."""
        pass

    def append_rows(self, rows):
        """Not implemented; does nothing."""
        pass

    def delete_columns(self, index):
        """Not implemented; does nothing."""
        pass

    def delete_rows(self, index):
        """Not implemented; does nothing."""
        pass

    def reweight(self, w):
        """Not implemented; does nothing."""
        pass
|
|
|
|
|
|
if __name__ == "__main__":
    from numpy.random import rand

    # Smoke run: fit a PCA on a random 4x10 matrix and print the explained
    # variance. The parenthesised single-argument print gives the same
    # output on Python 2 and Python 3.
    X = rand(4, 10)
    pcaobj = PCA(X)
    print("explained variance" + str(pcaobj.explained_variance))
|