Demo workflow

This commit is contained in:
Arnar Flatberg 2008-02-05 11:34:14 +00:00
parent 848ba7f80c
commit 14d24d02c2

518
workflows/demo.py Normal file
View File

@ -0,0 +1,518 @@
import sys,os
import os.path
import webbrowser
import cPickle
import scipy
import networkx as nx
from fluents import logger,plots,workflow,dataset,main
from fluents.lib import blmfuncs,nx_utils,cx_utils
import gobrowser
class SmallTestWorkflow(workflow.Workflow):
name = 'Demo'
ident = 'demo'
description = 'A small test workflow for gene expression analysis.'
chip = 'hgu'
def __init__(self):
workflow.Workflow.__init__(self)
# DATA IMPORT
load = workflow.Stage('load', 'Data')
load_small = LoadDataFunction('load-small', 'Small', self)
load.add_function(load_small)
load_medium = LoadDataFunction('load-geneid', 'GeneID', self, 'geneid')
load.add_function(load_medium)
load_medium = LoadDataFunction('load-full', 'FullChip', self, 'full')
load.add_function(load_medium)
self.add_stage(load)
# NETWORK PREPROCESSING
#net = workflow.Stage('net', 'Network integration')
#net.add_function(DiffKernelFunction())
#net.add_function(ModKernelFunction())
#self.add_stage(net)
# Models
model = workflow.Stage('models', 'Models')
model.add_function(blmfuncs.PCA())
model.add_function(blmfuncs.PLS())
model.add_function(SAM())
self.add_stage(model)
query = workflow.Stage('query', 'Gene Query')
query.add_function(NCBIQuery())
query.add_function(KEGGQuery())
query.add_function(SubgraphQuery())
self.add_stage(query)
# Background knowledge
go = workflow.Stage('go', 'Gene Ontology')
go.add_function(gobrowser.PlotDagFunction())
go.add_function(GoEnrichment())
go.add_function(GoEnrichmentCond())
go.add_function(MapGO2Gene())
go.add_function(MapGene2GO())
self.add_stage(go)
# EXTRA PLOTS
#plt = workflow.Stage('net', 'Network')
#plt.add_function(nx_analyser.KeggNetworkAnalyser())
#self.add_stage(plt)
logger.log('debug', 'Small test workflow is now active')
class LoadDataFunction(workflow.Function):
"""Loads all datasets in a given directory."""
def __init__(self, ident, label, wf, dir=''):
workflow.Function.__init__(self, ident, label)
self._dir = dir
self._wf = wf
def run(self):
path = os.path.join(main.options.datadir, self._wf.ident, self._dir)
files = os.listdir(path)
out = []
for fn in files:
if fn.endswith('.ftsv'):
out.append(dataset.read_ftsv(os.path.join(path, fn)))
return out
##### WORKFLOW SPECIFIC FUNCTIONS ######
class SAM(workflow.Function):
def __init__(self, id='sam', name='SAM'):
workflow.Function.__init__(self, id, name)
def run(self, x, y):
n_iter = 50 #B
alpha = 0.01 #cut off on qvals
###############
# Main function call
# setup prelimenaries
import rpy
rpy.r.library("siggenes")
rpy.r.library("multtest")
cl = scipy.dot(y.asarray(), scipy.diag(scipy.arange(y.shape[1]))).sum(1)
data = x.asarray().T
sam = rpy.r.sam(data, cl=cl, B=n_iter, var_equal=False,med=False,s0=scipy.nan,rand=scipy.nan)
qvals = scipy.asarray(rpy.r.slot(sam, "p.value"))
pvals = scipy.asarray(rpy.r.slot(sam, "q.value"))
sam_index = (qvals<alpha).nonzero()[0]
# Update selection object
dim_name = x.get_dim_name(1)
sam_selection = x.get_identifiers(dim_name, indices=sam_index)
main.project.set_selection(dim_name, sam_selection)
sel = dataset.Selection('SAM selection')
sel.select(dim_name, sam_selection)
logger.log('notice','Number of significant varibles (SAM): %s' %len(sam_selection))
# ## OUTPUT ###
xcolname = x.get_dim_name(1) # genes
x_col_ids = [xcolname, x.get_identifiers(xcolname, sorted=True)]
sing_id = ['_john', ['0']] #singleton
D_qvals = dataset.Dataset(qvals, (x_col_ids, sing_id), name='q_vals')
D_pvals = dataset.Dataset(pvals, (x_col_ids, sing_id), name='p_vals')
# plots
s_indx = qvals.flatten().argsort()
s_ids = [x_col_ids[0],[x_col_ids[1][i] for i in s_indx]]
xindex = scipy.arange(len(qvals))
qvals_s = qvals.take(s_indx)
D_qs = dataset.Dataset(qvals_s, (s_ids, sing_id), name="sorted qvals")
Dind = dataset.Dataset(xindex, (s_ids, sing_id), name="dum")
st = plots.ScatterPlot(D_qs, Dind, 'gene_ids', '_john', '0', '0', s=10, name='SAM qvals')
return [D_qvals, D_pvals, D_qs, st, sel]
class DiffKernelFunction(workflow.Function):
def __init__(self):
workflow.Function.__init__(self, 'diffkernel', 'Diffusion')
def run(self, x, a):
"""x is gene expression data, a is the network.
"""
#sanity check:
g = a.asnetworkx()
genes = x.get_identifiers(x.get_dim_name(1), sorted=True)
W = nx.adj_matrix(g, nodelist=genes)
X = x.asarray()
Xc, mn_x = cx_utils.mat_center(X, ret_mn=True)
out = []
alpha = 1.0
beta = 1.0
K = nx_utils.K_diffusion(W, alpha=alpha, beta=beta,normalised=True)
Xp = scipy.dot(Xc, K) + mn_x
# dataset
row_ids = (x.get_dim_name(0),
x.get_identifiers(x.get_dim_name(0),
sorted=True))
col_ids = (x.get_dim_name(1),
x.get_identifiers(x.get_dim_name(1),
sorted=True))
xout = dataset.Dataset(Xp,
(row_ids, col_ids),
name=x.get_name()+'_diff'+str(alpha))
out.append(xout)
return out
class ModKernelFunction(workflow.Function):
def __init__(self):
workflow.Function.__init__(self, 'mokernel', 'Modularity')
def run(self,x,a):
X = x.asarray()
g = a.asnetworkx()
genes = x.get_identifiers(x.get_dim_name(1), sorted=True)
W = nx.adj_matrix(g, nodelist=genes)
out=[]
alpha=.2
Xc,mn_x = cx_utils.mat_center(X, ret_mn=True)
K = nx_utils.K_modularity(W, alpha=alpha)
Xp = scipy.dot(Xc, K)
Xp = Xp + mn_x
# dataset
row_ids = (x.get_dim_name(0),
x.get_identifiers(x.get_dim_name(0),
sorted=True))
col_ids = (x.get_dim_name(1),
x.get_identifiers(x.get_dim_name(1),
sorted=True))
xout = dataset.Dataset(Xp,
(row_ids,col_ids),
name=x.get_name()+'_mod'+str(alpha))
out.append(xout)
return out
class NCBIQuery(workflow.Function):
def __init__(self, gene_id_name='gene_ids'):
self._gene_id_name = gene_id_name
workflow.Function.__init__(self, 'query', 'NCBI')
def run(self):
selection = main.project.get_selection()
if not selection.has_key(self._gene_id_name):
logger.log("notice", "Expected gene ids: %s, but got: %s" %(self._gene_id_name, selection.keys()))
return None
if len(selection[self._gene_id_name])==0:
logger.log("notice", "No selected genes to query")
return None
base = 'http://www.ncbi.nlm.nih.gov/entrez/query.fcgi?'
options = {r'&db=' : 'gene',
r'&cmd=' : 'retrieve',
r'&dopt=' : 'full_report'}
gene_str = ''.join([gene + "+" for gene in selection[self._gene_id_name]])
options[r'&list_uids='] = gene_str[:-1]
opt_str = ''.join([key+value for key,value in options.items()])
web_str = base + opt_str
webbrowser.open(web_str)
class KEGGQuery(workflow.Function):
def __init__(self, org='hsa', gene_id_name='gene_ids'):
self._org=org
self._gene_id_name = gene_id_name
workflow.Function.__init__(self, 'query', 'KEGG')
def run(self, selection):
if not selection.has_key(self._gene_id_name):
logger.log("notice", "Expected gene ids: %s, but got. %s" %(self._gene_id_name, selection.keys()))
return None
if len(selection[self._gene_id_name])==0:
logger.log("notice", "No selected genes to query")
return None
base = r'http://www.genome.jp/dbget-bin/www_bget?'
gene_str = ''.join([gene + "+" for gene in selection[self._gene_id_name]])
gene_str = gene_str[:-1]
gene_str = self._org + "+" + gene_str
web_str = base + gene_str
webbrowser.open(web_str)
class GoEnrichment(workflow.Function):
def __init__(self):
workflow.Function.__init__(self, 'goenrich', 'Go Enrichment')
def run(self, data):
import rpy
rpy.r.library("GOstats")
# Get universe
# Here, we are using a defined dataset to represent the universe
if not 'gene_ids' in data:
logger.log('notice', 'No dimension called [gene_ids] in dataset: %s' %data.get_name())
return
universe = list(data.get_identifiers('gene_ids'))
logger.log('notice', 'Universe consists of %s gene ids from %s' %(len(universe), data.get_name()))
# Get current selection and validate
curr_sel = main.project.get_selection()
selected_genes = list(curr_sel['gene_ids'])
if len(selected_genes)==0:
logger.log('notice', 'This function needs a current selection!')
return
# Hypergeometric parameter object
pval_cutoff = 0.9999
cond = False
test_direction = 'over'
params = rpy.r.new("GOHyperGParams",
geneIds=selected_genes,
annotation="hgu133a",
ontology="BP",
pvalueCutoff=pval_cutoff,
conditional=cond,
testDirection=test_direction
)
# run test
# result.keys(): ['Count', 'Term', 'OddsRatio', 'Pvalue', 'ExpCount', 'GOBPID', 'Size']
result = rpy.r.summary(rpy.r.hyperGTest(params))
# dataset
terms = result['GOBPID']
pvals = scipy.log(scipy.asarray(result['Pvalue']))
row_ids = ('go-terms', terms)
col_ids = ('_john', ['_doe'])
xout = dataset.Dataset(pvals,
(row_ids, col_ids),
name='P values (enrichment)')
return [xout]
class GoEnrichmentCond(workflow.Function):
""" Enrichment conditioned on dag structure."""
def __init__(self):
workflow.Function.__init__(self, 'goenrich', 'Go Cond. Enrich.')
def run(self, data):
import rpy
rpy.r.library("GOstats")
# Get universe
# Here, we are using a defined dataset to represent the universe
if not 'gene_ids' in data:
logger.log('notice', 'No dimension called [gene_ids] in dataset: %s', data.get_name())
return
universe = list(data.get_identifiers('gene_ids'))
logger.log('notice', 'Universe consists of %s gene ids from %s' %(len(universe), data.get_name()))
# Get current selection and validate
curr_sel = main.project.get_selection()
selected_genes = list(curr_sel['gene_ids'])
if len(selected_genes)==0:
logger.log('notice', 'This function needs a current selection!')
return
# Hypergeometric parameter object
pval_cutoff = 1
cond = True
test_direction = 'over'
params = rpy.r.new("GOHyperGParams",
geneIds=selected_genes,
annotation="hgu133a",
ontology="BP",
pvalueCutoff=pval_cutoff,
conditional=cond,
testDirection=test_direction
)
# run test
# result.keys(): ['Count', 'Term', 'OddsRatio', 'Pvalue', 'ExpCount', 'GOBPID', 'Size']
result = rpy.r.summary(rpy.r.hyperGTest(params))
# dataset
terms = result['GOBPID']
pvals = scipy.log(scipy.asarray(result['Pvalue']))
row_ids = ('go-terms', terms)
col_ids = ('_john', ['_doe'])
xout = dataset.Dataset(pvals,
(row_ids, col_ids),
name='P values (enrichment)')
return [xout]
class MapGene2GO(workflow.Function):
def __init__(self, ont='bp', gene_id_name='gene_ids'):
self._ont = ont
self._gene_id_name = gene_id_name
workflow.Function.__init__(self, 'gene2go', 'gene->GO')
# load data at init
try:
fname = "/home/flatberg/fluents/data/gene2go.pcl"
self._gene2go = cPickle.load(open(fname))
except:
logger.log("notice", "could not load mapping")
def run(self):
selection = main.project.get_selection()
if not selection.has_key(self._gene_id_name):
logger.log("notice", "Expected gene ids: %s, but got. %s" %(self._gene_id_name, selection.keys()))
return None
if len(selection[self._gene_id_name])==0:
logger.log("notice", "No selected genes to query")
return None
gene_ids = selection[self._gene_id_name]
go_ids = set()
for gene in gene_ids:
go_ids_new = self._gene2go.get(gene, [])
if not go_ids_new:
logger.log("notice", "Could not find any goterms for %s" %gene)
go_ids.update(self._gene2go.get(gene, []))
main.project.set_selection('go-terms', go_ids)
logger.log("notice", "GO terms updated")
class MapGO2Gene(workflow.Function):
def __init__(self, ont='bp', gene_id_name='go-terms'):
self._ont = ont
self._gene_id_name = gene_id_name
workflow.Function.__init__(self, 'go2gene', 'GO->gene')
# load data at init
try:
fname = "/home/flatberg/fluents/data/go2gene.pcl"
self._go2gene = cPickle.load(open(fname))
except:
logger.log("notice", "could not load mapping")
def run(self):
selection = main.project.get_selection()
if not selection.has_key(self._gene_id_name):
logger.log("notice", "Expected gene ids: %s, but got. %s" %(self._gene_id_name, selection.keys()))
return None
if len(selection[self._gene_id_name])==0:
logger.log("notice", "No selected genes to query")
return None
go_ids = selection[self._gene_id_name]
gene_ids = set()
for go in go_ids:
if not self._go2gene.get(go,[]):
logger.log("notice", "Could not find any gene ids for %s" %go)
gene_ids.update(self._go2gene.get(go,[]))
main.project.set_selection('gene_ids', gene_ids)
logger.log("notice", "GO terms updated")
class SubgraphQuery(workflow.Function):
def __init__(self, graph='kegg', dim='gene_ids'):
self._gtype = graph
self._dim = dim
workflow.Function.__init__(self, 'keggraph', 'KeggGraph')
def run(self, Dw, DA):
max_edge_ratio = .20
max_cov_ratio = .25
neigh_type = 'cov'
neigh_type = 'cosine'
#neigh_type = 'heat'
# 1.) Operate on a subset selection
selection = main.project.get_selection()
if not selection.has_key(self._dim):
print "not"
logger.log("notice", "Expected gene ids: %s, but got. %s" %(self._dim, selection.keys()))
return None
if len(selection[self._dim]) == 0:
print "not3"
logger.log("notice", "No selected genes to query")
return None
Dw = Dw.subdata(self._dim, selection[self._dim])
print Dw.shape
# 2.) Pairwise goodness in loading space
W = Dw.asarray()
if neigh_type == 'cov':
W -= W.mean(1)[:,scipy.newaxis]
wcov = scipy.dot(W, W.T)/(W.shape[1]-1)
wcov_min = wcov.max()*max_cov_ratio
indices = scipy.where(wcov >= wcov_min)[0]
elif neigh_type == 'cosine':
import hcluster
dp = hcluster.squareform(hcluster.pdist(W, 'cosine'))
min_dist = dp.max()*0.1
p1, p2 = scipy.where(dp <= min_dist)
acc_gene_ids = Dw.get_identifiers(self._dim, indices=indices)
# 3.) Subgraphs
G = DA.asnetworkx()
common_gids = [i for i in G.nodes() if i in acc_gene_ids]
G = nx.subgraph(G, common_gids)
S = nx.connected_component_subgraphs(G)
n = map(len, S)
print n
SS = [s for s in S if len(s)>=3]
if not SS:
print "No subgraphs here"
return None
# 4.) Identify close subgraphs
# 5.) Rank subgraphs
#main.project.set_selection('gene_ids', acc_gene_ids)
#logger.log("notice", "Gene ids updated")
plt = GraphQueryScatterPlot(SS, Dw)
return [plt]
class GraphQueryScatterPlot(plots.ScatterPlot):
def __init__(self, S, Dw, *args, **kw):
self.S = S
self.Dw = Dw
id_dim, sel_dim = Dw.get_dim_name()
self._dim = id_dim
id_1, = Dw.get_identifiers(sel_dim, [0])
id_2, = Dw.get_identifiers(sel_dim, [1])
print id_1
print id_2
plots.ScatterPlot.__init__(self, Dw, Dw, id_dim, sel_dim, id_1, id_2, c='g', s=50,name="Kegg test", alpha=.5)
self.overlay_subgraphs()
def overlay_subgraphs(self):
for subgraph in self.S:
nodes = subgraph.nodes()
print self._dim
sub_dw = self.Dw.subdata(self._dim, nodes)
nodes = sub_dw.get_identifiers(self._dim, sorted=True)
xy = sub_dw.asarray()[:,:2]
pos = {}
for i, node in enumerate(nodes):
pos[node] = xy[i]
nx.draw_networkx_nodes(subgraph, pos, node_size=200, ax=self.axes, zorder=3)
nx.draw_networkx_edges(subgraph, pos, ax=self.axes, zorder=3)
def on_update(self, selection, graph):
g = nx.subgraph(graph, selection)
S = nx.connected_components_subgraphs(g)
def _subgraph_score(self, subgraph):
pass