477 lines
16 KiB
Python
477 lines
16 KiB
Python
"""This module contains bilinear models(Functions)
|
|
"""
|
|
|
|
import gtk
|
|
from fluents.workflow import Function, OptionsDialog, Options
|
|
from fluents.dataset import Dataset
|
|
from fluents import plots, dataset, workflow, logger
|
|
import scipy
|
|
from engines import *
|
|
from cx_stats import leverage, variances, hotelling
|
|
from cx_utils import mat_center
|
|
from validation import *
|
|
import blmplots
|
|
import engines
|
|
|
|
|
|
class Model(Function):
|
|
"""Base class of bilinear models.
|
|
"""
|
|
def __init__(self,id='johndoe',name='JohnDoe'):
|
|
Function.__init__(self,id,name)
|
|
self.name = name
|
|
self._options = None
|
|
self._data = {}
|
|
self._dataset = {}
|
|
self._packers = {}
|
|
self.model = {}
|
|
|
|
def clear(self):
|
|
""" Clears model paramters
|
|
"""
|
|
self.model = {}
|
|
self._data = {}
|
|
self._packers = {}
|
|
|
|
|
|
class PCA(Model):
|
|
def __init__(self,id='pca',name='PCA'):
|
|
Model.__init__(self,id,name)
|
|
self._options = PcaOptions()
|
|
|
|
def pre_validation(self, amax, n_sets, val_engine):
|
|
"""Model calculations for maximum number of components.
|
|
"""
|
|
rmsep = val_engine(self.model['E0'], amax, n_sets)
|
|
self.model['rmsep'] = rmsep
|
|
self.model['aopt'] = rmsep.argmin()
|
|
|
|
def confidence(self, aopt, n_sets, alpha, p_center,
|
|
crot, strict, cov_center ):
|
|
"""Returns a confidence measure for model parameters.
|
|
Based on aopt.
|
|
"""
|
|
aopt = self.model['aopt']
|
|
jk_segments = pca_jkP(self.model['E0'], aopt, n_sets)
|
|
Pcal = self.model['P'][:,:aopt]
|
|
tsq = hotelling(jk_segments, Pcal, p_center,
|
|
cov_center, alpha, crot, strict)
|
|
self.model['p_tsq'] = tsq
|
|
|
|
def make_model(self, amax, mode, scale):
|
|
"""Model on optimal number of components.
|
|
"""
|
|
dat = pca(self.model['E0'], amax, scale, mode)
|
|
|
|
# explained variance
|
|
var_x, exp_var_x = variances(self.model['E0'], dat['T'], dat['P'])
|
|
dat['var_x'] = var_x
|
|
dat['exp_var_x'] = exp_var_x
|
|
|
|
#fixme###
|
|
do_lev_s = False
|
|
do_lev_v = False
|
|
#####
|
|
if do_lev_s:
|
|
# sample leverages
|
|
tnorm = scipy.apply_along_axis(norm, 0, dat['T']) # norm of T-columns
|
|
s_lev = leverage(amax, tnorm)
|
|
dat['s_lev'] = s_lev
|
|
if do_lev_v:
|
|
# variable leverages
|
|
v_lev = leverage(amax, dat['P'])
|
|
dat['v_lev'] = v_lev
|
|
|
|
self.model.update(dat)
|
|
|
|
def as_dataset(self, param, dtype='dataset'):
|
|
"""Return model parameter as Dataset.
|
|
"""
|
|
if not param in self.model.keys():
|
|
return
|
|
DX = self._dataset['X'] #input dataset
|
|
dim_name_0, dim_name_1 = DX.get_dim_name()
|
|
# samples
|
|
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
|
|
# vars
|
|
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
|
|
# components (hidden)
|
|
pc_ids = ['_comp_a', map(str,range(self.model['aopt'])) ]
|
|
pc_ids_opt = ['_comp_o', map(str, range(self.model['aopt'])) ]
|
|
zero_dim = ['_doe', ['0']] # null dim, vector (hidden)
|
|
match_ids = {'E':[ids_0, ids_1],
|
|
'E0':[ids_0, ids_1],
|
|
'P':[ids_1, pc_ids],
|
|
'T':[ids_0, pc_ids],
|
|
'W':[ids_1, pc_ids],
|
|
'p_tsq':[ids_1, zero_dim],
|
|
'rmsep':[pc_ids, zero_dim],
|
|
'var_leverages':[ids_1, zero_dim],
|
|
'sample_leverages':[pc_ids, zero_dim],
|
|
'exp_var_x': [pc_ids, zero_dim],
|
|
'var_x': [pc_ids, zero_dim],
|
|
}
|
|
|
|
out = Dataset(self.model[param], match_ids[param], name=param)
|
|
return out
|
|
|
|
def get_out_plots(self, options):
|
|
out=[]
|
|
for plt in options['out_plots']:
|
|
#try:
|
|
out.append(plt(self))
|
|
#except:
|
|
# print plt
|
|
#logger.log('debug', 'Plot: %s failed') %plt
|
|
return out
|
|
|
|
def run_o(self, data):
|
|
"""Run pca with present options.
|
|
"""
|
|
self.clear()
|
|
options = self._options
|
|
self._dataset['X'] = data
|
|
self._data['X'] = data.asarray().astype('<f8')
|
|
if options['center']:
|
|
center = options['center_mth']
|
|
self.model['E0'] = center(self._data['X'])
|
|
else:
|
|
self.model['E0'] = data.asarray()
|
|
|
|
self.pre_validation(**options.pre_validation_options())
|
|
self.make_model(**options.make_model_options())
|
|
if options['calc_conf']:
|
|
self.confidence(**options.confidence_options())
|
|
|
|
out = [self.as_dataset(p) for p in options['out_data']]
|
|
for plt in self.get_out_plots(options):
|
|
out.append(plt)
|
|
return out
|
|
|
|
def run(self, data):
|
|
"""Run Pca with option gui.
|
|
"""
|
|
dialog = PcaOptionsDialog([data], self._options)
|
|
dialog.show_all()
|
|
response = dialog.run()
|
|
dialog.hide()
|
|
|
|
if response == gtk.RESPONSE_OK:
|
|
# set output data and plots
|
|
dialog.set_output()
|
|
|
|
#run with current data and options
|
|
return self.run_o(data)
|
|
|
|
|
|
class PLS(Model):
|
|
def __init__(self, id='pls', name='PLS'):
|
|
Model.__init__(self, id, name)
|
|
self._options = PlsOptions()
|
|
|
|
def pre_validation(self, amax, n_sets, val_engine):
|
|
"""Returns rmsec,rmsep for model.
|
|
"""
|
|
rmsep = val_engine(self.model['E0'], self.model['F0'],
|
|
amax, n_sets)
|
|
self.model['rmsep'] = rmsep.mean(0)
|
|
self.model['aopt'] = rmsep.mean(0).argmin()
|
|
|
|
def confidence(self, aopt, n_sets, alpha, p_center,
|
|
crot, strict, cov_center ):
|
|
"""Returns a confidence measure for model parameters
|
|
Supported parameters: W
|
|
"""
|
|
aopt = self.model['aopt']
|
|
jk_segments = pls_jkW(self.model['E0'], self.model['F0'],
|
|
aopt, n_sets)
|
|
Wcal = self.model['W'][:,:aopt]
|
|
tsq = hotelling(jk_segments, Wcal, p_center,
|
|
alpha, crot, strict, cov_center)
|
|
self.model['w_tsq'] = tsq
|
|
|
|
def permutation_confidence(self, a, b, aopt, reg, n_iter, algo,
|
|
sim_method):
|
|
"""Estimates sign. vars by controlling fdr."""
|
|
|
|
qvals_sorted, qvals = pls_qvals(a, b, aopt=None,
|
|
alpha=.4, n_iter=20, algo='pls',
|
|
sim_method='shuffle', )
|
|
|
|
|
|
def make_model(self, a, b, amax, scale, mode, engine):
|
|
"""Make model on amax components.
|
|
"""
|
|
dat = engine(a, b, amax, scale, mode)
|
|
self.model.update(dat)
|
|
|
|
def as_dataset(self, name, dtype='Dataset'):
|
|
"""Return any model parameter as Dataset
|
|
No ids matching
|
|
"""
|
|
if name not in self.model.keys():
|
|
return
|
|
DX, DY = self._dataset['X'], self._dataset['Y']
|
|
dim_name_0, dim_name_1 = DX.get_dim_name()
|
|
dim_name_2, dim_name_3 = DY.get_dim_name()
|
|
#samples
|
|
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
|
|
# x vars
|
|
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
|
|
# y vars
|
|
ids_3 = [dim_name_3, DY.get_identifiers(dim_name_3, sorted=True)]
|
|
# components (hidden)
|
|
pc_ids = ['_comp', map(str, range(self.model['aopt']))]
|
|
zero_dim = ['_doe',['0']] # null dim, vector (hidden)
|
|
|
|
match_ids = {'E':[ids_0, ids_1],
|
|
'P':[ids_1, pc_ids],
|
|
'T':[ids_0, pc_ids],
|
|
'W': [ids_1, pc_ids],
|
|
'R': [ids_1, pc_ids],
|
|
'Q':[ids_3, pc_ids],
|
|
'F':[ids_0, ids_3],
|
|
'B':[ids_1, ids_3],
|
|
'qval':[ids_1, zero_dim],
|
|
'qval_sorted':[ids_1, zero_dim],
|
|
'w_tsq':[ids_1, zero_dim],
|
|
'rmsep':[pc_ids, zero_dim],
|
|
}
|
|
|
|
array = self.model[name]
|
|
M = Dataset(array,identifiers=match_ids[name],name=name)
|
|
return M
|
|
|
|
def get_out_plots(self, options):
|
|
out=[]
|
|
for plt in options['out_plots']:
|
|
#try:
|
|
out.append(plt(self))
|
|
#except:
|
|
# logger.log('debug', 'Plot: %s failed' %plt)
|
|
return out
|
|
|
|
def run_o(self, a, b):
|
|
options = self._options
|
|
self._dataset['X'] = a
|
|
self._dataset['Y'] = b
|
|
self._data['X'] = a.asarray()
|
|
self._data['Y'] = b.asarray()
|
|
if options['center']:
|
|
self.model['E0'] = options['center_mth'](self._data['X'])
|
|
self.model['F0'] = options['center_mth'](self._data['Y'])
|
|
else:
|
|
self.model['E0'] = self._data['X']
|
|
self.model['F0'] = self._data['Y']
|
|
|
|
self.pre_validation(**options.pre_validation_options())
|
|
self.make_model(self.model['E0'], self.model['F0'],
|
|
**options.make_model_options())
|
|
# variance captured
|
|
var_x, exp_var_x = variances(self.model['E0'], self.model['T'], self.model['P'])
|
|
self.model['var_x'] = var_x
|
|
self.model['exp_var_x'] = exp_var_x
|
|
|
|
var_y, exp_var_y = variances(self.model['F0'], self.model['T'], self.model['Q'])
|
|
self.model['var_y'] = var_y
|
|
self.model['exp_var_y'] = exp_var_y
|
|
|
|
if options['calc_conf']:
|
|
self.confidence(**options.confidence_options())
|
|
|
|
out = [self.as_dataset(p) for p in options['out_data']]
|
|
for plt in self.get_out_plots(options):
|
|
out.append(plt)
|
|
return out
|
|
|
|
def run(self, a, b):
|
|
"""Run Pls with option gui.
|
|
"""
|
|
dialog = PlsOptionsDialog([a, b], self._options)
|
|
dialog.show_all()
|
|
response = dialog.run()
|
|
dialog.hide()
|
|
|
|
if response == gtk.RESPONSE_OK:
|
|
# set output data and plots
|
|
dialog.set_output()
|
|
|
|
#run with current data and options
|
|
return self.run_o(a, b)
|
|
|
|
class Packer:
|
|
"""A compression object used to speed up model calculations.
|
|
|
|
Often used in conjunction with crossvalidation and perturbations
|
|
analysis.
|
|
"""
|
|
def __init__(self,array):
|
|
self._shape = array.shape
|
|
self._array = array
|
|
self._packed_data = None
|
|
|
|
def expand(self,a):
|
|
if self._inflater!=None:
|
|
return dot(self._inflater,a)
|
|
|
|
def collapse(self,axis=None,mode='svd'):
|
|
if not axis:
|
|
axis = argmin(self._array.shape) # default is the smallest dim
|
|
|
|
if axis == 1:
|
|
self._array = self._array.T
|
|
u,s,vt = svd(self._array,full_matrices=0)
|
|
self._inflater = vt.T
|
|
self._packed_data = u*s
|
|
return self._packed_data
|
|
|
|
def get_packed_data(self):
|
|
return self._packed_data
|
|
|
|
|
|
class PcaOptions(Options):
|
|
"""Options for Principal Component Analysis.
|
|
"""
|
|
def __init__(self):
|
|
Options.__init__(self)
|
|
self._set_default()
|
|
|
|
def _set_default(self):
|
|
opt = {}
|
|
opt['algo'] = 'pca'
|
|
opt['engine'] = engines.pca
|
|
opt['mode'] = 'normal' # how much info to calculate
|
|
opt['lod'] = 'compact' # how much info to store
|
|
opt['amax'] = 5
|
|
opt['aopt'] = 5
|
|
opt['center'] = True
|
|
opt['center_mth'] = mat_center
|
|
opt['scale'] = 'scores'
|
|
opt['calc_conf'] = True
|
|
opt['n_sets'] = 5
|
|
|
|
opt['strict'] = True
|
|
opt['p_center'] = 'med'
|
|
opt['alpha'] = .8
|
|
opt['cov_center'] = 'med'
|
|
opt['crot'] = True
|
|
|
|
opt['val_engine'] = pca_alter_val
|
|
opt['val_n_sets'] = 10
|
|
|
|
opt['all_data'] = [('T', 'scores', True),
|
|
('P', 'loadings', True),
|
|
('E','residuals', False),
|
|
('p_tsq', 't2', False),
|
|
('rmsep', 'root mean square error of prediction', False)
|
|
]
|
|
|
|
opt['all_plots'] = [(blmplots.PcaScorePlot, 'Scores', True),
|
|
(blmplots.PcaLoadingPlot, 'Loadings', True),
|
|
(blmplots.LineViewXc, 'Line view', True)
|
|
]
|
|
|
|
opt['out_data'] = ['T','P', 'p_tsq']
|
|
opt['out_plots'] = [blmplots.PcaScorePlot,blmplots.PcaLoadingPlot,blmplots.LineViewXc]
|
|
|
|
self.update(opt)
|
|
|
|
def make_model_options(self):
|
|
"""Options for make_model method."""
|
|
opt_list = ['scale','mode', 'amax']
|
|
return self._copy_from_list(opt_list)
|
|
|
|
def confidence_options(self):
|
|
"""Options for confidence method."""
|
|
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
|
|
'strict', 'crot', 'cov_center']
|
|
return self._copy_from_list(opt_list)
|
|
|
|
def pre_validation_options(self):
|
|
"""Options for pre_validation method."""
|
|
opt_list = ['amax', 'n_sets', 'val_engine']
|
|
return self._copy_from_list(opt_list)
|
|
|
|
|
|
class PlsOptions(Options):
|
|
"""Options for Partial Least Squares Regression.
|
|
"""
|
|
def __init__(self):
|
|
Options.__init__(self)
|
|
self._set_default()
|
|
|
|
def _set_default(self):
|
|
opt = {}
|
|
opt['algo'] = 'pls'
|
|
opt['engine'] = engines.pls
|
|
opt['mode'] = 'normal' # how much info to calculate
|
|
opt['lod'] = 'compact' # how much info to store
|
|
opt['amax'] = 3
|
|
opt['aopt'] = 3
|
|
opt['center'] = True
|
|
opt['center_mth'] = mat_center
|
|
opt['scale'] = 'scores'
|
|
opt['calc_conf'] = True
|
|
opt['n_sets'] = 10
|
|
|
|
opt['strict'] = True
|
|
opt['p_center'] = 'med'
|
|
opt['alpha'] = .2
|
|
opt['cov_center'] = 'med'
|
|
opt['crot'] = True
|
|
|
|
opt['val_engine'] = w_pls_cv_val
|
|
|
|
opt['all_data'] = [('T', 'scores', True),
|
|
('P', 'loadings', True),
|
|
('E','residuals', False),
|
|
('p_tsq', 't2', False),
|
|
('rmsep', 'root mean square error of prediction', False)
|
|
]
|
|
|
|
opt['all_plots'] = [(blmplots.PlsScorePlot, 'Scores', True),
|
|
(blmplots.PlsLoadingPlot, 'Loadings', True),
|
|
(blmplots.LineViewXc, 'Line view', True)]
|
|
|
|
opt['out_plots'] = [blmplots.PlsScorePlot,
|
|
blmplots.PlsLoadingPlot,
|
|
blmplots.LineViewXc]
|
|
opt['out_data'] = None
|
|
|
|
opt['pack'] = False
|
|
opt['calc_qvals'] = False
|
|
opt['q_pert_mth'] = 'shuffle_vars'
|
|
opt['q_iter'] = 20
|
|
|
|
self.update(opt)
|
|
|
|
def make_model_options(self):
|
|
"""Options for make_model method."""
|
|
opt_list = ['scale','mode', 'amax', 'engine']
|
|
return self._copy_from_list(opt_list)
|
|
|
|
def confidence_options(self):
|
|
"""Options for confidence method."""
|
|
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
|
|
'strict', 'crot', 'cov_center']
|
|
return self._copy_from_list(opt_list)
|
|
|
|
def pre_validation_options(self):
|
|
"""Options for pre_validation method."""
|
|
opt_list = ['amax', 'n_sets', 'val_engine']
|
|
return self._copy_from_list(opt_list)
|
|
|
|
|
|
class PcaOptionsDialog(OptionsDialog):
|
|
"""Options dialog for Principal Component Analysis.
|
|
"""
|
|
def __init__(self, data, options, input_names=['X']):
|
|
OptionsDialog.__init__(self, data, options, input_names)
|
|
|
|
|
|
class PlsOptionsDialog(OptionsDialog):
|
|
"""Options dialog for Partial Least Squares Regression.
|
|
"""
|
|
def __init__(self, data, options, input_names=['X', 'Y']):
|
|
OptionsDialog.__init__(self, data, options, input_names)
|