import sys
import os
import webbrowser

import scipy
import networkx as nx

from fluents import logger, plots, workflow, dataset
from fluents.lib import blmfuncs, nx_utils, validation, engines, cx_stats, cx_utils
import gobrowser
import geneontology


class SmallTestWorkflow(workflow.Workflow):
    """A small test workflow for gene expression analysis.

    Wires data loading, preprocessing, network-kernel integration,
    bilinear models (PCA/PLS), external gene queries and Gene Ontology
    tools into the stages shown by the fluents application.
    """
    name = 'Smokers'
    ident = 'smokers'
    description = 'A small test workflow for gene expression analysis.'

    def __init__(self, app):
        workflow.Workflow.__init__(self, app)

        # DATA IMPORT
        load = workflow.Stage('load', 'Data')
        load.add_function(DatasetLoadFunctionSmokerSmall())
        load.add_function(DatasetLoadFunctionSmokerMedium())
        load.add_function(DatasetLoadFunctionSmokerFull())
        #load.add_function(DatasetLoadFunctionCYCLE())
        self.add_stage(load)

        # PREPROCESSING
        prep = workflow.Stage('prep', 'Preprocessing')
        prep.add_function(LogFunction())
        self.add_stage(prep)

        # NETWORK PREPROCESSING
        net = workflow.Stage('net', 'Network integration')
        net.add_function(DiffKernelFunction())
        net.add_function(ModKernelFunction())
        #net.add_function(RandDiffKernelFunction())
        self.add_stage(net)

        # BLM's (bilinear models)
        model = workflow.Stage('models', 'Models')
        model.add_function(blmfuncs.PCA())
        model.add_function(blmfuncs.PLS())
        #model.add_function(bioconFuncs.SAM(app))
        self.add_stage(model)

        # EXTERNAL GENE QUERIES (open results in a web browser)
        query = workflow.Stage('query', 'Gene Query')
        query.add_function(NCBIQuery())
        query.add_function(KEGGQuery())
        self.add_stage(query)

        # GENE ONTOLOGY
        go = workflow.Stage('go', 'Gene Ontology')
        go.add_function(gobrowser.LoadGOFunction())
        go.add_function(gobrowser.GOWeightFunction())
        go.add_function(gobrowser.TTestFunction())
        self.add_stage(go)

        logger.log('debug', 'Small test workflow is now active')


def _load_ftsv_directory(path):
    """Return a list of datasets read from every .ftsv file in *path*.

    Shared helper for the three smoker-dataset loaders below.  Each file
    is closed again after reading (the original loaders leaked the file
    handles).
    """
    out = []
    for fname in os.listdir(path):
        if fname.endswith('.ftsv'):
            input_file = open(os.path.join(path, fname))
            try:
                out.append(dataset.read_ftsv(input_file))
            finally:
                input_file.close()
    return out


class DatasetLoadFunctionSmokerSmall(workflow.Function):
    """Loader for all ftsv files of the small smokers dataset."""
    def __init__(self):
        workflow.Function.__init__(self, 'load_small', 'Smoker (Small)')

    def run(self):
        return _load_ftsv_directory('data/smokers-small/')


class DatasetLoadFunctionSmokerMedium(workflow.Function):
    """Loader for all ftsv files of the medium smokers dataset."""
    def __init__(self):
        workflow.Function.__init__(self, 'load_medium', 'Smoker (Medium)')

    def run(self):
        return _load_ftsv_directory('data/smokers-medium/')


class DatasetLoadFunctionSmokerFull(workflow.Function):
    """Loader for all ftsv files of the full smokers dataset."""
    def __init__(self):
        workflow.Function.__init__(self, 'load_full', 'Smoker (Full)')

    def run(self):
        return _load_ftsv_directory('data/smokers-full/')


class DatasetLoadFunctionCYCLE(workflow.Function):
    """Loader for the pickled CYCLE dataset."""
    def __init__(self):
        workflow.Function.__init__(self, 'load_data', 'Cycle')

    def run(self):
        filename = 'fluents/data/CYCLE'
        if filename:
            return dataset.from_file(filename)


##### WORKFLOW SPECIFIC FUNCTIONS ######


def _wrap_like(x, Xp, name):
    """Wrap array *Xp* in a new Dataset carrying x's (sorted) identifiers.

    Shared by the kernel functions below, which all produce an array with
    the same (row, column) identifiers as their input dataset.
    """
    row_ids = (x.get_dim_name(0),
               x.get_identifiers(x.get_dim_name(0), sorted=True))
    col_ids = (x.get_dim_name(1),
               x.get_identifiers(x.get_dim_name(1), sorted=True))
    return dataset.Dataset(Xp, (row_ids, col_ids), name=name)


