This repository has been archived on 2024-07-04. You can view files and clone it, but cannot push or open issues or pull requests.
pyblm/pyblm/models.py
2007-12-14 00:16:31 +00:00

291 lines
8.2 KiB
Python

"""Bilinear models"""
from numpy import expand_dims,ones
from engines import pca
def mean_center(x, axis=0):
"""Mean center across axis."""
return expand_dims(-x.mean(axis), axis)
def scale(x, axis=0):
""" Scale across axis."""
scale = ones((x.shape[axis-1],))
#scale = 1./x.std(axis)
return expand_dims(scale, axis)
class Model(object):
"""All underscored attributes are properties.
"""
def __init__(self, name="johndoe"):
self.name = name
self.options = {}
def save(self, filename='pca.ml'):
pass
def load(self):
pass
def clear(self):
for param in self.__dict__.keys():
if param.startswith("_") and param[1:5]!="core":
exec "del self." + param[1:]
def clear_core(self):
for param in self.__dict__.keys():
if param.startswith("_"):
exec "del self." + param
#self.clear()
class PCA(Model):
def __init__(self, x, amax=10):
Model.__init__(self, name="PCA")
self._x = x
self.amax = amax
self.aopt = amax
# properties
def amax():
doc = "maximum number of components"
def fget(self):
return self._amax
def fset(self, a):
assert(a>0)
a = min(a, min(self.x.shape))
if hasattr(self, "_amax"):
if a>self._amax:
# fixme: do a model update
raise NotImplementedError
self._amax = a
def fdel(self):
pass
return locals()
amax = property(**amax())
def tot_var():
doc = "total variance"
def fget(self):
if not hasattr(self, "_tot_var"):
self._tot_var = (self.xw**2).sum()
return self._tot_var
def fset(self, tv):
self._tot_var = tv
def fdel(self):
del self._tot_var
return locals()
tot_var = property(**tot_var())
def scores():
doc = "pca scores"
def fget(self):
if not hasattr(self, "_core_scores"):
result= pca(self.xw, self.amax)
self._core_scores = result['T']
self.singvals = result['eigvals']
self.loadings = result['P']
self.tot_var = 120.
return self._core_scores[:,:self.amax]
def fset(self, t):
self._core_scores = t
def fdel(self):
del self._core_scores
return locals()
scores = property(**scores())
def loadings():
doc = "pca loadings"
def fget(self):
if not hasattr(self, "_loadings"):
result = pca(self.xw, self.amax)
self.loadings = result['P']
self.scores = result['T']
self.singvals = result['eigvals']
self.tot_var = 120
return self._loadings[:,:self.amax]
def fdel(self):
del self._loadings
def fset(self, p):
self._loadings = p
return locals()
loadings = property(**loadings())
def singvals():
doc = "Singular values"
def fget(self):
if not hasattr(self, "_singvals"):
result = pca(self.xw, self.amax)
self._singvals = result['eigvals']
self.scores = result['T']
self.loadings = result['P']
self.tot_var = 120
return self._singvals[:self.amax]
def fset(self, w):
self._singvals = w
def fdel(self):
del self._singvals
return locals()
singvals = property(**singvals())
def x():
doc = "x is readonly, may not be deleted"
def fget(self):
return self._x
def fdel(self):
pass
return locals()
x = property(**x())
def xadd():
doc = "column means"
def fget(self):
if not hasattr(self, "_xadd"):
self._xadd = mean_center(self.x, axis=0)
return self._xadd
def fset(self, mnx):
if hasattr(self, "_xc"):
del self._xc
self._xadd = mnx
def fdel(self):
del self._xadd
if hasattr(self, "_xc"):
del self._xc
return locals()
xadd = property(**xadd())
def xc():
doc = "mean_centered input data"
def fget(self):
if not hasattr(self, "_xc"):
self._xc = self.x + self.xadd
return self._xc
def fset(self, xc):
self._xc = xc
def fdel(self):
print "a"
if hasattr(self, "_xc"):
print "del"
del self._xc
return locals()
xc = property(**xc())
def xw():
doc = "scaled input data"
def fget(self):
if not hasattr(self, "_xw"):
if self.x.shape==self.row_metric.shape:
self._xw = dot(self.xc, self.row_metric)
else:
self._xw = self.xc * self.row_metric
return self._xw
def fset(self, xw):
self._xw = xw
def fdel(self):
del self._xw
return locals()
xw = property(**xw())
def explained_variance():
doc = "explained variance"
def fget(self):
if not hasattr(self, "_explained_variance"):
self._explained_variance = 100*(self.singvals**2)/self.tot_var
return self._explained_variance[:self.amax]
def fset(self, ev):
self._explained_variance = ev
def fdel(self):
del self._explained_variance
return locals()
explained_variance = property(**explained_variance())
def residuals():
doc = "residuals"
def fget(self):
if not hasattr(self, "_residuals"):
res = empty((self.amax, self.x.shape[0], self.x.shape[1]))
for a in range(self.amax):
res[a,:,:] = self.xw - dot(self.scores[:,:a+1], self.loadings[:,:a+1].T)
self._residuals = res
return self._residuals
def fset(self, e):
self._residuals = e
def fdel(self):
del self._residuals
return locals()
residuals = property(**residuals())
def leverage():
doc = "objects leverage"
def fget(self):
if not hasattr(self, "_leverage"):
u = self.scores/self.singvals
self._leverage = empty((self.amax, u.shape[0]))
for i in range(self.amax):
self._leverage[i,:] = 1./u.shape[0] + (u[:,:i+1]**2).sum(1)
return self._leverage[:self.amax,:]
def fset(self, lev):
self._leverage = lev
def fdel(self):
del self._leverage
return locals()
leverage = property(**leverage())
def row_metric():
doc = "row metric"
def fget(self):
if not hasattr(self, "_row_metric"):
self._row_metric = scale(self.xc, axis=0)
return self._row_metric
def fset(self, w):
self._row_metric = w
def fdel(self):
del self._row_metric
if hasattr(self, "_xd"):
del self._xd
return locals()
row_metric = property(**row_metric())
def column_metric():
doc = "column metric"
def fget(self):
if not hasattr(self, "_column_metric"):
self._column_metric = scale(self.xc, axis=1)
return self._column_metric
def fset(self, w):
self._column_metric = w
# update model
def fdel(self):
del self._column_metric
if hasattr(self, "_xd"):
del self._xd
return locals()
column_metric = property(**column_metric())
def blm_update(self, a, b):
pass
def append_columns(self, cols):
pass
def append_rows(self, rows):
pass
def delete_columns(self, index):
pass
def delete_rows(self, index):
pass
def reweight(self, w):
pass
if __name__ == "__main__":
from numpy.random import rand
X = rand(4,10)
pcaobj = PCA(X)
print "explained variance" + str(pcaobj.explained_variance)