This repository has been archived on 2024-07-04. You can view files and clone it, but cannot push or open issues or pull requests.
laydi/fluents/lib/blmfuncs.py

674 lines
23 KiB
Python

"""This module contains bilinear models(Functions)
"""
import pygtk
import gtk
import gtk.glade
from fluents.workflow import Function, OptionsDialog, Options
from fluents.dataset import Dataset
from fluents import plots, dataset, workflow, logger
import scipy
from engines import *
from cx_stats import leverage, variances, hotelling
from cx_utils import mat_center
from validation import *
import blmplots
import engines
import copy
class Model(Function):
"""Base class of bilinear models.
"""
def __init__(self,id='johndoe',name='JohnDoe'):
Function.__init__(self,id,name)
self.name = name
self._options = None
self._data = {}
self._dataset = {}
self._packers = {}
self.model = {}
def clear(self):
""" Clears model paramters
"""
self.model = {}
self._data = {}
self._packers = {}
class PCA(Model):
def __init__(self,id='pca',name='PCA'):
Model.__init__(self,id,name)
self._options = PcaOptions()
def validation(self, amax, cv_val_sets, pert_val_sets, cv_val_method, pert_val_method):
"""Model validation and estimate of optimal numer of components.
"""
if self._options['calc_cv']:
if cv_val_method == 'random':
sep, aopt = pca_cv_val(self.model['E0'], amax, cv_val_sets)
self.model['sep'] = sep
if self._options['calc_pert']:
if pert_val_method == 'random_diag':
sep, aopt = pca_alter_val(self.model['E0'], amax, pert_val_sets)
self.model['sep'] = sep
if self._options['calc_cv']==False and self._options['calc_pert']==False:
self.model['sep'] = None
aopt = self._options['amax']
if self._options['auto_aopt']:
logger.log("notice", "Auto aopt: " + str(aopt))
self._options['aopt'] = aopt
if aopt==1:
logger.log('notice', 'Aopt at first component!')
def confidence(self, aopt, n_sets, alpha, p_center,
crot, strict, cov_center ):
"""Returns a confidence measure for model parameters.
Based on aopt.
"""
if aopt<2:
aopt = 2
logger.log('notice','Hotellings T2 needs more than 1 comp.\n switching to 2!!')
jk_segments = pca_jkP(self.model['E0'], aopt, n_sets)
Pcal = self.model['P'][:,:aopt]
tsq = hotelling(jk_segments, Pcal, p_center,
cov_center, alpha, crot, strict)
self.model['p_tsq'] = tsq
def make_model(self, amax, mode, scale):
"""Model on optimal number of components.
"""
dat = pca(self.model['E0'], amax, scale, mode)
# explained variance
var_x, exp_var_x = variances(self.model['E0'], dat['T'], dat['P'])
dat['var_x'] = var_x
dat['exp_var_x'] = exp_var_x
#fixme###
do_lev_s = False
do_lev_v = False
#####
if do_lev_s:
# sample leverages
tnorm = scipy.apply_along_axis(norm, 0, dat['T']) # norm of T-columns
s_lev = leverage(amax, tnorm)
dat['s_lev'] = s_lev
if do_lev_v:
# variable leverages
v_lev = leverage(amax, dat['P'])
dat['v_lev'] = v_lev
self.model.update(dat)
def as_dataset(self, param, dtype='dataset'):
"""Return model parameter as Dataset.
"""
if not param in self.model.keys():
return
DX = self._dataset['X'] #input dataset
dim_name_0, dim_name_1 = DX.get_dim_name()
# samples
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
# vars
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
# components (hidden)
pc_ids = ['_amax', map(str,range(self._options['amax'])) ]
pc_ids_opt = ['_aopt', map(str, range(self._options['aopt'])) ]
zero_dim = ['_doe', ['0']] # null dim, vector (hidden)
match_ids = {'E':[ids_0, ids_1],
'E0':[ids_0, ids_1],
'P':[ids_1, pc_ids],
'T':[ids_0, pc_ids],
'W':[ids_1, pc_ids],
'p_tsq':[ids_1, zero_dim],
'rmsep':[pc_ids, zero_dim],
'var_leverages':[ids_1, zero_dim],
'sample_leverages':[pc_ids, zero_dim],
'exp_var_x': [pc_ids, zero_dim],
'var_x': [pc_ids, zero_dim],
}
out = Dataset(self.model[param], match_ids[param], name=param)
return out
def get_out_plots(self, options):
out=[]
for plt in options['out_plots']:
#try:
out.append(plt(self))
#except:
# logger.log('debug', 'Plot: %s failed') %str(plt)
return out
def run_o(self, data):
"""Run pca with present options.
"""
self.clear()
options = self._options
for item in options.items():
print item
self._dataset['X'] = data
self._data['X'] = data.asarray().astype('<f8')
if options['center']:
center = options['center_mth']
self.model['E0'] = center(self._data['X'])
else:
self.model['E0'] = data.asarray()
self.validation(**options.validation_options())
self.model['aopt'] = self._options['aopt']
self.make_model(**options.make_model_options())
if options['calc_conf']:
self.confidence(**options.confidence_options())
out = [self.as_dataset(p) for p in options['out_data']]
for plt in self.get_out_plots(options):
out.append(plt)
return out
def run(self, data):
"""Run Pca with option gui.
"""
dialog = PcaOptionsDialog([data], self._options)
dialog.show_all()
response = dialog.run()
dialog.hide()
if response == gtk.RESPONSE_OK:
# set output data and plots
dialog.set_output()
#run with current data and options
return self.run_o(data)
class PLS(Model):
def __init__(self, id='pls', name='PLS'):
Model.__init__(self, id, name)
self._options = PlsOptions()
def pre_validation(self, amax, n_sets, val_engine):
"""Returns rmsec,rmsep for model.
"""
rmsep, aopt = val_engine(self.model['E0'], self.model['F0'],
amax, n_sets)
self.model['rmsep'] = rmsep.mean(0)
self.model['aopt'] = aopt
def confidence(self, aopt, n_sets, alpha, p_center,
crot, strict, cov_center ):
"""Returns a confidence measure for model parameters
Supported parameters: W
"""
aopt = self.model['aopt']
jk_segments = pls_jkW(self.model['E0'], self.model['F0'],
aopt, n_sets)
Wcal = self.model['W'][:,:aopt]
tsq = hotelling(jk_segments, Wcal, p_center,
alpha, crot, strict, cov_center)
self.model['w_tsq'] = tsq
def permutation_confidence(self, a, b, aopt, reg, n_iter, algo,
sim_method):
"""Estimates sign. vars by controlling fdr."""
qvals_sorted, qvals = pls_qvals(a, b, aopt=None,
alpha=.4, n_iter=20, algo='pls',
sim_method='shuffle', )
def make_model(self, a, b, amax, scale, mode, engine):
"""Make model on amax components.
"""
dat = engine(a, b, amax, scale, mode)
self.model.update(dat)
def as_dataset(self, name, dtype='Dataset'):
"""Return any model parameter as Dataset
No ids matching
"""
if name not in self.model.keys():
return
DX, DY = self._dataset['X'], self._dataset['Y']
dim_name_0, dim_name_1 = DX.get_dim_name()
dim_name_2, dim_name_3 = DY.get_dim_name()
#samples
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
# x vars
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
# y vars
ids_3 = [dim_name_3, DY.get_identifiers(dim_name_3, sorted=True)]
# components (hidden)
pc_ids = ['_comp', map(str, range(self.model['aopt']))]
zero_dim = ['_doe',['0']] # null dim, vector (hidden)
match_ids = {'E':[ids_0, ids_1],
'P':[ids_1, pc_ids],
'T':[ids_0, pc_ids],
'W': [ids_1, pc_ids],
'R': [ids_1, pc_ids],
'Q':[ids_3, pc_ids],
'F':[ids_0, ids_3],
'B':[ids_1, ids_3],
'qval':[ids_1, zero_dim],
'qval_sorted':[ids_1, zero_dim],
'w_tsq':[ids_1, zero_dim],
'rmsep':[pc_ids, zero_dim],
}
array = self.model[name]
M = Dataset(array,identifiers=match_ids[name],name=name)
return M
def get_out_plots(self, options):
out=[]
for plt in options['out_plots']:
#try:
out.append(plt(self))
#except:
# logger.log('debug', 'Plot: %s failed' %plt)
return out
def run_o(self, a, b):
options = self._options
self._dataset['X'] = a
self._dataset['Y'] = b
self._data['X'] = a.asarray()
self._data['Y'] = b.asarray()
if options['center']:
self.model['E0'] = options['center_mth'](self._data['X'])
self.model['F0'] = options['center_mth'](self._data['Y'])
else:
self.model['E0'] = self._data['X']
self.model['F0'] = self._data['Y']
self.pre_validation(**options.pre_validation_options())
self.make_model(self.model['E0'], self.model['F0'],
**options.make_model_options())
# variance captured
var_x, exp_var_x = variances(self.model['E0'], self.model['T'], self.model['P'])
self.model['var_x'] = var_x
self.model['exp_var_x'] = exp_var_x
var_y, exp_var_y = variances(self.model['F0'], self.model['T'], self.model['Q'])
self.model['var_y'] = var_y
self.model['exp_var_y'] = exp_var_y
if options['calc_conf']:
self.confidence(**options.confidence_options())
out = [self.as_dataset(p) for p in options['out_data']]
for plt in self.get_out_plots(options):
out.append(plt)
return out
def run(self, a, b):
"""Run Pls with option gui.
"""
dialog = PlsOptionsDialog([a, b], self._options)
dialog.show_all()
response = dialog.run()
dialog.hide()
if response == gtk.RESPONSE_OK:
# set output data and plots
dialog.set_output()
#run with current data and options
return self.run_o(a, b)
class Packer:
"""A compression object used to speed up model calculations.
Often used in conjunction with crossvalidation and perturbations
analysis.
"""
def __init__(self,array):
self._shape = array.shape
self._array = array
self._packed_data = None
def expand(self,a):
if self._inflater!=None:
return dot(self._inflater,a)
def collapse(self,axis=None,mode='svd'):
if not axis:
axis = argmin(self._array.shape) # default is the smallest dim
if axis == 1:
self._array = self._array.T
u,s,vt = svd(self._array,full_matrices=0)
self._inflater = vt.T
self._packed_data = u*s
return self._packed_data
def get_packed_data(self):
return self._packed_data
class PcaOptions(Options):
"""Options for Principal Component Analysis.
"""
def __init__(self):
Options.__init__(self)
self._set_default()
def _set_default(self):
opt = {}
opt['algo'] = 'pca'
opt['engine'] = engines.pca
opt['mode'] = 'normal' # how much info to calculate
opt['amax'] = 10
opt['aopt'] = 100
opt['auto_aopt'] = False
opt['center'] = True
opt['center_mth'] = mat_center
opt['scale'] = 'scores'
opt['calc_conf'] = False
opt['n_sets'] = 5
opt['strict'] = True
opt['p_center'] = 'med'
opt['alpha'] = .8
opt['cov_center'] = 'med'
opt['crot'] = True
opt['calc_cv'] = False
opt['calc_pert'] = True
opt['pert_val_method'] = 'random_diag'
opt['cv_val_method'] = 'random'
opt['cv_val_sets'] = 10
opt['pert_val_sets'] = 10
opt['all_data'] = [('T', 'scores', True),
('P', 'loadings', True),
('E','residuals', False),
('p_tsq', 't2', False),
('rmsep', 'RMSEP', False)
]
opt['all_plots'] = [(blmplots.PcaScorePlot, 'Scores', True),
(blmplots.PcaLoadingPlot, 'Loadings', True),
(blmplots.LineViewXc, 'Line view', True),
(blmplots.PredictionErrorPlot, 'Residual Error', True)
]
opt['out_data'] = ['T','P', 'p_tsq']
opt['out_plots'] = [blmplots.PcaScorePlot,blmplots.PcaLoadingPlot,blmplots.LineViewXc]
self.update(opt)
def make_model_options(self):
"""Options for make_model method."""
opt_list = ['scale','mode', 'amax']
return self._copy_from_list(opt_list)
def confidence_options(self):
"""Options for confidence method."""
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
'strict', 'crot', 'cov_center']
return self._copy_from_list(opt_list)
def validation_options(self):
"""Options for pre_validation method."""
opt_list = ['amax', 'cv_val_sets', 'pert_val_sets',
'cv_val_method', 'pert_val_method']
return self._copy_from_list(opt_list)
class PlsOptions(Options):
"""Options for Partial Least Squares Regression.
"""
def __init__(self):
Options.__init__(self)
self._set_default()
def _set_default(self):
opt = {}
opt['algo'] = 'pls'
opt['engine'] = engines.pls
opt['mode'] = 'normal' # how much info to calculate
opt['lod'] = 'compact' # how much info to store
opt['amax'] = 3
opt['aopt'] = 3
opt['center'] = True
opt['center_mth'] = mat_center
opt['scale'] = 'scores'
opt['calc_conf'] = False
opt['n_sets'] = 10
opt['strict'] = True
opt['p_center'] = 'med'
opt['alpha'] = .2
opt['cov_center'] = 'med'
opt['crot'] = True
opt['calc_cv'] = True
opt['calc_pert'] = False
opt['val_engine'] = w_pls_cv_val
opt['all_data'] = [('T', 'scores', True),
('P', 'loadings', True),
('E','residuals', False),
('p_tsq', 't2', False),
('rmsep', 'RMSEP', False)
]
opt['all_plots'] = [(blmplots.PlsScorePlot, 'Scores', True),
(blmplots.PlsLoadingPlot, 'Loadings', True),
(blmplots.LineViewXc, 'Line view', True)]
opt['out_plots'] = [blmplots.PlsScorePlot,
blmplots.PlsLoadingPlot,
blmplots.LineViewXc]
opt['out_data'] = None
opt['pack'] = False
opt['calc_qvals'] = False
opt['q_pert_mth'] = 'shuffle_vars'
opt['q_iter'] = 20
self.update(opt)
def make_model_options(self):
"""Options for make_model method."""
opt_list = ['scale','mode', 'amax', 'engine']
return self._copy_from_list(opt_list)
def confidence_options(self):
"""Options for confidence method."""
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
'strict', 'crot', 'cov_center']
return self._copy_from_list(opt_list)
def pre_validation_options(self):
"""Options for pre_validation method."""
opt_list = ['amax', 'n_sets', 'val_engine']
return self._copy_from_list(opt_list)
class PcaOptionsDialog(OptionsDialog):
"""Options dialog for Principal Component Analysis.
"""
def __init__(self, data, options, input_names=['X']):
OptionsDialog.__init__(self, data, options, input_names)
glade_file = os.path.join(fluents.DATADIR, 'pca_options.glade')
notebook_name = "vbox1"
page_name = "Options"
self.add_page_from_glade(glade_file, notebook_name, page_name)
# connect signals to handlers
dic = {"on_amax_value_changed" : self.on_amax_changed,
"on_aopt_value_changed" : self.on_aopt_changed,
"auto_aopt_toggled" : self.auto_aopt_toggled,
"center_toggled" : self.center_toggled,
"on_scale_changed" : self.on_scale_changed,
"on_val_none" : self.val_toggled,
"on_val_cv" : self.cv_toggled,
"on_val_pert" : self.pert_toggled,
"on_cv_method_changed" : self.on_cv_method_changed,
"on_cv_sets_changed" : self.on_cv_sets_changed,
"on_pert_sets_changed" : self.on_pert_sets_changed,
"on_conf_toggled" : self.on_conf_toggled
}
self.wTree.signal_autoconnect(dic)
# set/ensure valid default values/ranges
amax_sb = self.wTree.get_widget("amax_spinbutton")
max_comp = min(data[0].shape) # max num of components
if self._options['amax']>max_comp:
logger.log('debug', 'amax default too large ... adjusting')
self._options['amax'] = max_comp
amax_sb.get_adjustment().set_all(self._options['amax'], 1, max_comp, 1, 0, 0)
# aopt spin button
aopt_sb = self.wTree.get_widget("aopt_spinbutton")
if self._options['aopt']>self._options['amax']:
self._options['aopt'] = self._options['amax'] + 1 - 1
aopt_sb.get_adjustment().set_all(self._options['aopt'], 1, self._options['amax'], 1, 0, 0)
# scale
scale_cb = self.wTree.get_widget("scale_combobox")
scale_cb.set_active(0)
# validation frames
if self._options['calc_cv']==False:
cv_frame = self.wTree.get_widget("cv_frame")
cv_frame.set_sensitive(False)
if self._options['calc_pert']==False:
pert_frame = self.wTree.get_widget("pert_frame")
pert_frame.set_sensitive(False)
cv = self.wTree.get_widget("cv_method").set_active(0)
pm = self.wTree.get_widget("pert_method").set_active(0)
# confidence
if self._options['calc_conf']==True:
self.wTree.get_widget("subset_frame").set_sensitive(True)
else:
self.wTree.get_widget("subset_frame").set_sensitive(False)
def on_amax_changed(self, sb):
logger.log("debug", "amax changed: new value: %s" %sb.get_value_as_int())
amax = sb.get_value_as_int()
# update aopt if needed
if amax<self._options['aopt']:
self._options['aopt'] = amax
aopt_sb = self.wTree.get_widget("aopt_spinbutton")
aopt_sb.get_adjustment().set_all(self._options['aopt'], 1, amax, 1, 0, 0)
self._options['amax'] = sb.get_value_as_int()
def on_aopt_changed(self, sb):
aopt = sb.get_value_as_int()
self._options['aopt'] = aopt
def auto_aopt_toggled(self, tb):
aopt_sb = self.wTree.get_widget("aopt_spinbutton")
if tb.get_active():
self._options['auto_aopt'] = True
aopt_sb.set_sensitive(False)
else:
self._options['auto_aopt'] = False
aopt_sb.set_sensitive(True)
def center_toggled(self, tb):
if tb.get_active():
self._options['center'] = True
else:
logger.log("debug", "centering set to False")
self._options['center'] = False
def on_scale_changed(self, cb):
scale = cb.get_active_text()
if scale=='Scores':
self._options['scale'] = 'scores'
elif scale=='Loadings':
self._options['scale'] = 'loads'
else:
raise IOError
def val_toggled(self, tb):
"""Callback for validation: None. """
cv_frame = self.wTree.get_widget("cv_frame")
pert_frame = self.wTree.get_widget("pert_frame")
cv_tb = self.wTree.get_widget("cv_toggle")
p_tb = self.wTree.get_widget("pert_toggle")
if tb.get_active():
self._options['calc_cv'] = False
self._options['calc_pert'] = False
cv_frame.set_sensitive(False)
pert_frame.set_sensitive(False)
cv_tb.set_sensitive(False)
p_tb.set_sensitive(False)
else:
p_tb.set_sensitive(True)
cv_tb.set_sensitive(True)
if p_tb.get_active():
pert_frame.set_sensitive(True)
self._options['calc_pert'] = True
if cv_tb.get_active():
cv_frame.set_sensitive(True)
self._options['calc_cv'] = True
def cv_toggled(self, tb):
cv_frame = self.wTree.get_widget("cv_frame")
if tb.get_active():
cv_frame.set_sensitive(True)
self._options['calc_cv'] = True
else:
cv_frame.set_sensitive(False)
self._options['calc_cv'] = False
def pert_toggled(self, tb):
pert_frame = self.wTree.get_widget("pert_frame")
if tb.get_active():
pert_frame.set_sensitive(True)
self._options['calc_pert'] = True
else:
pert_frame.set_sensitive(False)
self._options['calc_pert'] = False
def on_cv_method_changed(self, cb):
method = cb.get_active_text()
if method == 'Random':
self._options['cv_val_method'] = 'random'
def on_pert_method_changed(self, cb):
method = cb.get_active_text()
if method == 'Random diags':
self._options['pert_val_method'] = 'random_diag'
def on_cv_sets_changed(self, sb):
val = sb.get_value_as_int()
self._options['cv_val_sets'] = val
def on_pert_sets_changed(self, sb):
val = sb.get_value_as_int()
self._options['pert_val_sets'] = val
def on_conf_toggled(self, tb):
if tb.get_active():
self._options['calc_conf'] = False
self.wTree.get_widget("subset_frame").set_sensitive(False)
else:
self._options['calc_conf'] = True
self.wTree.get_widget("subset_frame").set_sensitive(True)
class PlsOptionsDialog(OptionsDialog):
"""Options dialog for Partial Least Squares Regression.
"""
def __init__(self, data, options, input_names=['X', 'Y']):
OptionsDialog.__init__(self, data, options, input_names)