This repository has been archived on 2024-07-04. You can view files and clone it, but cannot push or open issues or pull requests.
laydi/fluents/lib/blmfuncs.py

674 lines
23 KiB
Python
Raw Normal View History

2006-12-18 12:59:12 +01:00
"""This module contains bilinear models(Functions)
"""
2007-01-04 14:53:47 +01:00
2007-01-25 12:58:10 +01:00
import pygtk
2007-01-04 14:53:47 +01:00
import gtk
2007-01-25 12:58:10 +01:00
import gtk.glade
2007-01-04 14:53:47 +01:00
from fluents.workflow import Function, OptionsDialog, Options
2006-12-18 12:59:12 +01:00
from fluents.dataset import Dataset
from fluents import plots, dataset, workflow, logger
import scipy
from engines import *
from cx_stats import leverage, variances, hotelling
from cx_utils import mat_center
from validation import *
import blmplots
import engines
2007-01-25 12:58:10 +01:00
import copy
2006-12-18 12:59:12 +01:00
class Model(Function):
"""Base class of bilinear models.
"""
def __init__(self,id='johndoe',name='JohnDoe'):
Function.__init__(self,id,name)
self.name = name
self._options = None
self._data = {}
self._dataset = {}
self._packers = {}
self.model = {}
def clear(self):
""" Clears model paramters
"""
self.model = {}
self._data = {}
self._packers = {}
class PCA(Model):
def __init__(self,id='pca',name='PCA'):
Model.__init__(self,id,name)
self._options = PcaOptions()
2007-01-04 14:53:47 +01:00
2007-01-25 12:58:10 +01:00
def validation(self, amax, cv_val_sets, pert_val_sets, cv_val_method, pert_val_method):
"""Model validation and estimate of optimal numer of components.
2006-12-18 12:59:12 +01:00
"""
2007-01-25 12:58:10 +01:00
if self._options['calc_cv']:
if cv_val_method == 'random':
sep, aopt = pca_cv_val(self.model['E0'], amax, cv_val_sets)
self.model['sep'] = sep
if self._options['calc_pert']:
if pert_val_method == 'random_diag':
sep, aopt = pca_alter_val(self.model['E0'], amax, pert_val_sets)
self.model['sep'] = sep
if self._options['calc_cv']==False and self._options['calc_pert']==False:
self.model['sep'] = None
aopt = self._options['amax']
if self._options['auto_aopt']:
logger.log("notice", "Auto aopt: " + str(aopt))
self._options['aopt'] = aopt
if aopt==1:
logger.log('notice', 'Aopt at first component!')
2006-12-18 12:59:12 +01:00
def confidence(self, aopt, n_sets, alpha, p_center,
crot, strict, cov_center ):
"""Returns a confidence measure for model parameters.
Based on aopt.
"""
2007-01-25 12:58:10 +01:00
if aopt<2:
aopt = 2
logger.log('notice','Hotellings T2 needs more than 1 comp.\n switching to 2!!')
2006-12-18 12:59:12 +01:00
jk_segments = pca_jkP(self.model['E0'], aopt, n_sets)
Pcal = self.model['P'][:,:aopt]
tsq = hotelling(jk_segments, Pcal, p_center,
2007-01-25 12:58:10 +01:00
cov_center, alpha, crot, strict)
2006-12-18 12:59:12 +01:00
self.model['p_tsq'] = tsq
def make_model(self, amax, mode, scale):
"""Model on optimal number of components.
"""
dat = pca(self.model['E0'], amax, scale, mode)
# explained variance
var_x, exp_var_x = variances(self.model['E0'], dat['T'], dat['P'])
dat['var_x'] = var_x
dat['exp_var_x'] = exp_var_x
#fixme###
do_lev_s = False
do_lev_v = False
#####
if do_lev_s:
# sample leverages
tnorm = scipy.apply_along_axis(norm, 0, dat['T']) # norm of T-columns
s_lev = leverage(amax, tnorm)
dat['s_lev'] = s_lev
if do_lev_v:
# variable leverages
v_lev = leverage(amax, dat['P'])
dat['v_lev'] = v_lev
self.model.update(dat)
def as_dataset(self, param, dtype='dataset'):
"""Return model parameter as Dataset.
"""
if not param in self.model.keys():
return
DX = self._dataset['X'] #input dataset
dim_name_0, dim_name_1 = DX.get_dim_name()
# samples
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
# vars
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
# components (hidden)
2007-01-25 12:58:10 +01:00
pc_ids = ['_amax', map(str,range(self._options['amax'])) ]
pc_ids_opt = ['_aopt', map(str, range(self._options['aopt'])) ]
2006-12-18 12:59:12 +01:00
zero_dim = ['_doe', ['0']] # null dim, vector (hidden)
match_ids = {'E':[ids_0, ids_1],
'E0':[ids_0, ids_1],
'P':[ids_1, pc_ids],
'T':[ids_0, pc_ids],
'W':[ids_1, pc_ids],
'p_tsq':[ids_1, zero_dim],
'rmsep':[pc_ids, zero_dim],
'var_leverages':[ids_1, zero_dim],
'sample_leverages':[pc_ids, zero_dim],
'exp_var_x': [pc_ids, zero_dim],
'var_x': [pc_ids, zero_dim],
}
out = Dataset(self.model[param], match_ids[param], name=param)
return out
def get_out_plots(self, options):
out=[]
for plt in options['out_plots']:
#try:
out.append(plt(self))
#except:
2007-01-25 12:58:10 +01:00
# logger.log('debug', 'Plot: %s failed') %str(plt)
2006-12-18 12:59:12 +01:00
return out
2007-01-04 14:53:47 +01:00
def run_o(self, data):
2006-12-18 12:59:12 +01:00
"""Run pca with present options.
"""
self.clear()
options = self._options
2007-01-25 12:58:10 +01:00
for item in options.items():
print item
2006-12-18 12:59:12 +01:00
self._dataset['X'] = data
self._data['X'] = data.asarray().astype('<f8')
if options['center']:
center = options['center_mth']
self.model['E0'] = center(self._data['X'])
else:
self.model['E0'] = data.asarray()
2007-01-25 12:58:10 +01:00
self.validation(**options.validation_options())
self.model['aopt'] = self._options['aopt']
2006-12-18 12:59:12 +01:00
self.make_model(**options.make_model_options())
if options['calc_conf']:
self.confidence(**options.confidence_options())
out = [self.as_dataset(p) for p in options['out_data']]
for plt in self.get_out_plots(options):
out.append(plt)
return out
2007-01-04 14:53:47 +01:00
def run(self, data):
"""Run Pca with option gui.
"""
dialog = PcaOptionsDialog([data], self._options)
dialog.show_all()
response = dialog.run()
dialog.hide()
2006-12-18 12:59:12 +01:00
2007-01-04 14:53:47 +01:00
if response == gtk.RESPONSE_OK:
# set output data and plots
dialog.set_output()
#run with current data and options
return self.run_o(data)
2006-12-18 12:59:12 +01:00
class PLS(Model):
def __init__(self, id='pls', name='PLS'):
Model.__init__(self, id, name)
self._options = PlsOptions()
def pre_validation(self, amax, n_sets, val_engine):
"""Returns rmsec,rmsep for model.
"""
2007-01-25 12:58:10 +01:00
rmsep, aopt = val_engine(self.model['E0'], self.model['F0'],
2006-12-18 12:59:12 +01:00
amax, n_sets)
self.model['rmsep'] = rmsep.mean(0)
2007-01-25 12:58:10 +01:00
self.model['aopt'] = aopt
2006-12-18 12:59:12 +01:00
def confidence(self, aopt, n_sets, alpha, p_center,
crot, strict, cov_center ):
"""Returns a confidence measure for model parameters
Supported parameters: W
"""
aopt = self.model['aopt']
jk_segments = pls_jkW(self.model['E0'], self.model['F0'],
aopt, n_sets)
Wcal = self.model['W'][:,:aopt]
tsq = hotelling(jk_segments, Wcal, p_center,
alpha, crot, strict, cov_center)
self.model['w_tsq'] = tsq
def permutation_confidence(self, a, b, aopt, reg, n_iter, algo,
sim_method):
"""Estimates sign. vars by controlling fdr."""
qvals_sorted, qvals = pls_qvals(a, b, aopt=None,
alpha=.4, n_iter=20, algo='pls',
sim_method='shuffle', )
def make_model(self, a, b, amax, scale, mode, engine):
"""Make model on amax components.
"""
dat = engine(a, b, amax, scale, mode)
self.model.update(dat)
def as_dataset(self, name, dtype='Dataset'):
"""Return any model parameter as Dataset
No ids matching
"""
if name not in self.model.keys():
return
DX, DY = self._dataset['X'], self._dataset['Y']
dim_name_0, dim_name_1 = DX.get_dim_name()
dim_name_2, dim_name_3 = DY.get_dim_name()
#samples
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
# x vars
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
# y vars
ids_3 = [dim_name_3, DY.get_identifiers(dim_name_3, sorted=True)]
# components (hidden)
pc_ids = ['_comp', map(str, range(self.model['aopt']))]
zero_dim = ['_doe',['0']] # null dim, vector (hidden)
match_ids = {'E':[ids_0, ids_1],
'P':[ids_1, pc_ids],
'T':[ids_0, pc_ids],
'W': [ids_1, pc_ids],
'R': [ids_1, pc_ids],
'Q':[ids_3, pc_ids],
'F':[ids_0, ids_3],
'B':[ids_1, ids_3],
'qval':[ids_1, zero_dim],
'qval_sorted':[ids_1, zero_dim],
'w_tsq':[ids_1, zero_dim],
'rmsep':[pc_ids, zero_dim],
}
array = self.model[name]
M = Dataset(array,identifiers=match_ids[name],name=name)
return M
def get_out_plots(self, options):
out=[]
for plt in options['out_plots']:
#try:
out.append(plt(self))
#except:
# logger.log('debug', 'Plot: %s failed' %plt)
return out
2007-01-04 14:53:47 +01:00
def run_o(self, a, b):
2006-12-18 12:59:12 +01:00
options = self._options
self._dataset['X'] = a
self._dataset['Y'] = b
self._data['X'] = a.asarray()
self._data['Y'] = b.asarray()
if options['center']:
self.model['E0'] = options['center_mth'](self._data['X'])
self.model['F0'] = options['center_mth'](self._data['Y'])
else:
self.model['E0'] = self._data['X']
self.model['F0'] = self._data['Y']
self.pre_validation(**options.pre_validation_options())
self.make_model(self.model['E0'], self.model['F0'],
**options.make_model_options())
# variance captured
var_x, exp_var_x = variances(self.model['E0'], self.model['T'], self.model['P'])
self.model['var_x'] = var_x
self.model['exp_var_x'] = exp_var_x
var_y, exp_var_y = variances(self.model['F0'], self.model['T'], self.model['Q'])
self.model['var_y'] = var_y
self.model['exp_var_y'] = exp_var_y
if options['calc_conf']:
self.confidence(**options.confidence_options())
out = [self.as_dataset(p) for p in options['out_data']]
for plt in self.get_out_plots(options):
out.append(plt)
return out
2007-01-04 14:53:47 +01:00
def run(self, a, b):
"""Run Pls with option gui.
"""
dialog = PlsOptionsDialog([a, b], self._options)
dialog.show_all()
response = dialog.run()
dialog.hide()
if response == gtk.RESPONSE_OK:
# set output data and plots
dialog.set_output()
#run with current data and options
return self.run_o(a, b)
2006-12-18 12:59:12 +01:00
class Packer:
"""A compression object used to speed up model calculations.
Often used in conjunction with crossvalidation and perturbations
analysis.
"""
def __init__(self,array):
self._shape = array.shape
self._array = array
self._packed_data = None
def expand(self,a):
if self._inflater!=None:
return dot(self._inflater,a)
def collapse(self,axis=None,mode='svd'):
if not axis:
axis = argmin(self._array.shape) # default is the smallest dim
if axis == 1:
self._array = self._array.T
u,s,vt = svd(self._array,full_matrices=0)
self._inflater = vt.T
self._packed_data = u*s
return self._packed_data
def get_packed_data(self):
return self._packed_data
class PcaOptions(Options):
"""Options for Principal Component Analysis.
"""
def __init__(self):
Options.__init__(self)
self._set_default()
def _set_default(self):
opt = {}
opt['algo'] = 'pca'
opt['engine'] = engines.pca
opt['mode'] = 'normal' # how much info to calculate
2007-01-25 12:58:10 +01:00
opt['amax'] = 10
opt['aopt'] = 100
opt['auto_aopt'] = False
2006-12-18 12:59:12 +01:00
opt['center'] = True
opt['center_mth'] = mat_center
opt['scale'] = 'scores'
2007-01-25 12:58:10 +01:00
opt['calc_conf'] = False
opt['n_sets'] = 5
2006-12-18 12:59:12 +01:00
opt['strict'] = True
opt['p_center'] = 'med'
opt['alpha'] = .8
opt['cov_center'] = 'med'
opt['crot'] = True
2007-01-25 12:58:10 +01:00
opt['calc_cv'] = False
opt['calc_pert'] = True
opt['pert_val_method'] = 'random_diag'
opt['cv_val_method'] = 'random'
opt['cv_val_sets'] = 10
opt['pert_val_sets'] = 10
2006-12-18 12:59:12 +01:00
2007-01-04 14:53:47 +01:00
opt['all_data'] = [('T', 'scores', True),
('P', 'loadings', True),
('E','residuals', False),
('p_tsq', 't2', False),
2007-01-25 12:58:10 +01:00
('rmsep', 'RMSEP', False)
2007-01-04 14:53:47 +01:00
]
opt['all_plots'] = [(blmplots.PcaScorePlot, 'Scores', True),
(blmplots.PcaLoadingPlot, 'Loadings', True),
2007-01-25 12:58:10 +01:00
(blmplots.LineViewXc, 'Line view', True),
(blmplots.PredictionErrorPlot, 'Residual Error', True)
2007-01-04 14:53:47 +01:00
]
2006-12-18 12:59:12 +01:00
opt['out_data'] = ['T','P', 'p_tsq']
opt['out_plots'] = [blmplots.PcaScorePlot,blmplots.PcaLoadingPlot,blmplots.LineViewXc]
self.update(opt)
def make_model_options(self):
"""Options for make_model method."""
opt_list = ['scale','mode', 'amax']
return self._copy_from_list(opt_list)
def confidence_options(self):
"""Options for confidence method."""
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
'strict', 'crot', 'cov_center']
return self._copy_from_list(opt_list)
2007-01-25 12:58:10 +01:00
def validation_options(self):
2006-12-18 12:59:12 +01:00
"""Options for pre_validation method."""
2007-01-25 12:58:10 +01:00
opt_list = ['amax', 'cv_val_sets', 'pert_val_sets',
'cv_val_method', 'pert_val_method']
2006-12-18 12:59:12 +01:00
return self._copy_from_list(opt_list)
class PlsOptions(Options):
"""Options for Partial Least Squares Regression.
"""
def __init__(self):
Options.__init__(self)
self._set_default()
def _set_default(self):
opt = {}
opt['algo'] = 'pls'
opt['engine'] = engines.pls
opt['mode'] = 'normal' # how much info to calculate
opt['lod'] = 'compact' # how much info to store
opt['amax'] = 3
opt['aopt'] = 3
opt['center'] = True
opt['center_mth'] = mat_center
opt['scale'] = 'scores'
2007-01-25 12:58:10 +01:00
opt['calc_conf'] = False
2006-12-18 12:59:12 +01:00
opt['n_sets'] = 10
opt['strict'] = True
opt['p_center'] = 'med'
opt['alpha'] = .2
opt['cov_center'] = 'med'
opt['crot'] = True
2007-01-25 12:58:10 +01:00
opt['calc_cv'] = True
opt['calc_pert'] = False
2006-12-18 12:59:12 +01:00
opt['val_engine'] = w_pls_cv_val
2007-01-04 14:53:47 +01:00
opt['all_data'] = [('T', 'scores', True),
('P', 'loadings', True),
('E','residuals', False),
('p_tsq', 't2', False),
2007-01-25 12:58:10 +01:00
('rmsep', 'RMSEP', False)
2007-01-04 14:53:47 +01:00
]
2006-12-18 12:59:12 +01:00
2007-01-04 14:53:47 +01:00
opt['all_plots'] = [(blmplots.PlsScorePlot, 'Scores', True),
(blmplots.PlsLoadingPlot, 'Loadings', True),
(blmplots.LineViewXc, 'Line view', True)]
2006-12-18 12:59:12 +01:00
opt['out_plots'] = [blmplots.PlsScorePlot,
2007-01-25 12:58:10 +01:00
blmplots.PlsLoadingPlot,
2006-12-18 12:59:12 +01:00
blmplots.LineViewXc]
2007-01-04 14:53:47 +01:00
opt['out_data'] = None
2006-12-18 12:59:12 +01:00
opt['pack'] = False
opt['calc_qvals'] = False
opt['q_pert_mth'] = 'shuffle_vars'
opt['q_iter'] = 20
2007-01-04 14:53:47 +01:00
2006-12-18 12:59:12 +01:00
self.update(opt)
def make_model_options(self):
"""Options for make_model method."""
opt_list = ['scale','mode', 'amax', 'engine']
return self._copy_from_list(opt_list)
def confidence_options(self):
"""Options for confidence method."""
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
2007-01-25 12:58:10 +01:00
'strict', 'crot', 'cov_center']
2006-12-18 12:59:12 +01:00
return self._copy_from_list(opt_list)
def pre_validation_options(self):
"""Options for pre_validation method."""
opt_list = ['amax', 'n_sets', 'val_engine']
return self._copy_from_list(opt_list)
2007-01-04 14:53:47 +01:00
class PcaOptionsDialog(OptionsDialog):
"""Options dialog for Principal Component Analysis.
"""
def __init__(self, data, options, input_names=['X']):
OptionsDialog.__init__(self, data, options, input_names)
glade_file = os.path.join(fluents.DATADIR, 'pca_options.glade')
2007-01-25 12:58:10 +01:00
notebook_name = "vbox1"
page_name = "Options"
self.add_page_from_glade(glade_file, notebook_name, page_name)
# connect signals to handlers
dic = {"on_amax_value_changed" : self.on_amax_changed,
"on_aopt_value_changed" : self.on_aopt_changed,
"auto_aopt_toggled" : self.auto_aopt_toggled,
"center_toggled" : self.center_toggled,
"on_scale_changed" : self.on_scale_changed,
"on_val_none" : self.val_toggled,
"on_val_cv" : self.cv_toggled,
"on_val_pert" : self.pert_toggled,
"on_cv_method_changed" : self.on_cv_method_changed,
"on_cv_sets_changed" : self.on_cv_sets_changed,
"on_pert_sets_changed" : self.on_pert_sets_changed,
"on_conf_toggled" : self.on_conf_toggled
}
self.wTree.signal_autoconnect(dic)
# set/ensure valid default values/ranges
amax_sb = self.wTree.get_widget("amax_spinbutton")
max_comp = min(data[0].shape) # max num of components
if self._options['amax']>max_comp:
logger.log('debug', 'amax default too large ... adjusting')
self._options['amax'] = max_comp
amax_sb.get_adjustment().set_all(self._options['amax'], 1, max_comp, 1, 0, 0)
# aopt spin button
aopt_sb = self.wTree.get_widget("aopt_spinbutton")
if self._options['aopt']>self._options['amax']:
self._options['aopt'] = self._options['amax'] + 1 - 1
aopt_sb.get_adjustment().set_all(self._options['aopt'], 1, self._options['amax'], 1, 0, 0)
# scale
scale_cb = self.wTree.get_widget("scale_combobox")
scale_cb.set_active(0)
# validation frames
if self._options['calc_cv']==False:
cv_frame = self.wTree.get_widget("cv_frame")
cv_frame.set_sensitive(False)
if self._options['calc_pert']==False:
pert_frame = self.wTree.get_widget("pert_frame")
pert_frame.set_sensitive(False)
cv = self.wTree.get_widget("cv_method").set_active(0)
pm = self.wTree.get_widget("pert_method").set_active(0)
# confidence
if self._options['calc_conf']==True:
self.wTree.get_widget("subset_frame").set_sensitive(True)
else:
self.wTree.get_widget("subset_frame").set_sensitive(False)
def on_amax_changed(self, sb):
logger.log("debug", "amax changed: new value: %s" %sb.get_value_as_int())
amax = sb.get_value_as_int()
# update aopt if needed
if amax<self._options['aopt']:
self._options['aopt'] = amax
aopt_sb = self.wTree.get_widget("aopt_spinbutton")
aopt_sb.get_adjustment().set_all(self._options['aopt'], 1, amax, 1, 0, 0)
self._options['amax'] = sb.get_value_as_int()
def on_aopt_changed(self, sb):
aopt = sb.get_value_as_int()
self._options['aopt'] = aopt
def auto_aopt_toggled(self, tb):
aopt_sb = self.wTree.get_widget("aopt_spinbutton")
if tb.get_active():
self._options['auto_aopt'] = True
aopt_sb.set_sensitive(False)
else:
self._options['auto_aopt'] = False
aopt_sb.set_sensitive(True)
def center_toggled(self, tb):
if tb.get_active():
self._options['center'] = True
else:
logger.log("debug", "centering set to False")
self._options['center'] = False
def on_scale_changed(self, cb):
scale = cb.get_active_text()
if scale=='Scores':
self._options['scale'] = 'scores'
elif scale=='Loadings':
self._options['scale'] = 'loads'
else:
raise IOError
def val_toggled(self, tb):
"""Callback for validation: None. """
cv_frame = self.wTree.get_widget("cv_frame")
pert_frame = self.wTree.get_widget("pert_frame")
cv_tb = self.wTree.get_widget("cv_toggle")
p_tb = self.wTree.get_widget("pert_toggle")
if tb.get_active():
self._options['calc_cv'] = False
self._options['calc_pert'] = False
cv_frame.set_sensitive(False)
pert_frame.set_sensitive(False)
cv_tb.set_sensitive(False)
p_tb.set_sensitive(False)
else:
p_tb.set_sensitive(True)
cv_tb.set_sensitive(True)
if p_tb.get_active():
pert_frame.set_sensitive(True)
self._options['calc_pert'] = True
if cv_tb.get_active():
cv_frame.set_sensitive(True)
self._options['calc_cv'] = True
def cv_toggled(self, tb):
cv_frame = self.wTree.get_widget("cv_frame")
if tb.get_active():
cv_frame.set_sensitive(True)
self._options['calc_cv'] = True
else:
cv_frame.set_sensitive(False)
self._options['calc_cv'] = False
def pert_toggled(self, tb):
pert_frame = self.wTree.get_widget("pert_frame")
if tb.get_active():
pert_frame.set_sensitive(True)
self._options['calc_pert'] = True
else:
pert_frame.set_sensitive(False)
self._options['calc_pert'] = False
def on_cv_method_changed(self, cb):
method = cb.get_active_text()
if method == 'Random':
self._options['cv_val_method'] = 'random'
def on_pert_method_changed(self, cb):
method = cb.get_active_text()
if method == 'Random diags':
self._options['pert_val_method'] = 'random_diag'
def on_cv_sets_changed(self, sb):
val = sb.get_value_as_int()
self._options['cv_val_sets'] = val
def on_pert_sets_changed(self, sb):
val = sb.get_value_as_int()
self._options['pert_val_sets'] = val
def on_conf_toggled(self, tb):
if tb.get_active():
self._options['calc_conf'] = False
self.wTree.get_widget("subset_frame").set_sensitive(False)
else:
self._options['calc_conf'] = True
self.wTree.get_widget("subset_frame").set_sensitive(True)
2007-01-04 14:53:47 +01:00
class PlsOptionsDialog(OptionsDialog):
"""Options dialog for Partial Least Squares Regression.
"""
2007-01-25 12:58:10 +01:00
2007-01-04 14:53:47 +01:00
def __init__(self, data, options, input_names=['X', 'Y']):
OptionsDialog.__init__(self, data, options, input_names)