First import of chemometrics utils
This commit is contained in:
432
fluents/lib/blmfuncs.py
Normal file
432
fluents/lib/blmfuncs.py
Normal file
@@ -0,0 +1,432 @@
|
||||
"""This module contains bilinear models(Functions)
|
||||
"""
|
||||
import sys
|
||||
# add library
|
||||
sys.path.append('/home/flatberg/fluents/fluents/lib')
|
||||
import time
|
||||
from fluents.workflow import Function
|
||||
from fluents.dataset import Dataset
|
||||
from fluents import plots, dataset, workflow, logger
|
||||
import scipy
|
||||
from engines import *
|
||||
from cx_stats import leverage, variances, hotelling
|
||||
from cx_utils import mat_center
|
||||
from validation import *
|
||||
import blmplots
|
||||
import engines
|
||||
|
||||
|
||||
class Model(Function):
|
||||
"""Base class of bilinear models.
|
||||
"""
|
||||
def __init__(self,id='johndoe',name='JohnDoe'):
|
||||
Function.__init__(self,id,name)
|
||||
self.name = name
|
||||
self._options = None
|
||||
self._data = {}
|
||||
self._dataset = {}
|
||||
self._packers = {}
|
||||
self.model = {}
|
||||
|
||||
def clear(self):
|
||||
""" Clears model paramters
|
||||
"""
|
||||
self.model = {}
|
||||
self._data = {}
|
||||
self._packers = {}
|
||||
|
||||
|
||||
class PCA(Model):
|
||||
def __init__(self,id='pca',name='PCA'):
|
||||
Model.__init__(self,id,name)
|
||||
self._options = PcaOptions()
|
||||
|
||||
def pre_validation(self, amax, n_sets, val_engine):
|
||||
"""Model calculations for maximum number of components.
|
||||
"""
|
||||
rmsep = val_engine(self.model['E0'], amax, n_sets)
|
||||
self.model['rmsep'] = rmsep
|
||||
self.model['aopt'] = rmsep.argmin()
|
||||
|
||||
def confidence(self, aopt, n_sets, alpha, p_center,
|
||||
crot, strict, cov_center ):
|
||||
"""Returns a confidence measure for model parameters.
|
||||
Based on aopt.
|
||||
"""
|
||||
aopt = self.model['aopt']
|
||||
jk_segments = pca_jkP(self.model['E0'], aopt, n_sets)
|
||||
Pcal = self.model['P'][:,:aopt]
|
||||
tsq = hotelling(jk_segments, Pcal, p_center,
|
||||
cov_center, alpha, crot, strict)
|
||||
self.model['p_tsq'] = tsq
|
||||
|
||||
def make_model(self, amax, mode, scale):
|
||||
"""Model on optimal number of components.
|
||||
"""
|
||||
dat = pca(self.model['E0'], amax, scale, mode)
|
||||
|
||||
# explained variance
|
||||
var_x, exp_var_x = variances(self.model['E0'], dat['T'], dat['P'])
|
||||
dat['var_x'] = var_x
|
||||
dat['exp_var_x'] = exp_var_x
|
||||
|
||||
#fixme###
|
||||
do_lev_s = False
|
||||
do_lev_v = False
|
||||
#####
|
||||
if do_lev_s:
|
||||
# sample leverages
|
||||
tnorm = scipy.apply_along_axis(norm, 0, dat['T']) # norm of T-columns
|
||||
s_lev = leverage(amax, tnorm)
|
||||
dat['s_lev'] = s_lev
|
||||
if do_lev_v:
|
||||
# variable leverages
|
||||
v_lev = leverage(amax, dat['P'])
|
||||
dat['v_lev'] = v_lev
|
||||
|
||||
self.model.update(dat)
|
||||
|
||||
def as_dataset(self, param, dtype='dataset'):
|
||||
"""Return model parameter as Dataset.
|
||||
"""
|
||||
if not param in self.model.keys():
|
||||
return
|
||||
DX = self._dataset['X'] #input dataset
|
||||
dim_name_0, dim_name_1 = DX.get_dim_name()
|
||||
# samples
|
||||
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
|
||||
# vars
|
||||
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
|
||||
# components (hidden)
|
||||
pc_ids = ['_comp_a', map(str,range(self.model['aopt'])) ]
|
||||
pc_ids_opt = ['_comp_o', map(str, range(self.model['aopt'])) ]
|
||||
zero_dim = ['_doe', ['0']] # null dim, vector (hidden)
|
||||
match_ids = {'E':[ids_0, ids_1],
|
||||
'E0':[ids_0, ids_1],
|
||||
'P':[ids_1, pc_ids],
|
||||
'T':[ids_0, pc_ids],
|
||||
'W':[ids_1, pc_ids],
|
||||
'p_tsq':[ids_1, zero_dim],
|
||||
'rmsep':[pc_ids, zero_dim],
|
||||
'var_leverages':[ids_1, zero_dim],
|
||||
'sample_leverages':[pc_ids, zero_dim],
|
||||
'exp_var_x': [pc_ids, zero_dim],
|
||||
'var_x': [pc_ids, zero_dim],
|
||||
}
|
||||
|
||||
out = Dataset(self.model[param], match_ids[param], name=param)
|
||||
return out
|
||||
|
||||
def get_out_plots(self, options):
|
||||
out=[]
|
||||
for plt in options['out_plots']:
|
||||
#try:
|
||||
out.append(plt(self))
|
||||
#except:
|
||||
# print plt
|
||||
#logger.log('debug', 'Plot: %s failed') %plt
|
||||
return out
|
||||
|
||||
def run(self, data):
|
||||
"""Run pca with present options.
|
||||
"""
|
||||
self.clear()
|
||||
options = self._options
|
||||
self._dataset['X'] = data
|
||||
self._data['X'] = data.asarray().astype('<f8')
|
||||
if options['center']:
|
||||
center = options['center_mth']
|
||||
self.model['E0'] = center(self._data['X'])
|
||||
else:
|
||||
self.model['E0'] = data.asarray()
|
||||
|
||||
self.pre_validation(**options.pre_validation_options())
|
||||
self.make_model(**options.make_model_options())
|
||||
if options['calc_conf']:
|
||||
self.confidence(**options.confidence_options())
|
||||
|
||||
out = [self.as_dataset(p) for p in options['out_data']]
|
||||
for plt in self.get_out_plots(options):
|
||||
out.append(plt)
|
||||
return out
|
||||
|
||||
|
||||
class PLS(Model):
|
||||
def __init__(self, id='pls', name='PLS'):
|
||||
Model.__init__(self, id, name)
|
||||
self._options = PlsOptions()
|
||||
|
||||
def pre_validation(self, amax, n_sets, val_engine):
|
||||
"""Returns rmsec,rmsep for model.
|
||||
"""
|
||||
rmsep = val_engine(self.model['E0'], self.model['F0'],
|
||||
amax, n_sets)
|
||||
self.model['rmsep'] = rmsep.mean(0)
|
||||
self.model['aopt'] = rmsep.mean(0).argmin()
|
||||
|
||||
def confidence(self, aopt, n_sets, alpha, p_center,
|
||||
crot, strict, cov_center ):
|
||||
"""Returns a confidence measure for model parameters
|
||||
Supported parameters: W
|
||||
"""
|
||||
aopt = self.model['aopt']
|
||||
jk_segments = pls_jkW(self.model['E0'], self.model['F0'],
|
||||
aopt, n_sets)
|
||||
Wcal = self.model['W'][:,:aopt]
|
||||
tsq = hotelling(jk_segments, Wcal, p_center,
|
||||
alpha, crot, strict, cov_center)
|
||||
self.model['w_tsq'] = tsq
|
||||
|
||||
def permutation_confidence(self, a, b, aopt, reg, n_iter, algo,
|
||||
sim_method):
|
||||
"""Estimates sign. vars by controlling fdr."""
|
||||
|
||||
qvals_sorted, qvals = pls_qvals(a, b, aopt=None,
|
||||
alpha=.4, n_iter=20, algo='pls',
|
||||
sim_method='shuffle', )
|
||||
|
||||
|
||||
def make_model(self, a, b, amax, scale, mode, engine):
|
||||
"""Make model on amax components.
|
||||
"""
|
||||
dat = engine(a, b, amax, scale, mode)
|
||||
self.model.update(dat)
|
||||
|
||||
def as_dataset(self, name, dtype='Dataset'):
|
||||
"""Return any model parameter as Dataset
|
||||
No ids matching
|
||||
"""
|
||||
if name not in self.model.keys():
|
||||
return
|
||||
DX, DY = self._dataset['X'], self._dataset['Y']
|
||||
dim_name_0, dim_name_1 = DX.get_dim_name()
|
||||
dim_name_2, dim_name_3 = DY.get_dim_name()
|
||||
#samples
|
||||
ids_0 = [dim_name_0, DX.get_identifiers(dim_name_0, sorted=True)]
|
||||
# x vars
|
||||
ids_1 = [dim_name_1, DX.get_identifiers(dim_name_1, sorted=True)]
|
||||
# y vars
|
||||
ids_3 = [dim_name_3, DY.get_identifiers(dim_name_3, sorted=True)]
|
||||
# components (hidden)
|
||||
pc_ids = ['_comp', map(str, range(self.model['aopt']))]
|
||||
zero_dim = ['_doe',['0']] # null dim, vector (hidden)
|
||||
|
||||
match_ids = {'E':[ids_0, ids_1],
|
||||
'P':[ids_1, pc_ids],
|
||||
'T':[ids_0, pc_ids],
|
||||
'W': [ids_1, pc_ids],
|
||||
'R': [ids_1, pc_ids],
|
||||
'Q':[ids_3, pc_ids],
|
||||
'F':[ids_0, ids_3],
|
||||
'B':[ids_1, ids_3],
|
||||
'qval':[ids_1, zero_dim],
|
||||
'qval_sorted':[ids_1, zero_dim],
|
||||
'w_tsq':[ids_1, zero_dim],
|
||||
'rmsep':[pc_ids, zero_dim],
|
||||
}
|
||||
|
||||
array = self.model[name]
|
||||
M = Dataset(array,identifiers=match_ids[name],name=name)
|
||||
return M
|
||||
|
||||
def get_out_plots(self, options):
|
||||
out=[]
|
||||
for plt in options['out_plots']:
|
||||
#try:
|
||||
out.append(plt(self))
|
||||
#except:
|
||||
# logger.log('debug', 'Plot: %s failed' %plt)
|
||||
return out
|
||||
|
||||
def run(self,a,b):
|
||||
options = self._options
|
||||
self._dataset['X'] = a
|
||||
self._dataset['Y'] = b
|
||||
self._data['X'] = a.asarray()
|
||||
self._data['Y'] = b.asarray()
|
||||
if options['center']:
|
||||
self.model['E0'] = options['center_mth'](self._data['X'])
|
||||
self.model['F0'] = options['center_mth'](self._data['Y'])
|
||||
else:
|
||||
self.model['E0'] = self._data['X']
|
||||
self.model['F0'] = self._data['Y']
|
||||
|
||||
self.pre_validation(**options.pre_validation_options())
|
||||
self.make_model(self.model['E0'], self.model['F0'],
|
||||
**options.make_model_options())
|
||||
# variance captured
|
||||
var_x, exp_var_x = variances(self.model['E0'], self.model['T'], self.model['P'])
|
||||
self.model['var_x'] = var_x
|
||||
self.model['exp_var_x'] = exp_var_x
|
||||
|
||||
var_y, exp_var_y = variances(self.model['F0'], self.model['T'], self.model['Q'])
|
||||
self.model['var_y'] = var_y
|
||||
self.model['exp_var_y'] = exp_var_y
|
||||
|
||||
if options['calc_conf']:
|
||||
self.confidence(**options.confidence_options())
|
||||
|
||||
out = [self.as_dataset(p) for p in options['out_data']]
|
||||
for plt in self.get_out_plots(options):
|
||||
out.append(plt)
|
||||
return out
|
||||
|
||||
|
||||
class Packer:
|
||||
"""A compression object used to speed up model calculations.
|
||||
|
||||
Often used in conjunction with crossvalidation and perturbations
|
||||
analysis.
|
||||
"""
|
||||
def __init__(self,array):
|
||||
self._shape = array.shape
|
||||
self._array = array
|
||||
self._packed_data = None
|
||||
|
||||
def expand(self,a):
|
||||
if self._inflater!=None:
|
||||
return dot(self._inflater,a)
|
||||
|
||||
def collapse(self,axis=None,mode='svd'):
|
||||
if not axis:
|
||||
axis = argmin(self._array.shape) # default is the smallest dim
|
||||
|
||||
if axis == 1:
|
||||
self._array = self._array.T
|
||||
u,s,vt = svd(self._array,full_matrices=0)
|
||||
self._inflater = vt.T
|
||||
self._packed_data = u*s
|
||||
return self._packed_data
|
||||
|
||||
def get_packed_data(self):
|
||||
return self._packed_data
|
||||
|
||||
|
||||
class Options(dict):
|
||||
"""Options base class.
|
||||
"""
|
||||
def __init__(self, *args,**kw):
|
||||
dict.__init__(self, *args, **kw)
|
||||
|
||||
def _copy_from_list(self, key_list):
|
||||
d = {}
|
||||
for key in key_list:
|
||||
d[key] = self.get(key,None)
|
||||
return d
|
||||
|
||||
class PcaOptions(Options):
|
||||
"""Options for Principal Component Analysis.
|
||||
"""
|
||||
def __init__(self):
|
||||
Options.__init__(self)
|
||||
self._set_default()
|
||||
|
||||
def _set_default(self):
|
||||
opt = {}
|
||||
opt['algo'] = 'pca'
|
||||
opt['engine'] = engines.pca
|
||||
opt['mode'] = 'normal' # how much info to calculate
|
||||
opt['lod'] = 'compact' # how much info to store
|
||||
opt['amax'] = 5
|
||||
opt['aopt'] = 5
|
||||
opt['center'] = True
|
||||
opt['center_mth'] = mat_center
|
||||
opt['scale'] = 'scores'
|
||||
opt['calc_conf'] = True
|
||||
opt['n_sets'] = 5
|
||||
|
||||
opt['strict'] = True
|
||||
opt['p_center'] = 'med'
|
||||
opt['alpha'] = .8
|
||||
opt['cov_center'] = 'med'
|
||||
opt['crot'] = True
|
||||
|
||||
opt['val_engine'] = pca_alter_val
|
||||
opt['val_n_sets'] = 10
|
||||
|
||||
opt['all_data'] = ['T','P','E','p_tsq','rmsep']
|
||||
opt['all_plots'] = ['PcaScorePlot', 'PcaLoadingPlot',
|
||||
'PcaRmsepPlot']
|
||||
|
||||
opt['out_data'] = ['T','P', 'p_tsq']
|
||||
opt['out_plots'] = [blmplots.PcaScorePlot,blmplots.PcaLoadingPlot,blmplots.LineViewXc]
|
||||
|
||||
self.update(opt)
|
||||
|
||||
def make_model_options(self):
|
||||
"""Options for make_model method."""
|
||||
opt_list = ['scale','mode', 'amax']
|
||||
return self._copy_from_list(opt_list)
|
||||
|
||||
def confidence_options(self):
|
||||
"""Options for confidence method."""
|
||||
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
|
||||
'strict', 'crot', 'cov_center']
|
||||
return self._copy_from_list(opt_list)
|
||||
|
||||
def pre_validation_options(self):
|
||||
"""Options for pre_validation method."""
|
||||
opt_list = ['amax', 'n_sets', 'val_engine']
|
||||
return self._copy_from_list(opt_list)
|
||||
|
||||
|
||||
class PlsOptions(Options):
|
||||
"""Options for Partial Least Squares Regression.
|
||||
"""
|
||||
def __init__(self):
|
||||
Options.__init__(self)
|
||||
self._set_default()
|
||||
|
||||
def _set_default(self):
|
||||
opt = {}
|
||||
opt['algo'] = 'pls'
|
||||
opt['engine'] = engines.pls
|
||||
opt['mode'] = 'normal' # how much info to calculate
|
||||
opt['lod'] = 'compact' # how much info to store
|
||||
opt['amax'] = 3
|
||||
opt['aopt'] = 3
|
||||
opt['center'] = True
|
||||
opt['center_mth'] = mat_center
|
||||
opt['scale'] = 'scores'
|
||||
opt['calc_conf'] = True
|
||||
opt['n_sets'] = 10
|
||||
|
||||
opt['strict'] = True
|
||||
opt['p_center'] = 'med'
|
||||
opt['alpha'] = .2
|
||||
opt['cov_center'] = 'med'
|
||||
opt['crot'] = True
|
||||
|
||||
opt['val_engine'] = w_pls_cv_val
|
||||
|
||||
opt['all_data'] = ['T','P','E','p_tsq','rmsep']
|
||||
opt['all_plots'] = ['PcaScorePlot', 'PcaLoadingPlot',
|
||||
'PcaRmsepPlot']
|
||||
|
||||
opt['out_data'] = []
|
||||
opt['out_plots'] = [blmplots.PlsScorePlot,
|
||||
blmplots.PlsLoadingPlot,
|
||||
blmplots.LineViewXc]
|
||||
#blmplots.PlsQvalScatter]
|
||||
|
||||
opt['pack'] = False
|
||||
opt['calc_qvals'] = False
|
||||
opt['q_pert_mth'] = 'shuffle_vars'
|
||||
opt['q_iter'] = 20
|
||||
self.update(opt)
|
||||
|
||||
def make_model_options(self):
|
||||
"""Options for make_model method."""
|
||||
opt_list = ['scale','mode', 'amax', 'engine']
|
||||
return self._copy_from_list(opt_list)
|
||||
|
||||
def confidence_options(self):
|
||||
"""Options for confidence method."""
|
||||
opt_list = ['n_sets', 'aopt', 'alpha', 'p_center',
|
||||
'strict', 'crot', 'cov_center']
|
||||
return self._copy_from_list(opt_list)
|
||||
|
||||
def pre_validation_options(self):
|
||||
"""Options for pre_validation method."""
|
||||
opt_list = ['amax', 'n_sets', 'val_engine']
|
||||
return self._copy_from_list(opt_list)
|
Reference in New Issue
Block a user