class DiffKernelFunction(workflow.Function):
    """Smooth expression data along a network with a diffusion kernel."""
    def __init__(self):
        workflow.Function.__init__(self, 'diffkernel', 'Diffusion')

    def run(self, x, a):
        """x is gene expression data, a is the network."""
        g = a.asnetworkx()
        # Order the adjacency matrix by the sorted gene identifiers so its
        # rows/columns line up with the expression columns.
        genes = x.get_identifiers(x.get_dim_name(1), sorted=True)
        W = nx.adj_matrix(g, nodelist=genes)
        X = x.asarray()
        Xc, mn_x = cx_utils.mat_center(X, ret_mn=True)
        alpha = 1.0
        beta = 1.0
        K = nx_utils.K_diffusion(W, alpha=alpha, beta=beta, normalised=True)
        # Smooth the centered data, then restore the column means.
        Xp = scipy.dot(Xc, K) + mn_x
        xout = _wrap_like(x, Xp, x.get_name() + '_diff' + str(alpha))
        return [xout]


class RandDiffKernelFunction(workflow.Function):
    """Diffusion with a randomly permuted node order (negative control).

    NOTE(review): shares the ident 'diffkernel' with DiffKernelFunction;
    rename one of them if both are ever enabled in the same stage.
    """
    def __init__(self):
        workflow.Function.__init__(self, 'diffkernel', 'Rand. Diff.')

    def run(self, x, a):
        """x is gene expression data, a is the network."""
        g = a.asnetworkx()
        genes = x.get_identifiers(x.get_dim_name(1))
        # Randomise the nodelist so the network structure no longer matches
        # the expression columns.
        genes = [genes[i] for i in cx_utils.randperm(x.shape[1])]
        W = nx.adj_matrix(g, nodelist=genes)
        X = x.asarray()
        Xc, mn_x = cx_utils.mat_center(X, ret_mn=True)
        alpha = 1.0
        beta = 1.0
        K = nx_utils.K_diffusion(W, alpha=alpha, beta=beta, normalised=True)
        Xp = scipy.dot(Xc, K) + mn_x
        xout = _wrap_like(x, Xp, x.get_name() + '_diff' + str(alpha))
        return [xout]


class ModKernelFunction(workflow.Function):
    """Smooth expression data along a network with a modularity kernel."""
    def __init__(self):
        workflow.Function.__init__(self, 'mokernel', 'Modularity')

    def run(self, x, a):
        """x is gene expression data, a is the network."""
        X = x.asarray()
        g = a.asnetworkx()
        genes = x.get_identifiers(x.get_dim_name(1), sorted=True)
        W = nx.adj_matrix(g, nodelist=genes)
        alpha = .2
        Xc, mn_x = cx_utils.mat_center(X, ret_mn=True)
        K = nx_utils.K_modularity(W, alpha=alpha)
        Xp = scipy.dot(Xc, K) + mn_x
        xout = _wrap_like(x, Xp, x.get_name() + '_mod' + str(alpha))
        return [xout]


class NCBIQuery(workflow.Function):
    """Open an NCBI Entrez full-report query for the selected genes."""
    def __init__(self):
        workflow.Function.__init__(self, 'query', 'NCBI')

    def run(self, selection):
        # Nothing to query without gene identifiers in the selection.
        if 'gene_id' not in selection:
            return None
        base = 'http://www.ncbi.nlm.nih.gov/entrez/query.fcgi?'
        # Explicit sequence (not a dict) so the URL parameter order is
        # deterministic; dict iteration order was arbitrary here.
        params = (('&db=', 'gene'),
                  ('&cmd=', 'retrieve'),
                  ('&dopt=', 'full_report'),
                  ('&list_uids=', '+'.join(selection['gene_id'])))
        web_str = base + ''.join([key + value for key, value in params])
        webbrowser.open(web_str)


class KEGGQuery(workflow.Function):
    """Open a KEGG dbget lookup for the selected (human) genes."""
    def __init__(self):
        workflow.Function.__init__(self, 'query', 'KEGG')

    def run(self, selection):
        # Guard on the key that is actually read below.  (The original
        # tested the unrelated keys 'genes'/'orfs' and then crashed on
        # selection['gene_id'].)
        if 'gene_id' not in selection:
            return None
        base = r'http://www.genome.jp/dbget-bin/www_bget?'
        org = 'hsa'  # KEGG organism code: human
        gene_str = org + "+" + '+'.join(selection['gene_id'])
        web_str = base + gene_str
        webbrowser.open(web_str)


class LogFunction(workflow.Function):
    """Take the element-wise natural logarithm of a dataset."""
    def __init__(self):
        workflow.Function.__init__(self, 'log', 'Log')

    def run(self, data):
        logger.log('notice', 'Taking the log of dataset %s' % data.get_name())
        d = data.copy()
        # Operates on the Dataset internals directly, as the original did;
        # assumes _array/_name are the intended extension points.
        d._array = scipy.log(d._array)
        d._name = 'log(%s)' % data.get_name()
        return [d]