"""This module contains bilinear models (Functions)."""

import pygtk
import gtk
import gtk.glade
from fluents.workflow import Function, OptionsDialog, Options
from fluents.dataset import Dataset
from fluents import plots, dataset, workflow, logger
import scipy
from engines import *
from cx_stats import leverage, variances, hotelling
from cx_utils import mat_center
from validation import *
import blmplots
import engines
import copy


class Model(Function):
    """Base class of bilinear models.

    Holds the containers shared by the concrete models: ``_options``,
    ``_data``, ``_dataset``, ``_packers`` and the ``model`` dict in which
    estimated parameters are stored by name.
    """

    def __init__(self, id='johndoe', name='JohnDoe'):
        Function.__init__(self, id, name)
        self.name = name
        self._options = None
        self._data = {}
        self._dataset = {}
        self._packers = {}
        self.model = {}

    def clear(self):
        """Clear the model parameters, the raw data and the packers.

        ``_dataset`` and ``_options`` are left untouched.
        """
        self.model = {}
        self._data = {}
        self._packers = {}


class PCA(Model):
    """PCA model; its parameters are kept in ``self.model``."""

    def __init__(self, id='pca', name='PCA'):
        Model.__init__(self, id, name)
        self._options = PcaOptions()

    def validation(self, amax, cv_val_sets, pert_val_sets, cv_val_method,
                   pert_val_method):
        """Validate the model and estimate the optimal number of components.

        Cross validation runs when the ``calc_cv`` option is set and
        ``cv_val_method`` is ``'random'``; perturbation validation runs when
        ``calc_pert`` is set and ``pert_val_method`` is ``'random_diag'``.
        When both run, the perturbation result overwrites the cross
        validation result.  The error estimate is stored in
        ``self.model['sep']``.

        If no validation ran (both disabled, or an unrecognised method
        name), ``self.model['sep']`` is set to None and the optimal number
        of components falls back to the ``amax`` option.

        With the ``auto_aopt`` option set, the estimate is written to the
        ``aopt`` option.
        """
        options = self._options
        validated = False

        if options['calc_cv'] and cv_val_method == 'random':
            sep, aopt = pca_cv_val(self.model['E0'], amax, cv_val_sets)
            self.model['sep'] = sep
            validated = True

        if options['calc_pert'] and pert_val_method == 'random_diag':
            sep, aopt = pca_alter_val(self.model['E0'], amax, pert_val_sets)
            self.model['sep'] = sep
            validated = True

        if not validated:
            # Without this fallback ``aopt`` would be unbound below whenever
            # validation was requested with an unrecognised method name.
            self.model['sep'] = None
            aopt = options['amax']

        if options['auto_aopt']:
            logger.log("notice", "Auto aopt: " + str(aopt))
            options['aopt'] = aopt
            if aopt == 1:
                logger.log('notice', 'Aopt at first component!')

    def confidence(self, aopt, n_sets, alpha, p_center, crot, strict,
                   cov_center):
        """Compute a confidence measure for the loadings, based on aopt.

        The result of ``hotelling`` is stored in ``self.model['p_tsq']``;
        nothing is returned.  ``aopt`` values below 2 are raised to 2.
        """
        if aopt < 2:
            aopt = 2
            logger.log('notice', 'Hotellings T2 needs more than 1 comp.\n switching to 2!!')
        jk_segments = pca_jkP(self.model['E0'], aopt, n_sets)
        # Only the first aopt columns of P take part in the test.
        Pcal = self.model['P'][:, :aopt]
        tsq = hotelling(jk_segments, Pcal, p_center, cov_center, alpha,
                        crot, strict)
        self.model['p_tsq'] = tsq

    def make_model(self, amax, mode, scale):
        """Fit the model with ``amax`` components and store its parameters.

        The dict returned by ``pca`` is extended with ``var_x`` and
        ``exp_var_x`` and merged into ``self.model``.
        """
        dat = pca(self.model['E0'], amax, scale, mode)

        # explained variance
        var_x, exp_var_x = variances(self.model['E0'], dat['T'], dat['P'])
        dat['var_x'] = var_x
        dat['exp_var_x'] = exp_var_x

        # FIXME: leverage calculations are switched off by these flags.
        do_lev_s = False
        do_lev_v = False
        if do_lev_s:
            # sample leverages, from the norm of the T-columns
            tnorm = scipy.apply_along_axis(norm, 0, dat['T'])
            dat['s_lev'] = leverage(amax, tnorm)
        if do_lev_v:
            # variable leverages
            dat['v_lev'] = leverage(amax, dat['P'])

        self.model.update(dat)

    def as_dataset(self, param, dtype='dataset'):
        """Return the model parameter ``param`` as a Dataset.

        Returns None when ``param`` is not present in ``self.model``.
        ``dtype`` is accepted but not used.  A parameter that is in the
        model but has no entry in the identifier table raises KeyError.
        """
        if param not in self.model:
            return None

        DX = self._dataset['X']  # input dataset
        dim_name_0, dim_name_1 = DX.get_dim_name()
        # samples
        ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
        # vars
        ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
        # components (hidden)
        pc_ids = ['_amax', [str(i) for i in range(self._options['amax'])]]
        # null dim, vector (hidden)
        zero_dim = ['_doe', ['0']]

        # Identifiers along each axis, per parameter name.
        match_ids = {
            'E': [ids_0, ids_1],
            'E0': [ids_0, ids_1],
            'P': [ids_1, pc_ids],
            'T': [ids_0, pc_ids],
            'W': [ids_1, pc_ids],
            'p_tsq': [ids_1, zero_dim],
            'rmsep': [pc_ids, zero_dim],
            'var_leverages': [ids_1, zero_dim],
            'sample_leverages': [pc_ids, zero_dim],
            'exp_var_x': [pc_ids, zero_dim],
            'var_x': [pc_ids, zero_dim],
        }

        return Dataset(self.model[param], match_ids[param], name=param)

    def get_out_plots(self, options):
        """Instantiate every entry of ``options['out_plots']`` on this model.

        Each entry is called with the model as its only argument; the
        results are returned as a list in the same order.
        """
        return [make_plot(self) for make_plot in options['out_plots']]
