This repository has been archived on 2024-07-04. You can view files and clone it, but cannot push or open issues or pull requests.
laydi/workflows/pca_workflow.py
2006-08-28 12:06:05 +00:00

263 lines
8.4 KiB
Python

import gtk
import system.workflow as wf
from system.workflow import Stage, Function
import pickle
from scipy import log2,transpose,dot,divide,shape,mean,resize,zeros
from scipy.linalg import svd,inv,norm,get_blas_funcs,eig
from system import dataset, logger, plots
class PCAWorkflow(wf.Workflow):
name = 'PCA Workflow'
ident = 'pca'
description = 'PCA workflow. Uses real microarray data from a study of diabetes (Mootha et al.).'
def __init__(self, app):
wf.Workflow.__init__(self, app)
#self.add_project(app.project)
#logger.log('notice','Current project added to: %s' %self.name)
load = Stage('load', 'Load Data')
load.add_function(LoadMoothaData())
self.add_stage(load)
preproc = Stage('preprocess', 'Preprocessing')
preproc.add_function(Log2Function())
self.add_stage(preproc)
annot = Stage('annot', 'Affy annotations')
annot.add_function(LoadAnnotationsFunction())
self.add_stage(annot)
model = Stage('model', 'Model')
model.add_function(PCAFunction(self))
self.add_stage(model)
logger.log('debug', '\tPCA\'s workflow is now active')
class LoadAnnotationsFunction(Function):
def __init__(self):
Function.__init__(self, 'load', 'Load Annotations')
def load_affy_file(self, filename):
f = open(filename)
logger.log('notice', 'Loading annotation file: %s' % filename)
self.file = f
def on_response(self, dialog, response):
if response == gtk.RESPONSE_OK:
logger.log('notice', 'Reading file: %s' % dialog.get_filename())
self.load_affy_file(dialog.get_filename())
def run(self,data):
btns = ('Open', gtk.RESPONSE_OK, \
'Cancel', gtk.RESPONSE_CANCEL)
dialog = gtk.FileChooserDialog('Open Affy Annotation File',
buttons=btns)
dialog.connect('response', self.on_response)
dialog.run()
dialog.destroy()
### Reading and parsing here
annot = read_affy_annot(self.file)
i_want = 'Pathway'
nothing = '---'
ids_in_data = set(data.names('genes')) #assuming we have genes
sanity_check = set(annot.keys())
if not ids_in_data.intersection(sanity_check) == ids_in_data:
logger.log('debug','Some identifers in data does not exist in affy file!')
for affy_id,description in annot:
if affy_id in ids_in_data:
pathways = description[i_want]
if not pathways[0][0]=='--':
pass
D = []
return [D]
class PCAFunction(Function):
def __init__(self,workflow):
Function.__init__(self, 'pca', 'PCA')
self.output = None
self.workflow = workflow
def run(self,data):
logger.log('debug', 'datatype: %s' % type(data))
if not isinstance(data,dataset.Dataset):
return None
#logger.log('debug', 'dimensions: %s' % data.dims)
## calculations
T,P,E,tsq = self.pca(data._array,5,tsq_loads=False)
comp_def = ('comp',('1','2','3','4','5'))
singel_def = ('1',('s'))
# pull out input identifiers:
row_ids = data.get_identifiers('genes')
col_ids = data.get_identifiers('samples')
T = dataset.Dataset(T,[('samples',col_ids) ,comp_def],name='T2')
P = dataset.Dataset(P,[('genes',row_ids),comp_def],name='P')
E = dataset.Dataset(E,[('samples',col_ids),('genes',row_ids)],name='E')
#tsq = dataset.Dataset(tsq,[singel_def,data_ids[1])
## plots
loading_plot1 = plots.ScatterPlot(P,P,'genes','comp','1','2')
loading_plot2 = plots.ScatterPlot(P,P,'genes','comp','3','4')
score_plot = plots.ScatterPlot(T,T,'samples','comp','1','2')
return [T,P,E,loading_plot1,loading_plot2,score_plot]
def pca(self,X,a_opt,cent=True,scale='loads',tsq_loads=False):
"""Principal component analysis
input:
Xc -- matrix, data
a_opt -- scalar, max number of comp. to calculate
cent -- bool, centering [True]
crit -- string, pc criteria ['exp_var',['ief','rpv','average']]
scale -- string, scaling ['loads',['scores']]
tsq_loads -- bool, calculate t-squared? [True]
reg -- float, covariance regularizer for tsq calculations [0.2]
output:
T,P,E,r
"""
nSamples,nVarX = shape(X)
if cent:
Xc = self.mat_center(X)
else:
Xc = X
u,s,vh = self.esvd(Xc)
if scale=='scores':
T = u*s
T = T[:,:a_opt]
P = transpose(vh)
P = P[:,:a_opt]
elif scale=='loads':
T = u
T = T[:,:a_opt]
P = transpose(vh)*s
P = P[:,:a_opt]
E = Xc - dot(T,transpose(P))
varEach = s**2
totVar = sum(varEach)
r = divide(varEach,totVar)*100
return T,P,E,r
def mat_center(self,X,axis=0,ret_mn=False):
"""Mean center matrix along axis.
input:
X -- matrix, data
axis -- dim,
ret_mn -- bool, return mean
output:
Xc, [mnX]
NB: axis = 1 is column-centering, axis=0=row-centering
default is row centering (axis=0)
"""
try:
rows,cols = shape(X)
except ValueError:
print "The X data needs to be two-dimensional"
if axis==0:
mnX = mean(X,axis)
Xs = X - resize(mnX,(rows,cols))
elif axis==1:
mnX = mean(X,axis)
Xs = transpose(transpose(X) - resize(mnX,(cols,rows)))
if ret_mn:
return Xs,mnX
else:
return Xs
def esvd(self,data,economy=1):
"""SVD with the option of economy sized calculation
Calculate subspaces of X'X or XX' depending on the shape
of the matrix.
Good for extreme fat or thin matrices.
"""
mm = self.mm
m,n = shape(data)
if m>=n:
u,s,v = svd(mm(data,data,trans_a=1))
u = mm(data,v,trans_b=1)
for i in xrange(n):
s[i] = norm(u[:,i])
u[:,i] = u[:,i]/s[i]
else:
u,s,v = svd(mm(data,data,trans_b=1))
v = mm(u,data,trans_a=1)
for i in xrange(m):
s[i] = norm(v[i,:])
v[i,:] = v[i,:]/s[i]
return u,s,v
def mm(self,a,b, alpha=1.0, beta=0.0, c=None, trans_a=0,
trans_b=0):
"""Fast matrix multiplication
Return alpha*(a*b) + beta*c.
a,b,c : matrices
alpha, beta: scalars
trans_a : 0 (a not transposed),
1 (a transposed),
2 (a conjugate transposed)
trans_b : 0 (b not transposed),
1 (b transposed),
2 (b conjugate transposed)
"""
if c:
gemm,= get_blas_funcs(('gemm',),(a,b,c))
else:
gemm,= get_blas_funcs(('gemm',),(a,b))
return gemm(alpha, a, b, beta, c, trans_a, trans_b)
class LoadMoothaData(Function):
def __init__(self):
Function.__init__(self, 'load', 'Load diabetes data')
def run(self,data):
data_file = open('full_data.pickle','r')
data = pickle.load(data_file)
data_file.close()
sample_file = open('sample_labels.pickle','r')
sample_names = pickle.load(sample_file)
sample_file.close()
typecode='f'
nSamps = len(sample_names)
nVars = len(data.keys())
gene_ids = []
x = zeros((nSamps,nVars),typecode)
for i,(id,desc) in enumerate(data.items()):
gene_ids.append(id)
x[:,i] = desc[0].astype(typecode)
gene_def = ('genes',gene_ids)
sample_def = ('samples', sample_names)
X = dataset.Dataset(x,identifiers=[sample_def,gene_def]) # samples x genes
return [X]
class Log2Function(Function):
def __init__(self):
Function.__init__(self, 'log', 'Log2')
def run(self,data):
x = log2(data._array)
#pull out identifiers
ids = []
for dim in data.get_dim_names():
ids.append((dim,data.get_identifiers(dim)))
return [dataset.Dataset(x,identifiers=ids,name='Log2_X')]
PCAWorkflow.name = 'PCA Workflow'