diff --git a/workflows/demo.py b/workflows/demo.py new file mode 100644 index 0000000..3dee4f7 --- /dev/null +++ b/workflows/demo.py @@ -0,0 +1,518 @@ +import sys,os +import os.path +import webbrowser +import cPickle + +import scipy +import networkx as nx + +from fluents import logger,plots,workflow,dataset,main +from fluents.lib import blmfuncs,nx_utils,cx_utils + +import gobrowser + + +class SmallTestWorkflow(workflow.Workflow): + name = 'Demo' + ident = 'demo' + description = 'A small test workflow for gene expression analysis.' + chip = 'hgu' + def __init__(self): + workflow.Workflow.__init__(self) + + # DATA IMPORT + load = workflow.Stage('load', 'Data') + + load_small = LoadDataFunction('load-small', 'Small', self) + load.add_function(load_small) + + load_medium = LoadDataFunction('load-geneid', 'GeneID', self, 'geneid') + load.add_function(load_medium) + + load_medium = LoadDataFunction('load-full', 'FullChip', self, 'full') + load.add_function(load_medium) + + self.add_stage(load) + + # NETWORK PREPROCESSING + #net = workflow.Stage('net', 'Network integration') + #net.add_function(DiffKernelFunction()) + #net.add_function(ModKernelFunction()) + #self.add_stage(net) + + # Models + model = workflow.Stage('models', 'Models') + model.add_function(blmfuncs.PCA()) + model.add_function(blmfuncs.PLS()) + model.add_function(SAM()) + self.add_stage(model) + + query = workflow.Stage('query', 'Gene Query') + query.add_function(NCBIQuery()) + query.add_function(KEGGQuery()) + query.add_function(SubgraphQuery()) + self.add_stage(query) + + # Background knowledge + go = workflow.Stage('go', 'Gene Ontology') + go.add_function(gobrowser.PlotDagFunction()) + go.add_function(GoEnrichment()) + go.add_function(GoEnrichmentCond()) + go.add_function(MapGO2Gene()) + go.add_function(MapGene2GO()) + self.add_stage(go) + + # EXTRA PLOTS + #plt = workflow.Stage('net', 'Network') + #plt.add_function(nx_analyser.KeggNetworkAnalyser()) + #self.add_stage(plt) + + logger.log('debug', 'Small test workflow is now active') + + +class LoadDataFunction(workflow.Function): + """Loads all datasets in a given directory.""" + def __init__(self, ident, label, wf, dir=''): + workflow.Function.__init__(self, ident, label) + self._dir = dir + self._wf = wf + + def run(self): + path = os.path.join(main.options.datadir, self._wf.ident, self._dir) + files = os.listdir(path) + out = [] + for fn in files: + if fn.endswith('.ftsv'): + out.append(dataset.read_ftsv(os.path.join(path, fn))) + return out + + + +##### WORKFLOW SPECIFIC FUNCTIONS ###### + + +class SAM(workflow.Function): + def __init__(self, id='sam', name='SAM'): + workflow.Function.__init__(self, id, name) + + def run(self, x, y): + n_iter = 50 #B + alpha = 0.01 #cut off on qvals + + ############### + + # Main function call + + # setup prelimenaries + import rpy + rpy.r.library("siggenes") + rpy.r.library("multtest") + + cl = scipy.dot(y.asarray(), scipy.diag(scipy.arange(y.shape[1]))).sum(1) + data = x.asarray().T + sam = rpy.r.sam(data, cl=cl, B=n_iter, var_equal=False,med=False,s0=scipy.nan,rand=scipy.nan) + qvals = scipy.asarray(rpy.r.slot(sam, "p.value")) + pvals = scipy.asarray(rpy.r.slot(sam, "q.value")) + + sam_index = (qvalsGO') + # load data at init + try: + fname = "/home/flatberg/fluents/data/gene2go.pcl" + self._gene2go = cPickle.load(open(fname)) + except: + logger.log("notice", "could not load mapping") + + def run(self): + selection = main.project.get_selection() + if not selection.has_key(self._gene_id_name): + logger.log("notice", "Expected gene ids: %s, but got. %s" %(self._gene_id_name, selection.keys())) + return None + if len(selection[self._gene_id_name])==0: + logger.log("notice", "No selected genes to query") + return None + + gene_ids = selection[self._gene_id_name] + go_ids = set() + for gene in gene_ids: + go_ids_new = self._gene2go.get(gene, []) + if not go_ids_new: + logger.log("notice", "Could not find any goterms for %s" %gene) + go_ids.update(self._gene2go.get(gene, [])) + main.project.set_selection('go-terms', go_ids) + logger.log("notice", "GO terms updated") + + +class MapGO2Gene(workflow.Function): + def __init__(self, ont='bp', gene_id_name='go-terms'): + self._ont = ont + self._gene_id_name = gene_id_name + workflow.Function.__init__(self, 'go2gene', 'GO->gene') + # load data at init + try: + fname = "/home/flatberg/fluents/data/go2gene.pcl" + self._go2gene = cPickle.load(open(fname)) + except: + logger.log("notice", "could not load mapping") + + def run(self): + selection = main.project.get_selection() + if not selection.has_key(self._gene_id_name): + logger.log("notice", "Expected gene ids: %s, but got. %s" %(self._gene_id_name, selection.keys())) + return None + if len(selection[self._gene_id_name])==0: + logger.log("notice", "No selected genes to query") + return None + + go_ids = selection[self._gene_id_name] + gene_ids = set() + for go in go_ids: + if not self._go2gene.get(go,[]): + logger.log("notice", "Could not find any gene ids for %s" %go) + gene_ids.update(self._go2gene.get(go,[])) + main.project.set_selection('gene_ids', gene_ids) + logger.log("notice", "GO terms updated") + + +class SubgraphQuery(workflow.Function): + def __init__(self, graph='kegg', dim='gene_ids'): + self._gtype = graph + self._dim = dim + + workflow.Function.__init__(self, 'keggraph', 'KeggGraph') + + def run(self, Dw, DA): + max_edge_ratio = .20 + max_cov_ratio = .25 + neigh_type = 'cov' + neigh_type = 'cosine' + #neigh_type = 'heat' + # 1.) Operate on a subset selection + selection = main.project.get_selection() + if not selection.has_key(self._dim): + print "not" + logger.log("notice", "Expected gene ids: %s, but got. %s" %(self._dim, selection.keys())) + return None + if len(selection[self._dim]) == 0: + print "not3" + logger.log("notice", "No selected genes to query") + return None + Dw = Dw.subdata(self._dim, selection[self._dim]) + print Dw.shape + + # 2.) Pairwise goodness in loading space + W = Dw.asarray() + if neigh_type == 'cov': + W -= W.mean(1)[:,scipy.newaxis] + wcov = scipy.dot(W, W.T)/(W.shape[1]-1) + wcov_min = wcov.max()*max_cov_ratio + indices = scipy.where(wcov >= wcov_min)[0] + elif neigh_type == 'cosine': + import hcluster + dp = hcluster.squareform(hcluster.pdist(W, 'cosine')) + min_dist = dp.max()*0.1 + p1, p2 = scipy.where(dp <= min_dist) + + acc_gene_ids = Dw.get_identifiers(self._dim, indices=indices) + + # 3.) Subgraphs + G = DA.asnetworkx() + common_gids = [i for i in G.nodes() if i in acc_gene_ids] + G = nx.subgraph(G, common_gids) + S = nx.connected_component_subgraphs(G) + n = map(len, S) + print n + SS = [s for s in S if len(s)>=3] + if not SS: + print "No subgraphs here" + return None + # 4.) Identify close subgraphs + + # 5.) Rank subgraphs + + #main.project.set_selection('gene_ids', acc_gene_ids) + #logger.log("notice", "Gene ids updated") + plt = GraphQueryScatterPlot(SS, Dw) + return [plt] + +class GraphQueryScatterPlot(plots.ScatterPlot): + def __init__(self, S, Dw, *args, **kw): + self.S = S + self.Dw = Dw + id_dim, sel_dim = Dw.get_dim_name() + self._dim = id_dim + id_1, = Dw.get_identifiers(sel_dim, [0]) + id_2, = Dw.get_identifiers(sel_dim, [1]) + print id_1 + print id_2 + + plots.ScatterPlot.__init__(self, Dw, Dw, id_dim, sel_dim, id_1, id_2, c='g', s=50,name="Kegg test", alpha=.5) + self.overlay_subgraphs() + + def overlay_subgraphs(self): + for subgraph in self.S: + nodes = subgraph.nodes() + print self._dim + + sub_dw = self.Dw.subdata(self._dim, nodes) + nodes = sub_dw.get_identifiers(self._dim, sorted=True) + xy = sub_dw.asarray()[:,:2] + pos = {} + for i, node in enumerate(nodes): + pos[node] = xy[i] + nx.draw_networkx_nodes(subgraph, pos, node_size=200, ax=self.axes, zorder=3) + nx.draw_networkx_edges(subgraph, pos, ax=self.axes, zorder=3) + + + + def on_update(self, selection, graph): + g = nx.subgraph(graph, selection) + S = nx.connected_components_subgraphs(g) + + def _subgraph_score(self, subgraph): + pass