def run_o(self, data): """Run pca with present options. """ self.clear() options = self._options for item in options.items(): print item self._dataset['X'] = data self._data['X'] = data.asarray().astype('max_comp: logger.log('debug', 'amax default too large ... adjusting') self._options['amax'] = max_comp amax_sb.get_adjustment().set_all(self._options['amax'], 1, max_comp, 1, 0, 0) # aopt spin button aopt_sb = self.wTree.get_widget("aopt_spinbutton") if self._options['aopt']>self._options['amax']: self._options['aopt'] = self._options['amax'] + 1 - 1 aopt_sb.get_adjustment().set_all(self._options['aopt'], 1, self._options['amax'], 1, 0, 0) # scale scale_cb = self.wTree.get_widget("scale_combobox") scale_cb.set_active(0) # validation frames if self._options['calc_cv']==False: cv_frame = self.wTree.get_widget("cv_frame") cv_frame.set_sensitive(False) if self._options['calc_pert']==False: pert_frame = self.wTree.get_widget("pert_frame") pert_frame.set_sensitive(False) cv = self.wTree.get_widget("cv_method").set_active(0) pm = self.wTree.get_widget("pert_method").set_active(0) # confidence if self._options['calc_conf']==True: self.wTree.get_widget("subset_frame").set_sensitive(True) else: self.wTree.get_widget("subset_frame").set_sensitive(False) def on_amax_changed(self, sb): logger.log("debug", "amax changed: new value: %s" %sb.get_value_as_int()) amax = sb.get_value_as_int() # update aopt if needed if amax