"""This module contains bilinear models(Functions) """ import os import pygtk import gtk import gtk.glade import fluents from fluents.workflow import Function, OptionsDialog, Options from fluents.dataset import Dataset from fluents import plots, dataset, workflow, logger import scipy from engines import * from cx_stats import leverage, variances, hotelling from cx_utils import mat_center from validation import * import blmplots import engines import copy class Model(Function): """Base class of bilinear models. """ def __init__(self,id='johndoe',name='JohnDoe'): Function.__init__(self,id,name) self.name = name self._options = None self._data = {} self._dataset = {} self._packers = {} self.model = {} def clear(self): """ Clears model paramters """ self.model = {} self._data = {} self._packers = {} class PCA(Model): def __init__(self,id='pca',name='PCA'): Model.__init__(self,id,name) self._options = PcaOptions() def validation(self, amax, cv_val_sets, pert_val_sets, cv_val_method, pert_val_method): """Model validation and estimate of optimal numer of components. """ if self._options['calc_cv']: if cv_val_method == 'random': sep, aopt = pca_cv_val(self.model['E0'], amax, cv_val_sets) self.model['sep'] = sep if self._options['calc_pert']: if pert_val_method == 'random_diag': sep, aopt = pca_alter_val(self.model['E0'], amax, pert_val_sets) self.model['sep'] = sep if self._options['calc_cv']==False and self._options['calc_pert']==False: self.model['sep'] = None aopt = self._options['amax'] if self._options['auto_aopt']: logger.log("notice", "Auto aopt: " + str(aopt)) self._options['aopt'] = aopt if aopt==1: logger.log('notice', 'Aopt at first component!') def confidence(self, aopt, n_sets, alpha, p_center, crot, strict, cov_center ): """Returns a confidence measure for model parameters. Based on aopt. """ if aopt<2: aopt = 2 logger.log('notice','Hotellings T2 needs more than 1 comp.\n switching to 2!!') jk_segments = pca_jkP(self.model['E0'], aopt, n_sets) Pcal = self.model['P'][:,:aopt] # add the scale to P tnorm = scipy.apply_along_axis(norm, 0, self.model['T'][:,:aopt]) Pcal = Pcal*tnorm tsq = hotelling(jk_segments, Pcal, p_center, cov_center, alpha, crot, strict) self.model['p_tsq'] = tsq def make_model(self, amax, mode, scale): """Model on optimal number of components. """ dat = pca(self.model['E0'], amax, scale, mode) # explained variance var_x, exp_var_x = variances(self.model['E0'], dat['T'], dat['P']) dat['var_x'] = var_x dat['exp_var_x'] = exp_var_x #fixme### do_lev_s = False do_lev_v = False ##### if do_lev_s: # sample leverages tnorm = scipy.apply_along_axis(norm, 0, dat['T']) # norm of Ts s_lev = leverage(amax, tnorm) dat['s_lev'] = s_lev if do_lev_v: # variable leverages v_lev = leverage(amax, dat['P']) dat['v_lev'] = v_lev self.model.update(dat) def as_dataset(self, param, dtype='dataset'): """Return model parameter as Dataset. """ if not param in self.model.keys(): return DX = self._dataset['X'] #input dataset dim_name_0, dim_name_1 = DX.get_dim_name() # samples ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)] # vars ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)] # components (hidden) pc_ids = ['_amax', map(str,range(self._options['amax'])) ] pc_ids_opt = ['_aopt', map(str, range(self._options['aopt'])) ] zero_dim = ['_doe', ['0']] # null dim, vector (hidden) match_ids = {'E':[ids_0, ids_1], 'E0':[ids_0, ids_1], 'P':[ids_1, pc_ids], 'T':[ids_0, pc_ids], 'W':[ids_1, pc_ids], 'p_tsq':[ids_1, zero_dim], 'rmsep':[pc_ids, zero_dim], 'var_leverages':[ids_1, zero_dim], 'sample_leverages':[pc_ids, zero_dim], 'exp_var_x': [pc_ids, zero_dim], 'var_x': [pc_ids, zero_dim], } out = Dataset(self.model[param], match_ids[param], name=param) return out def get_out_plots(self, options): out=[] for plt in options['out_plots']: #try: out.append(plt(self)) #except: # logger.log('debug', 'Plot: %s failed') %str(plt) return out def run_o(self, data): """Run pca with present options. """ self.clear() options = self._options for item in options.items(): print item self._dataset['X'] = data self._data['X'] = data.asarray().astype('n: val_engine = w_pls_cv_val else: val_engine = pls_val if self._options['calc_cv']==True: rmsep, aopt = val_engine(self.model['E0'], self.model['F0'], amax, n_sets) self.model['rmsep'] = rmsep[:,:-1] self.model['aopt'] = aopt else: self.model['rmsep'] = None self.model['aopt'] = self._options['aopt'] def confidence(self, aopt, n_sets, alpha, p_center, crot, strict, cov_center ): """Returns a confidence measure for model parameters Supported parameters: W """ aopt = self.model['aopt'] if self._options['calc_conf']: jk_segments = pls_jkW(self.model['E0'], self.model['F0'], aopt, n_sets) Wcal = self.model['W'][:,:aopt] # ensure that Wcal is scaled tnorm = scipy.apply_along_axis(norm, 0, self.model['T'][:,:aopt]) Wcal = Wcal*tnorm tsq = hotelling(jk_segments, Wcal, p_center, alpha, crot, strict, cov_center) self.model['w_tsq'] = tsq else: self.model['w_tsq'] = None def permutation_confidence(self, a, b, aopt, reg, n_iter, algo, sim_method): """Estimates cut off on significant vars by controlling fdr.""" if self._options['calc_qvals']==True: qvals_sorted, qvals = pls_qvals(a, b, aopt=None, alpha=reg, n_iter=n_iter, algo='pls', sim_method=sim_method) self.model['qval'] = qvals self.model['qval_sorted'] = qvals_sorted else: self.model['qval'] = None self.model['qval_sorted'] = None def make_model(self, a, b, amax, scale, mode, engine): """Make model on amax components. """ dat = engine(a, b, amax, scale, mode) self.model.update(dat) def as_dataset(self, name, dtype='Dataset'): """Return any model parameter as Dataset No ids matching """ if name not in self.model.keys(): return DX, DY = self._dataset['X'], self._dataset['Y'] dim_name_0, dim_name_1 = DX.get_dim_name() dim_name_2, dim_name_3 = DY.get_dim_name() #samples ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)] # x vars ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)] # y vars ids_3 = [dim_name_3, DY.get_identifiers(dim_name_3, sorted=True)] # components (hidden) pc_ids = ['_comp', map(str, range(self._options['amax']))] pc_ids_opt = ['_comp', map(str, range(self.model['aopt']))] zero_dim = ['_doe',['0']] # null dim, vector (hidden) match_ids = {'E':[ids_0, ids_1], 'P':[ids_1, pc_ids], 'T':[ids_0, pc_ids], 'W': [ids_1, pc_ids], 'R': [ids_1, pc_ids], 'Q':[ids_3, pc_ids], 'F':[ids_0, ids_3], 'B':[ids_1, ids_3], 'qval':[ids_1, zero_dim], 'qval_sorted':[ids_1, zero_dim], 'w_tsq':[ids_1, zero_dim], 'rmsep':[ids_3, pc_ids], } array = self.model[name] M = Dataset(array, identifiers=match_ids[name], name=name) return M def get_out_plots(self, options): out=[] for plt in options['out_plots']: #try: out.append(plt(self)) #except: # logger.log('debug', 'Plot: %s failed' %plt) return out def run_o(self, a, b): """Run PLS with present options.""" options = self._options self._dataset['X'] = a self._dataset['Y'] = b self._data['X'] = a.asarray() self._data['Y'] = b.asarray() if options['center']: self.model['E0'] = options['center_mth'](self._data['X']) self.model['F0'] = options['center_mth'](self._data['Y']) else: self.model['E0'] = self._data['X'] self.model['F0'] = self._data['Y'] self.validation(**options.validation_options()) self.make_model(self.model['E0'], self.model['F0'], **options.make_model_options()) # variance captured var_x, exp_var_x = variances(self.model['E0'], self.model['T'], self.model['P']) self.model['var_x'] = var_x self.model['exp_var_x'] = exp_var_x var_y, exp_var_y = variances(self.model['F0'], self.model['T'], self.model['Q']) self.model['var_y'] = var_y self.model['exp_var_y'] = exp_var_y if options['calc_conf']: self.confidence(**options.confidence_options()) out = [self.as_dataset(p) for p in options['out_data']] for plt in self.get_out_plots(options): out.append(plt) return out def run(self, a, b): """Run PLS with option gui. """ dialog = PlsOptionsDialog([a, b], self._options) dialog.show_all() response = dialog.run() dialog.hide() if response == gtk.RESPONSE_OK: # set output data and plots dialog.set_output() #run with current data and options for key, val in self._options.items(): print (key, val) return self.run_o(a, b) class Packer: """A compression object used to speed up model calculations. Often used in conjunction with crossvalidation and perturbations analysis. """ def __init__(self,array): self._shape = array.shape self._array = array self._packed_data = None def expand(self,a): if self._inflater!=None: return dot(self._inflater,a) def collapse(self,axis=None,mode='svd'): if not axis: axis = argmin(self._array.shape) # default is the smallest dim if axis == 1: self._array = self._array.T u, s, vt = svd(self._array,full_matrices=0) self._inflater = vt.T self._packed_data = u*s return self._packed_data def get_packed_data(self): return self._packed_data class PcaOptions(Options): """Options for Principal Component Analysis. """ def __init__(self): Options.__init__(self) self._set_default() def _set_default(self): opt = {} opt['algo'] = 'pca' opt['engine'] = engines.pca opt['mode'] = 'normal' # how much info to calculate opt['amax'] = 10 opt['aopt'] = 100 opt['auto_aopt'] = False opt['center'] = True opt['center_mth'] = mat_center opt['scale'] = 'scores' opt['calc_conf'] = False opt['n_sets'] = 5 opt['strict'] = True opt['p_center'] = 'med' opt['alpha'] = .8 opt['cov_center'] = 'med' opt['crot'] = True opt['calc_cv'] = False opt['calc_pert'] = True opt['pert_val_method'] = 'random_diag' opt['cv_val_method'] = 'random' opt['cv_val_sets'] = 10 opt['pert_val_sets'] = 10 opt['all_data'] = [('T', 'scores', True), ('P', 'loadings', True), ('E','residuals', False), ('p_tsq', 't2', False), ('rmsep', 'RMSEP', False) ] # (class, name, sensitive, ticked) opt['all_plots'] = [(blmplots.PcaScorePlot, 'Scores', True), (blmplots.PcaLoadingPlot, 'Loadings', True), (blmplots.LineViewXc, 'Line view', True), (blmplots.PredictionErrorPlot, 'Residual Error', False) ] opt['out_data'] = ['T','P', 'p_tsq'] opt['out_plots'] = [blmplots.PcaScorePlot,blmplots.PcaLoadingPlot,blmplots.LineViewXc] self.update(opt) def make_model_options(self): """Options for make_model method.""" opt_list = ['scale', 'mode', 'amax'] return self._copy_from_list(opt_list) def confidence_options(self): """Options for confidence method.""" opt_list = ['n_sets', 'aopt', 'alpha', 'p_center', 'strict', 'crot', 'cov_center'] return self._copy_from_list(opt_list) def validation_options(self): """Options for pre_validation method.""" opt_list = ['amax', 'cv_val_sets', 'pert_val_sets', 'cv_val_method', 'pert_val_method'] return self._copy_from_list(opt_list) class PlsOptions(Options): """Options for Partial Least Squares Regression. """ def __init__(self): Options.__init__(self) self._set_default() def _set_default(self): opt = {} opt['algo'] = 'pls' opt['engine'] = engines.pls opt['mode'] = 'normal' # how much info to calculate opt['amax'] = 10 opt['aopt'] = 10 opt['auto_aopt'] = False opt['center'] = True opt['center_mth'] = mat_center opt['scale'] = 'scores' opt['calc_conf'] = False opt['n_sets'] = 5 opt['strict'] = True opt['p_center'] = 'med' opt['alpha'] = .8 opt['cov_center'] = 'med' opt['crot'] = True opt['calc_cv'] = False opt['cv_val_method'] = 'random' opt['cv_val_sets'] = opt['n_sets'] opt['all_data'] = [('T', 'scores', True), ('P', 'loadings', True), ('E','residuals', False), ('p_tsq', 't2', False), ('rmsep', 'RMSEP', False) ] # (class, name, sensitive, ticked) opt['all_plots'] = [(blmplots.PlsScorePlot, 'Scores', True), (blmplots.PlsLoadingPlot, 'Loadings', True), (blmplots.LineViewXc, 'Line view', True), (blmplots.PredictionErrorPlot, 'Residual Error', False), (blmplots.RMSEPPlot, 'RMSEP', False) ] opt['out_data'] = ['T','P', 'p_tsq'] opt['out_plots'] = [blmplots.PlsScorePlot,blmplots.PlsLoadingPlot,blmplots.LineViewXc] opt['out_data'] = None opt['pack'] = False opt['calc_qvals'] = False opt['q_pert_method'] = 'shuffle_rows' opt['q_iter'] = 20 self.update(opt) def make_model_options(self): """Options for make_model method.""" opt_list = ['scale','mode', 'amax', 'engine'] return self._copy_from_list(opt_list) def confidence_options(self): """Options for confidence method.""" opt_list = ['n_sets', 'aopt', 'alpha', 'p_center', 'strict', 'crot', 'cov_center'] return self._copy_from_list(opt_list) def validation_options(self): """Options for pre_validation method.""" opt_list = ['amax', 'n_sets', 'cv_val_method'] return self._copy_from_list(opt_list) def permutation_confidence(self): opt_list = ['q_pert_method', 'q_iter'] return self._copy_from_list(opt_list) class PcaOptionsDialog(OptionsDialog): """Options dialog for Principal Component Analysis. """ def __init__(self, data, options, input_names=['X']): OptionsDialog.__init__(self, data, options, input_names) #glade_file = os.path.join(fluents.DATADIR, 'pca_options.glade') glade_file = os.path.join("/home/flatberg/fluents/fluents/", 'pca_options.glade') notebook_name = "vbox1" page_name = "Options" self.add_page_from_glade(glade_file, notebook_name, page_name) # connect signals to handlers dic = {"on_amax_value_changed" : self.on_amax_changed, "on_aopt_value_changed" : self.on_aopt_changed, "auto_aopt_toggled" : self.auto_aopt_toggled, "center_toggled" : self.center_toggled, #"on_scale_changed" : self.on_scale_changed, "on_val_none" : self.val_toggled, "on_val_cv" : self.cv_toggled, "on_val_pert" : self.pert_toggled, "on_cv_method_changed" : self.on_cv_method_changed, "on_cv_sets_changed" : self.on_cv_sets_changed, "on_pert_sets_changed" : self.on_pert_sets_changed, "on_conf_toggled" : self.on_conf_toggled, "on_subset_loc_changed" : self.on_subset_loc_changed, "on_cov_loc_changed" : self.on_cov_loc_changed, "on_alpha_changed" : self.on_alpha_changed, "on_rot_changed" : self.on_rot_changed } self.wTree.signal_autoconnect(dic) # set/ensure valid default values/ranges # amax_sb = self.wTree.get_widget("amax_spinbutton") max_comp = min(data[0].shape) # max num of components if self._options['amax']>max_comp: logger.log('debug', 'amax default too large ... adjusting') self._options['amax'] = max_comp amax_sb.get_adjustment().set_all(self._options['amax'], 1, max_comp, 1, 0, 0) # aopt spin button aopt_sb = self.wTree.get_widget("aopt_spinbutton") if self._options['aopt']>self._options['amax']: self._options['aopt'] = self._options['amax'] + 1 - 1 aopt_sb.get_adjustment().set_all(self._options['aopt'], 1, self._options['amax'], 1, 0, 0) # scale # scale_cb = self.wTree.get_widget("scale_combobox") # scale_cb.set_active(0) # validation frames if self._options['calc_cv']==False: cv_frame = self.wTree.get_widget("cv_frame") cv_frame.set_sensitive(False) if self._options['calc_pert']==False: pert_frame = self.wTree.get_widget("pert_frame") pert_frame.set_sensitive(False) cv = self.wTree.get_widget("cv_method").set_active(0) pm = self.wTree.get_widget("pert_method").set_active(0) # confidence if self._options['calc_conf']==True: self.wTree.get_widget("subset_expander").set_sensitive(True) else: self.wTree.get_widget("subset_expander").set_sensitive(False) cb = self.wTree.get_widget("subset_loc") _m = {'med': 0, 'mean': 1, 'full_model': 2} cb.set_active(_m.get(self._options['p_center'])) cb = self.wTree.get_widget("cov_loc") _m = {'med': 0, 'mean': 1} cb.set_active(_m.get(self._options['cov_center'])) hs = self.wTree.get_widget("alpha_scale") hs.set_value(self._options['alpha']) def on_amax_changed(self, sb): logger.log("debug", "amax changed: new value: %s" %sb.get_value_as_int()) amax = sb.get_value_as_int() # update aopt if needed if amaxmax_comp: logger.log('debug', 'amax default too large ... adjusting') self._options['amax'] = max_comp amax_sb.get_adjustment().set_all(self._options['amax'], 1, max_comp, 1, 0, 0) # aopt spin button aopt_sb = self.wTree.get_widget("aopt_spinbutton") if self._options['aopt']>self._options['amax']: self._options['aopt'] = self._options['amax'] + 1 - 1 aopt_sb.get_adjustment().set_all(self._options['aopt'], 1, self._options['amax'], 1, 0, 0) # scale # scale_cb = self.wTree.get_widget("scale_combobox") # scale_cb.set_active(0) # validation frames if self._options['calc_cv']==False: cv_frame = self.wTree.get_widget("cv_frame") cv_frame.set_sensitive(False) cv = self.wTree.get_widget("cv_method").set_active(0) # confidence if self._options['calc_conf']==True: self.wTree.get_widget("subset_expander").set_sensitive(True) else: self.wTree.get_widget("subset_expander").set_sensitive(False) cb = self.wTree.get_widget("subset_loc") _m = {'med': 0, 'mean': 1, 'full_model': 2} cb.set_active(_m.get(self._options['p_center'])) cb = self.wTree.get_widget("cov_loc") _m = {'med': 0, 'mean': 1} cb.set_active(_m.get(self._options['cov_center'])) hs = self.wTree.get_widget("alpha_scale") hs.set_value(self._options['alpha']) tb = self.wTree.get_widget("qvals") tb.set_sensitive(True) def on_amax_changed(self, sb): logger.log("debug", "amax changed: new value: %s" %sb.get_value_as_int()) amax = sb.get_value_as_int() # update aopt if needed if amax