This repository has been archived on 2024-07-04. You can view files and clone it, but cannot push or open issues or pull requests.
laydi/fluents/lib/blmfuncs.py

477 lines
16 KiB
Python

"""This module contains bilinear models(Functions)
"""
import gtk
from fluents.workflow import Function, OptionsDialog, Options
from fluents.dataset import Dataset
from fluents import plots, dataset, workflow, logger
import scipy
from engines import *
from cx_stats import leverage, variances, hotelling
from cx_utils import mat_center
from validation import *
import blmplots
import engines
class Model(Function):
"""Base class of bilinear models.
"""
def __init__(self,id='johndoe',name='JohnDoe'):
Function.__init__(self,id,name)
self.name = name
self._options = None
self._data = {}
self._dataset = {}
self._packers = {}
self.model = {}
def clear(self):
""" Clears model paramters
"""
self.model = {}
self._data = {}
self._packers = {}
class PCA(Model):
def __init__(self,id='pca',name='PCA'):
Model.__init__(self,id,name)
self._options = PcaOptions()
def pre_validation(self, amax, n_sets, val_engine):
"""Model calculations for maximum number of components.
"""
rmsep = val_engine(self.model['E0'], amax, n_sets)
self.model['rmsep'] = rmsep
self.model['aopt'] = rmsep.argmin()
def confidence(self, aopt, n_sets, alpha, p_center,
crot, strict, cov_center ):
"""Returns a confidence measure for model parameters.
Based on aopt.
"""
aopt = self.model['aopt']
jk_segments = pca_jkP(self.model['E0'], aopt, n_sets)
Pcal = self.model['P'][:,:aopt]
tsq = hotelling(jk_segments, Pcal, p_center,
cov_center, alpha, crot, strict)
self.model['p_tsq'] = tsq
def make_model(self, amax, mode, scale):
"""Model on optimal number of components.
"""
dat = pca(self.model['E0'], amax, scale, mode)
# explained variance
var_x, exp_var_x = variances(self.model['E0'], dat['T'], dat['P'])
dat['var_x'] = var_x
dat['exp_var_x'] = exp_var_x
#fixme###
do_lev_s = False
do_lev_v = False
#####
if do_lev_s:
# sample leverages
tnorm = scipy.apply_along_axis(norm, 0, dat['T']) # norm of T-columns
s_lev = leverage(amax, tnorm)
dat['s_lev'] = s_lev
if do_lev_v:
# variable leverages
v_lev = leverage(amax, dat['P'])
dat['v_lev'] = v_lev
self.model.update(dat)
def as_dataset(self, param, dtype='dataset'):
"""Return model parameter as Dataset.
"""
if not param in self.model.keys():
return
DX = self._dataset['X'] #input dataset
dim_name_0, dim_name_1 = DX.get_dim_name()
# samples
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
# vars
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
# components (hidden)
pc_ids = ['_comp_a', map(str,range(self.model['aopt'])) ]
pc_ids_opt = ['_comp_o', map(str, range(self.model['aopt'])) ]
zero_dim = ['_doe', ['0']] # null dim, vector (hidden)
match_ids = {'E':[ids_0, ids_1],
'E0':[ids_0, ids_1],
'P':[ids_1, pc_ids],
'T':[ids_0, pc_ids],
'W':[ids_1, pc_ids],
'p_tsq':[ids_1, zero_dim],
'rmsep':[pc_ids, zero_dim],
'var_leverages':[ids_1, zero_dim],
'sample_leverages':[pc_ids, zero_dim],
'exp_var_x': [pc_ids, zero_dim],
'var_x': [pc_ids, zero_dim],
}
out = Dataset(self.model[param], match_ids[param], name=param)
return out
def get_out_plots(self, options):
out=[]
for plt in options['out_plots']:
#try:
out.append(plt(self))
#except:
# print plt
#logger.log('debug', 'Plot: %s failed') %plt
return out
def run_o(self, data):
"""Run pca with present options.
"""
self.clear()
options = self._options
self._dataset['X'] = data
self._data['X'] = data.asarray().astype('<f8')
if options['center']:
center = options['center_mth']
self.model['E0'] = center(self._data['X'])
else:
self.model['E0'] = data.asarray()
self.pre_validation(**options.pre_validation_options())
self.make_model(**options.make_model_options())
if options['calc_conf']:
self.confidence(**options.confidence_options())
out = [self.as_dataset(p) for p in options['out_data']]
for plt in self.get_out_plots(options):
out.append(plt)
return out
def run(self, data):
"""Run Pca with option gui.
"""
dialog = PcaOptionsDialog([data], self._options)
dialog.show_all()
response = dialog.run()
dialog.hide()
if response == gtk.RESPONSE_OK:
# set output data and plots
dialog.set_output()
#run with current data and options
return self.run_o(data)
class PLS(Model):
def __init__(self, id='pls', name='PLS'):
Model.__init__(self, id, name)
self._options = PlsOptions()
def pre_validation(self, amax, n_sets, val_engine):
"""Returns rmsec,rmsep for model.
"""
rmsep = val_engine(self.model['E0'], self.model['F0'],
amax, n_sets)
self.model['rmsep'] = rmsep.mean(0)
self.model['aopt'] = rmsep.mean(0).argmin()
def confidence(self, aopt, n_sets, alpha, p_center,
crot, strict, cov_center ):
"""Returns a confidence measure for model parameters
Supported parameters: W
"""
aopt = self.model['aopt']
jk_segments = pls_jkW(self.model['E0'], self.model['F0'],
aopt, n_sets)
Wcal = self.model['W'][:,:aopt]
tsq = hotelling(jk_segments, Wcal, p_center,
alpha, crot, strict, cov_center)
self.model['w_tsq'] = tsq
def permutation_confidence(self, a, b, aopt, reg, n_iter, algo,
sim_method):
"""Estimates sign. vars by controlling fdr."""
qvals_sorted, qvals = pls_qvals(a, b, aopt=None,
alpha=.4, n_iter=20, algo='pls',
sim_method='shuffle', )
def make_model(self, a, b, amax, scale, mode, engine):
"""Make model on amax components.
"""
dat = engine(a, b, amax, scale, mode)
self.model.update(dat)
def as_dataset(self, name, dtype='Dataset'):
"""Return any model parameter as Dataset
No ids matching
"""
if name not in self.model.keys():
return
DX, DY = self._dataset['X'], self._dataset['Y']
dim_name_0, dim_name_1 = DX.get_dim_name()
dim_name_2, dim_name_3 = DY.get_dim_name()
#samples
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
# x vars
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
# y vars
ids_3 = [dim_name_3, DY.get_identifiers(dim_name_3, sorted=True)]
# components (hidden)
pc_ids = ['_comp', map(str, range(self.model['aopt']))]
zero_dim = ['_doe',['0']] # null dim, vector (hidden)
match_ids = {'E':[ids_0, ids_1],
'P':[ids_1, pc_ids],
'T':[ids_0, pc_ids],
'W': [ids_1, pc_ids],
'R': [ids_1, pc_ids],
'Q':[ids_3, pc_ids],
'F':[ids_0, ids_3],
'B':[ids_1, ids_3],
'qval':[ids_1, zero_dim],
'qval_sorted':[ids_1, zero_dim],
'w_tsq':[ids_1, zero_dim],
'rmsep':[pc_ids, zero_dim],
}
array = self.model[name]
M = Dataset(array,identifiers=match_ids[name],name=name)
return M
def get_out_plots(self, options):
out=[]
for plt in options['out_plots']:
#try:
out.append(plt(self))
#except:
# logger.log('debug', 'Plot: %s failed' %plt)
return out
def run_o(self, a, b):
options = self._options
self._dataset['X'] = a
self._dataset['Y'] = b
self._data['X'] = a.asarray()
self._data['Y'] = b.asarray()
if options['center']:
self.model['E0'] = options['center_mth'](self._data['X'])
self.model['F0'] = options['center_mth'](self._data['Y'])
else:
self.model['E0'] = self._data['X']
self.model['F0'] = self._data['Y']
self.pre_validation(**options.pre_validation_options())
self.make_model(self.model['E0'], self.model['F0'],
**options.make_model_options())
# variance captured
var_x, exp_var_x = variances(self.model['E0'], self.model['T'], self.model['P'])
self.model['var_x'] = var_x
self.model['exp_var_x'] = exp_var_x
var_y, exp_var_y = variances(self.model['F0'], self.model['T'], self.model['Q'])
self.model['var_y'] = var_y
self.model['exp_var_y'] = exp_var_y
if options['calc_conf']:
self.confidence(**options.confidence_options())
out = [self.as_dataset(p) for p in options['out_data']]
for plt in self.get_out_plots(options):
out.append(plt)
return out
def run(self, a, b):
"""Run Pls with option gui.
"""
dialog = PlsOptionsDialog([a, b], self._options)
dialog.show_all()
response = dialog.run()
dialog.hide()
if response == gtk.RESPONSE_OK:
# set output data and plots
dialog.set_output()
#run with current data and options
return self.run_o(a, b)
class Packer:
"""A compression object used to speed up model calculations.
Often used in conjunction with crossvalidation and perturbations
analysis.
"""
def __init__(self,array):
self._shape = array.shape
self._array = array
self._packed_data = None
def expand(self,a):
if self._inflater!=None:
return dot(self._inflater,a)
def collapse(self,axis=None,mode='svd'):
if not axis:
axis = argmin(self._array.shape) # default is the smallest dim
if axis == 1:
self._array = self._array.T
u,s,vt = svd(self._array,full_matrices=0)
self._inflater = vt.T
self._packed_data = u*s
return self._packed_data
def get_packed_data(self):
return self._packed_data
class PcaOptions(Options):
"""Options for Principal Component Analysis.
"""
def __init__(self):
Options.__init__(self)
self._set_default()
def _set_default(self):
opt = {}
opt['algo'] = 'pca'
opt['engine'] = engines.pca
opt['mode'] = 'normal' # how much info to calculate
opt['lod'] = 'compact' # how much info to store
opt['amax'] = 5
opt['aopt'] = 5
opt['center'] = True
opt['center_mth'] = mat_center
opt['scale'] = 'scores'
opt['calc_conf'] = True
opt['n_sets'] = 5
opt['strict'] = True
opt['p_center'] = 'med'
opt['alpha'] = .8
opt['cov_center'] = 'med'
opt['crot'] = True
opt['val_engine'] = pca_alter_val
opt['val_n_sets'] = 10
opt['all_data'] = [('T', 'scores', True),
('P', 'loadings', True),
('E','residuals', False),
('p_tsq', 't2', False),
('rmsep', 'root mean square error of prediction', False)
]
opt['all_plots'] = [(blmplots.PcaScorePlot, 'Scores', True),
(blmplots.PcaLoadingPlot, 'Loadings', True),
(blmplots.LineViewXc, 'Line view', True)
]
opt['out_data'] = ['T','P', 'p_tsq']
opt['out_plots'] = [blmplots.PcaScorePlot,blmplots.PcaLoadingPlot,blmplots.LineViewXc]
self.update(opt)
def make_model_options(self):
"""Options for make_model method."""
opt_list = ['scale','mode', 'amax']
return self._copy_from_list(opt_list)
def confidence_options(self):
"""Options for confidence method."""
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
'strict', 'crot', 'cov_center']
return self._copy_from_list(opt_list)
def pre_validation_options(self):
"""Options for pre_validation method."""
opt_list = ['amax', 'n_sets', 'val_engine']
return self._copy_from_list(opt_list)
class PlsOptions(Options):
"""Options for Partial Least Squares Regression.
"""
def __init__(self):
Options.__init__(self)
self._set_default()
def _set_default(self):
opt = {}
opt['algo'] = 'pls'
opt['engine'] = engines.pls
opt['mode'] = 'normal' # how much info to calculate
opt['lod'] = 'compact' # how much info to store
opt['amax'] = 3
opt['aopt'] = 3
opt['center'] = True
opt['center_mth'] = mat_center
opt['scale'] = 'scores'
opt['calc_conf'] = True
opt['n_sets'] = 10
opt['strict'] = True
opt['p_center'] = 'med'
opt['alpha'] = .2
opt['cov_center'] = 'med'
opt['crot'] = True
opt['val_engine'] = w_pls_cv_val
opt['all_data'] = [('T', 'scores', True),
('P', 'loadings', True),
('E','residuals', False),
('p_tsq', 't2', False),
('rmsep', 'root mean square error of prediction', False)
]
opt['all_plots'] = [(blmplots.PlsScorePlot, 'Scores', True),
(blmplots.PlsLoadingPlot, 'Loadings', True),
(blmplots.LineViewXc, 'Line view', True)]
opt['out_plots'] = [blmplots.PlsScorePlot,
blmplots.PlsLoadingPlot,
blmplots.LineViewXc]
opt['out_data'] = None
opt['pack'] = False
opt['calc_qvals'] = False
opt['q_pert_mth'] = 'shuffle_vars'
opt['q_iter'] = 20
self.update(opt)
def make_model_options(self):
"""Options for make_model method."""
opt_list = ['scale','mode', 'amax', 'engine']
return self._copy_from_list(opt_list)
def confidence_options(self):
"""Options for confidence method."""
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
'strict', 'crot', 'cov_center']
return self._copy_from_list(opt_list)
def pre_validation_options(self):
"""Options for pre_validation method."""
opt_list = ['amax', 'n_sets', 'val_engine']
return self._copy_from_list(opt_list)
class PcaOptionsDialog(OptionsDialog):
"""Options dialog for Principal Component Analysis.
"""
def __init__(self, data, options, input_names=['X']):
OptionsDialog.__init__(self, data, options, input_names)
class PlsOptionsDialog(OptionsDialog):
"""Options dialog for Partial Least Squares Regression.
"""
def __init__(self, data, options, input_names=['X', 'Y']):
OptionsDialog.__init__(self, data, options, input_names)